From a6502c0888613bd0377a25e43de8ae306c4de4d7 Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 17:41:13 -0800 Subject: feat: add SQLite storage layer with binary-first event persistence Storage implementation: - Concrete type with constructor (consumer-side interfaces) - Event storage: protobuf + zstd-compressed canonical JSON - Schema: events, deletions, replaceable_events, auth_challenges, rate_limits - WAL mode, STRICT typing, optimized indexes - Methods: StoreEvent, GetEvent, GetEventWithCanonical, DeleteEvent Dependencies: - modernc.org/sqlite v1.45.0 (pure Go SQLite driver) - github.com/klauspost/compress v1.18.4 (zstd compression) 366 lines, 10 tests passing --- internal/storage/events.go | 216 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 internal/storage/events.go (limited to 'internal/storage/events.go') diff --git a/internal/storage/events.go b/internal/storage/events.go new file mode 100644 index 0000000..d74fc7e --- /dev/null +++ b/internal/storage/events.go @@ -0,0 +1,216 @@ +package storage + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + + "github.com/klauspost/compress/zstd" + "google.golang.org/protobuf/proto" + + pb "northwest.io/nostr-grpc/api/nostr/v1" +) + +var ( + // ErrEventNotFound is returned when an event ID is not found in storage. + ErrEventNotFound = errors.New("event not found") + + // ErrEventExists is returned when attempting to store a duplicate event. + ErrEventExists = errors.New("event already exists") +) + +// EventData holds the data needed to store an event. +type EventData struct { + Event *pb.Event // Protobuf event + CanonicalJSON []byte // Uncompressed canonical JSON (will be compressed on storage) +} + +// StoreEvent stores an event in the database. +// Returns ErrEventExists if the event ID already exists. +func (s *Storage) StoreEvent(ctx context.Context, data *EventData) error { + // Serialize protobuf + eventBytes, err := proto.Marshal(data.Event) + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + + // Compress canonical JSON + compressedJSON, err := compressJSON(data.CanonicalJSON) + if err != nil { + return fmt.Errorf("failed to compress canonical JSON: %w", err) + } + + // Serialize tags to JSON + tagsJSON, err := marshalTags(data.Event.Tags) + if err != nil { + return fmt.Errorf("failed to marshal tags: %w", err) + } + + // Insert event + query := ` + INSERT INTO events (id, event_data, canonical_json, pubkey, kind, created_at, content, tags, sig) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ` + + _, err = s.db.ExecContext(ctx, query, + data.Event.Id, + eventBytes, + compressedJSON, + data.Event.Pubkey, + data.Event.Kind, + data.Event.CreatedAt, + data.Event.Content, + tagsJSON, + data.Event.Sig, + ) + + if err != nil { + // Check for unique constraint violation (duplicate ID) + if isDuplicateError(err) { + return ErrEventExists + } + return fmt.Errorf("failed to insert event: %w", err) + } + + return nil +} + +// GetEvent retrieves an event by ID. +// Returns ErrEventNotFound if the event doesn't exist. +func (s *Storage) GetEvent(ctx context.Context, id string) (*pb.Event, error) { + query := ` + SELECT event_data, canonical_json + FROM events + WHERE id = ? AND deleted = 0 + ` + + var eventBytes, compressedJSON []byte + err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON) + + if err == sql.ErrNoRows { + return nil, ErrEventNotFound + } + if err != nil { + return nil, fmt.Errorf("failed to query event: %w", err) + } + + // Deserialize protobuf + event := &pb.Event{} + if err := proto.Unmarshal(eventBytes, event); err != nil { + return nil, fmt.Errorf("failed to unmarshal event: %w", err) + } + + return event, nil +} + +// GetEventWithCanonical retrieves an event by ID with its canonical JSON. +// The canonical_json field will be populated in the returned event. +func (s *Storage) GetEventWithCanonical(ctx context.Context, id string) (*pb.Event, error) { + query := ` + SELECT event_data, canonical_json + FROM events + WHERE id = ? AND deleted = 0 + ` + + var eventBytes, compressedJSON []byte + err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON) + + if err == sql.ErrNoRows { + return nil, ErrEventNotFound + } + if err != nil { + return nil, fmt.Errorf("failed to query event: %w", err) + } + + // Deserialize protobuf + event := &pb.Event{} + if err := proto.Unmarshal(eventBytes, event); err != nil { + return nil, fmt.Errorf("failed to unmarshal event: %w", err) + } + + // Decompress canonical JSON + canonicalJSON, err := decompressJSON(compressedJSON) + if err != nil { + return nil, fmt.Errorf("failed to decompress canonical JSON: %w", err) + } + + event.CanonicalJson = canonicalJSON + + return event, nil +} + +// DeleteEvent marks an event as deleted (soft delete). +func (s *Storage) DeleteEvent(ctx context.Context, eventID string) error { + query := `UPDATE events SET deleted = 1 WHERE id = ?` + result, err := s.db.ExecContext(ctx, query, eventID) + if err != nil { + return fmt.Errorf("failed to delete event: %w", err) + } + + rows, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rows == 0 { + return ErrEventNotFound + } + + return nil +} + +// compressJSON compresses JSON bytes using zstd. +func compressJSON(data []byte) ([]byte, error) { + encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault)) + if err != nil { + return nil, err + } + defer encoder.Close() + + return encoder.EncodeAll(data, nil), nil +} + +// decompressJSON decompresses zstd-compressed JSON bytes. +func decompressJSON(data []byte) ([]byte, error) { + decoder, err := zstd.NewReader(nil) + if err != nil { + return nil, err + } + defer decoder.Close() + + return decoder.DecodeAll(data, nil) +} + +// marshalTags converts protobuf tags to JSON string. +func marshalTags(tags []*pb.Tag) (string, error) { + if len(tags) == 0 { + return "[]", nil + } + + // Convert to [][]string for JSON serialization + tagArrays := make([][]string, len(tags)) + for i, tag := range tags { + tagArrays[i] = tag.Values + } + + data, err := json.Marshal(tagArrays) + if err != nil { + return "", err + } + + return string(data), nil +} + +// isDuplicateError checks if the error is a unique constraint violation. +func isDuplicateError(err error) bool { + if err == nil { + return false + } + // SQLite error messages for UNIQUE constraint violations + msg := err.Error() + return msg == "UNIQUE constraint failed: events.id" || + msg == "constraint failed: UNIQUE constraint failed: events.id" || + msg == "constraint failed: UNIQUE constraint failed: events.id (1555)" +} -- cgit v1.2.3