diff options
Diffstat (limited to 'internal/storage/events.go')
| -rw-r--r-- | internal/storage/events.go | 216 |
1 files changed, 216 insertions, 0 deletions
diff --git a/internal/storage/events.go b/internal/storage/events.go new file mode 100644 index 0000000..d74fc7e --- /dev/null +++ b/internal/storage/events.go | |||
| @@ -0,0 +1,216 @@ | |||
| 1 | package storage | ||
| 2 | |||
| 3 | import ( | ||
| 4 | "context" | ||
| 5 | "database/sql" | ||
| 6 | "encoding/json" | ||
| 7 | "errors" | ||
| 8 | "fmt" | ||
| 9 | |||
| 10 | "github.com/klauspost/compress/zstd" | ||
| 11 | "google.golang.org/protobuf/proto" | ||
| 12 | |||
| 13 | pb "northwest.io/nostr-grpc/api/nostr/v1" | ||
| 14 | ) | ||
| 15 | |||
| 16 | var ( | ||
| 17 | // ErrEventNotFound is returned when an event ID is not found in storage. | ||
| 18 | ErrEventNotFound = errors.New("event not found") | ||
| 19 | |||
| 20 | // ErrEventExists is returned when attempting to store a duplicate event. | ||
| 21 | ErrEventExists = errors.New("event already exists") | ||
| 22 | ) | ||
| 23 | |||
| 24 | // EventData holds the data needed to store an event. | ||
| 25 | type EventData struct { | ||
| 26 | Event *pb.Event // Protobuf event | ||
| 27 | CanonicalJSON []byte // Uncompressed canonical JSON (will be compressed on storage) | ||
| 28 | } | ||
| 29 | |||
| 30 | // StoreEvent stores an event in the database. | ||
| 31 | // Returns ErrEventExists if the event ID already exists. | ||
| 32 | func (s *Storage) StoreEvent(ctx context.Context, data *EventData) error { | ||
| 33 | // Serialize protobuf | ||
| 34 | eventBytes, err := proto.Marshal(data.Event) | ||
| 35 | if err != nil { | ||
| 36 | return fmt.Errorf("failed to marshal event: %w", err) | ||
| 37 | } | ||
| 38 | |||
| 39 | // Compress canonical JSON | ||
| 40 | compressedJSON, err := compressJSON(data.CanonicalJSON) | ||
| 41 | if err != nil { | ||
| 42 | return fmt.Errorf("failed to compress canonical JSON: %w", err) | ||
| 43 | } | ||
| 44 | |||
| 45 | // Serialize tags to JSON | ||
| 46 | tagsJSON, err := marshalTags(data.Event.Tags) | ||
| 47 | if err != nil { | ||
| 48 | return fmt.Errorf("failed to marshal tags: %w", err) | ||
| 49 | } | ||
| 50 | |||
| 51 | // Insert event | ||
| 52 | query := ` | ||
| 53 | INSERT INTO events (id, event_data, canonical_json, pubkey, kind, created_at, content, tags, sig) | ||
| 54 | VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | ||
| 55 | ` | ||
| 56 | |||
| 57 | _, err = s.db.ExecContext(ctx, query, | ||
| 58 | data.Event.Id, | ||
| 59 | eventBytes, | ||
| 60 | compressedJSON, | ||
| 61 | data.Event.Pubkey, | ||
| 62 | data.Event.Kind, | ||
| 63 | data.Event.CreatedAt, | ||
| 64 | data.Event.Content, | ||
| 65 | tagsJSON, | ||
| 66 | data.Event.Sig, | ||
| 67 | ) | ||
| 68 | |||
| 69 | if err != nil { | ||
| 70 | // Check for unique constraint violation (duplicate ID) | ||
| 71 | if isDuplicateError(err) { | ||
| 72 | return ErrEventExists | ||
| 73 | } | ||
| 74 | return fmt.Errorf("failed to insert event: %w", err) | ||
| 75 | } | ||
| 76 | |||
| 77 | return nil | ||
| 78 | } | ||
| 79 | |||
| 80 | // GetEvent retrieves an event by ID. | ||
| 81 | // Returns ErrEventNotFound if the event doesn't exist. | ||
| 82 | func (s *Storage) GetEvent(ctx context.Context, id string) (*pb.Event, error) { | ||
| 83 | query := ` | ||
| 84 | SELECT event_data, canonical_json | ||
| 85 | FROM events | ||
| 86 | WHERE id = ? AND deleted = 0 | ||
| 87 | ` | ||
| 88 | |||
| 89 | var eventBytes, compressedJSON []byte | ||
| 90 | err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON) | ||
| 91 | |||
| 92 | if err == sql.ErrNoRows { | ||
| 93 | return nil, ErrEventNotFound | ||
| 94 | } | ||
| 95 | if err != nil { | ||
| 96 | return nil, fmt.Errorf("failed to query event: %w", err) | ||
| 97 | } | ||
| 98 | |||
| 99 | // Deserialize protobuf | ||
| 100 | event := &pb.Event{} | ||
| 101 | if err := proto.Unmarshal(eventBytes, event); err != nil { | ||
| 102 | return nil, fmt.Errorf("failed to unmarshal event: %w", err) | ||
| 103 | } | ||
| 104 | |||
| 105 | return event, nil | ||
| 106 | } | ||
| 107 | |||
| 108 | // GetEventWithCanonical retrieves an event by ID with its canonical JSON. | ||
| 109 | // The canonical_json field will be populated in the returned event. | ||
| 110 | func (s *Storage) GetEventWithCanonical(ctx context.Context, id string) (*pb.Event, error) { | ||
| 111 | query := ` | ||
| 112 | SELECT event_data, canonical_json | ||
| 113 | FROM events | ||
| 114 | WHERE id = ? AND deleted = 0 | ||
| 115 | ` | ||
| 116 | |||
| 117 | var eventBytes, compressedJSON []byte | ||
| 118 | err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON) | ||
| 119 | |||
| 120 | if err == sql.ErrNoRows { | ||
| 121 | return nil, ErrEventNotFound | ||
| 122 | } | ||
| 123 | if err != nil { | ||
| 124 | return nil, fmt.Errorf("failed to query event: %w", err) | ||
| 125 | } | ||
| 126 | |||
| 127 | // Deserialize protobuf | ||
| 128 | event := &pb.Event{} | ||
| 129 | if err := proto.Unmarshal(eventBytes, event); err != nil { | ||
| 130 | return nil, fmt.Errorf("failed to unmarshal event: %w", err) | ||
| 131 | } | ||
| 132 | |||
| 133 | // Decompress canonical JSON | ||
| 134 | canonicalJSON, err := decompressJSON(compressedJSON) | ||
| 135 | if err != nil { | ||
| 136 | return nil, fmt.Errorf("failed to decompress canonical JSON: %w", err) | ||
| 137 | } | ||
| 138 | |||
| 139 | event.CanonicalJson = canonicalJSON | ||
| 140 | |||
| 141 | return event, nil | ||
| 142 | } | ||
| 143 | |||
| 144 | // DeleteEvent marks an event as deleted (soft delete). | ||
| 145 | func (s *Storage) DeleteEvent(ctx context.Context, eventID string) error { | ||
| 146 | query := `UPDATE events SET deleted = 1 WHERE id = ?` | ||
| 147 | result, err := s.db.ExecContext(ctx, query, eventID) | ||
| 148 | if err != nil { | ||
| 149 | return fmt.Errorf("failed to delete event: %w", err) | ||
| 150 | } | ||
| 151 | |||
| 152 | rows, err := result.RowsAffected() | ||
| 153 | if err != nil { | ||
| 154 | return fmt.Errorf("failed to get rows affected: %w", err) | ||
| 155 | } | ||
| 156 | |||
| 157 | if rows == 0 { | ||
| 158 | return ErrEventNotFound | ||
| 159 | } | ||
| 160 | |||
| 161 | return nil | ||
| 162 | } | ||
| 163 | |||
| 164 | // compressJSON compresses JSON bytes using zstd. | ||
| 165 | func compressJSON(data []byte) ([]byte, error) { | ||
| 166 | encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault)) | ||
| 167 | if err != nil { | ||
| 168 | return nil, err | ||
| 169 | } | ||
| 170 | defer encoder.Close() | ||
| 171 | |||
| 172 | return encoder.EncodeAll(data, nil), nil | ||
| 173 | } | ||
| 174 | |||
| 175 | // decompressJSON decompresses zstd-compressed JSON bytes. | ||
| 176 | func decompressJSON(data []byte) ([]byte, error) { | ||
| 177 | decoder, err := zstd.NewReader(nil) | ||
| 178 | if err != nil { | ||
| 179 | return nil, err | ||
| 180 | } | ||
| 181 | defer decoder.Close() | ||
| 182 | |||
| 183 | return decoder.DecodeAll(data, nil) | ||
| 184 | } | ||
| 185 | |||
| 186 | // marshalTags converts protobuf tags to JSON string. | ||
| 187 | func marshalTags(tags []*pb.Tag) (string, error) { | ||
| 188 | if len(tags) == 0 { | ||
| 189 | return "[]", nil | ||
| 190 | } | ||
| 191 | |||
| 192 | // Convert to [][]string for JSON serialization | ||
| 193 | tagArrays := make([][]string, len(tags)) | ||
| 194 | for i, tag := range tags { | ||
| 195 | tagArrays[i] = tag.Values | ||
| 196 | } | ||
| 197 | |||
| 198 | data, err := json.Marshal(tagArrays) | ||
| 199 | if err != nil { | ||
| 200 | return "", err | ||
| 201 | } | ||
| 202 | |||
| 203 | return string(data), nil | ||
| 204 | } | ||
| 205 | |||
| 206 | // isDuplicateError checks if the error is a unique constraint violation. | ||
| 207 | func isDuplicateError(err error) bool { | ||
| 208 | if err == nil { | ||
| 209 | return false | ||
| 210 | } | ||
| 211 | // SQLite error messages for UNIQUE constraint violations | ||
| 212 | msg := err.Error() | ||
| 213 | return msg == "UNIQUE constraint failed: events.id" || | ||
| 214 | msg == "constraint failed: UNIQUE constraint failed: events.id" || | ||
| 215 | msg == "constraint failed: UNIQUE constraint failed: events.id (1555)" | ||
| 216 | } | ||
