package storage import ( "context" "database/sql" "encoding/json" "errors" "fmt" "github.com/klauspost/compress/zstd" "google.golang.org/protobuf/proto" pb "northwest.io/muxstr/api/nostr/v1" ) var ( // ErrEventNotFound is returned when an event ID is not found in storage. ErrEventNotFound = errors.New("event not found") // ErrEventExists is returned when attempting to store a duplicate event. ErrEventExists = errors.New("event already exists") ) // EventData holds the data needed to store an event. type EventData struct { Event *pb.Event // Protobuf event CanonicalJSON []byte // Uncompressed canonical JSON (will be compressed on storage) } // StoreEvent stores an event in the database. // Returns ErrEventExists if the event ID already exists. func (s *Storage) StoreEvent(ctx context.Context, data *EventData) error { // Serialize protobuf eventBytes, err := proto.Marshal(data.Event) if err != nil { return fmt.Errorf("failed to marshal event: %w", err) } // Compress canonical JSON compressedJSON, err := compressJSON(data.CanonicalJSON) if err != nil { return fmt.Errorf("failed to compress canonical JSON: %w", err) } // Serialize tags to JSON tagsJSON, err := marshalTags(data.Event.Tags) if err != nil { return fmt.Errorf("failed to marshal tags: %w", err) } // Insert event query := ` INSERT INTO events (id, event_data, canonical_json, pubkey, kind, created_at, content, tags, sig) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ` _, err = s.db.ExecContext(ctx, query, data.Event.Id, eventBytes, compressedJSON, data.Event.Pubkey, data.Event.Kind, data.Event.CreatedAt, data.Event.Content, tagsJSON, data.Event.Sig, ) if err != nil { // Check for unique constraint violation (duplicate ID) if isDuplicateError(err) { return ErrEventExists } return fmt.Errorf("failed to insert event: %w", err) } return nil } // GetEvent retrieves an event by ID. // Returns ErrEventNotFound if the event doesn't exist. func (s *Storage) GetEvent(ctx context.Context, id string) (*pb.Event, error) { query := ` SELECT event_data, canonical_json FROM events WHERE id = ? AND deleted = 0 ` var eventBytes, compressedJSON []byte err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON) if err == sql.ErrNoRows { return nil, ErrEventNotFound } if err != nil { return nil, fmt.Errorf("failed to query event: %w", err) } // Deserialize protobuf event := &pb.Event{} if err := proto.Unmarshal(eventBytes, event); err != nil { return nil, fmt.Errorf("failed to unmarshal event: %w", err) } return event, nil } // GetEventWithCanonical retrieves an event by ID with its canonical JSON. // The canonical_json field will be populated in the returned event. func (s *Storage) GetEventWithCanonical(ctx context.Context, id string) (*pb.Event, error) { query := ` SELECT event_data, canonical_json FROM events WHERE id = ? AND deleted = 0 ` var eventBytes, compressedJSON []byte err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON) if err == sql.ErrNoRows { return nil, ErrEventNotFound } if err != nil { return nil, fmt.Errorf("failed to query event: %w", err) } // Deserialize protobuf event := &pb.Event{} if err := proto.Unmarshal(eventBytes, event); err != nil { return nil, fmt.Errorf("failed to unmarshal event: %w", err) } // Decompress canonical JSON canonicalJSON, err := decompressJSON(compressedJSON) if err != nil { return nil, fmt.Errorf("failed to decompress canonical JSON: %w", err) } event.CanonicalJson = canonicalJSON return event, nil } // DeleteEvent marks an event as deleted (soft delete). func (s *Storage) DeleteEvent(ctx context.Context, eventID string) error { query := `UPDATE events SET deleted = 1 WHERE id = ?` result, err := s.db.ExecContext(ctx, query, eventID) if err != nil { return fmt.Errorf("failed to delete event: %w", err) } rows, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to get rows affected: %w", err) } if rows == 0 { return ErrEventNotFound } return nil } // compressJSON compresses JSON bytes using zstd. func compressJSON(data []byte) ([]byte, error) { encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault)) if err != nil { return nil, err } defer encoder.Close() return encoder.EncodeAll(data, nil), nil } // decompressJSON decompresses zstd-compressed JSON bytes. func decompressJSON(data []byte) ([]byte, error) { decoder, err := zstd.NewReader(nil) if err != nil { return nil, err } defer decoder.Close() return decoder.DecodeAll(data, nil) } // marshalTags converts protobuf tags to JSON string. func marshalTags(tags []*pb.Tag) (string, error) { if len(tags) == 0 { return "[]", nil } // Convert to [][]string for JSON serialization tagArrays := make([][]string, len(tags)) for i, tag := range tags { tagArrays[i] = tag.Values } data, err := json.Marshal(tagArrays) if err != nil { return "", err } return string(data), nil } // isDuplicateError checks if the error is a unique constraint violation. func isDuplicateError(err error) bool { if err == nil { return false } // SQLite error messages for UNIQUE constraint violations msg := err.Error() return msg == "UNIQUE constraint failed: events.id" || msg == "constraint failed: UNIQUE constraint failed: events.id" || msg == "constraint failed: UNIQUE constraint failed: events.id (1555)" }