summaryrefslogtreecommitdiffstats
path: root/internal/storage/events.go
diff options
context:
space:
mode:
authorbndw <ben@bdw.to>2026-02-13 17:41:13 -0800
committerbndw <ben@bdw.to>2026-02-13 17:41:13 -0800
commita6502c0888613bd0377a25e43de8ae306c4de4d7 (patch)
tree7e6b9eaaafbd97d3d0ef5007e392fa7b91e35f6c /internal/storage/events.go
parentaf30945803d440d1f803c814f4a37a1890494f1d (diff)
feat: add SQLite storage layer with binary-first event persistence
Storage implementation: - Concrete type with constructor (consumer-side interfaces) - Event storage: protobuf + zstd-compressed canonical JSON - Schema: events, deletions, replaceable_events, auth_challenges, rate_limits - WAL mode, STRICT typing, optimized indexes - Methods: StoreEvent, GetEvent, GetEventWithCanonical, DeleteEvent Dependencies: - modernc.org/sqlite v1.45.0 (pure Go SQLite driver) - github.com/klauspost/compress v1.18.4 (zstd compression) 366 lines, 10 tests passing
Diffstat (limited to 'internal/storage/events.go')
-rw-r--r--internal/storage/events.go216
1 files changed, 216 insertions, 0 deletions
diff --git a/internal/storage/events.go b/internal/storage/events.go
new file mode 100644
index 0000000..d74fc7e
--- /dev/null
+++ b/internal/storage/events.go
@@ -0,0 +1,216 @@
1package storage
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "errors"
8 "fmt"
9
10 "github.com/klauspost/compress/zstd"
11 "google.golang.org/protobuf/proto"
12
13 pb "northwest.io/nostr-grpc/api/nostr/v1"
14)
15
16var (
17 // ErrEventNotFound is returned when an event ID is not found in storage.
18 ErrEventNotFound = errors.New("event not found")
19
20 // ErrEventExists is returned when attempting to store a duplicate event.
21 ErrEventExists = errors.New("event already exists")
22)
23
24// EventData holds the data needed to store an event.
25type EventData struct {
26 Event *pb.Event // Protobuf event
27 CanonicalJSON []byte // Uncompressed canonical JSON (will be compressed on storage)
28}
29
30// StoreEvent stores an event in the database.
31// Returns ErrEventExists if the event ID already exists.
32func (s *Storage) StoreEvent(ctx context.Context, data *EventData) error {
33 // Serialize protobuf
34 eventBytes, err := proto.Marshal(data.Event)
35 if err != nil {
36 return fmt.Errorf("failed to marshal event: %w", err)
37 }
38
39 // Compress canonical JSON
40 compressedJSON, err := compressJSON(data.CanonicalJSON)
41 if err != nil {
42 return fmt.Errorf("failed to compress canonical JSON: %w", err)
43 }
44
45 // Serialize tags to JSON
46 tagsJSON, err := marshalTags(data.Event.Tags)
47 if err != nil {
48 return fmt.Errorf("failed to marshal tags: %w", err)
49 }
50
51 // Insert event
52 query := `
53 INSERT INTO events (id, event_data, canonical_json, pubkey, kind, created_at, content, tags, sig)
54 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
55 `
56
57 _, err = s.db.ExecContext(ctx, query,
58 data.Event.Id,
59 eventBytes,
60 compressedJSON,
61 data.Event.Pubkey,
62 data.Event.Kind,
63 data.Event.CreatedAt,
64 data.Event.Content,
65 tagsJSON,
66 data.Event.Sig,
67 )
68
69 if err != nil {
70 // Check for unique constraint violation (duplicate ID)
71 if isDuplicateError(err) {
72 return ErrEventExists
73 }
74 return fmt.Errorf("failed to insert event: %w", err)
75 }
76
77 return nil
78}
79
80// GetEvent retrieves an event by ID.
81// Returns ErrEventNotFound if the event doesn't exist.
82func (s *Storage) GetEvent(ctx context.Context, id string) (*pb.Event, error) {
83 query := `
84 SELECT event_data, canonical_json
85 FROM events
86 WHERE id = ? AND deleted = 0
87 `
88
89 var eventBytes, compressedJSON []byte
90 err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON)
91
92 if err == sql.ErrNoRows {
93 return nil, ErrEventNotFound
94 }
95 if err != nil {
96 return nil, fmt.Errorf("failed to query event: %w", err)
97 }
98
99 // Deserialize protobuf
100 event := &pb.Event{}
101 if err := proto.Unmarshal(eventBytes, event); err != nil {
102 return nil, fmt.Errorf("failed to unmarshal event: %w", err)
103 }
104
105 return event, nil
106}
107
108// GetEventWithCanonical retrieves an event by ID with its canonical JSON.
109// The canonical_json field will be populated in the returned event.
110func (s *Storage) GetEventWithCanonical(ctx context.Context, id string) (*pb.Event, error) {
111 query := `
112 SELECT event_data, canonical_json
113 FROM events
114 WHERE id = ? AND deleted = 0
115 `
116
117 var eventBytes, compressedJSON []byte
118 err := s.db.QueryRowContext(ctx, query, id).Scan(&eventBytes, &compressedJSON)
119
120 if err == sql.ErrNoRows {
121 return nil, ErrEventNotFound
122 }
123 if err != nil {
124 return nil, fmt.Errorf("failed to query event: %w", err)
125 }
126
127 // Deserialize protobuf
128 event := &pb.Event{}
129 if err := proto.Unmarshal(eventBytes, event); err != nil {
130 return nil, fmt.Errorf("failed to unmarshal event: %w", err)
131 }
132
133 // Decompress canonical JSON
134 canonicalJSON, err := decompressJSON(compressedJSON)
135 if err != nil {
136 return nil, fmt.Errorf("failed to decompress canonical JSON: %w", err)
137 }
138
139 event.CanonicalJson = canonicalJSON
140
141 return event, nil
142}
143
144// DeleteEvent marks an event as deleted (soft delete).
145func (s *Storage) DeleteEvent(ctx context.Context, eventID string) error {
146 query := `UPDATE events SET deleted = 1 WHERE id = ?`
147 result, err := s.db.ExecContext(ctx, query, eventID)
148 if err != nil {
149 return fmt.Errorf("failed to delete event: %w", err)
150 }
151
152 rows, err := result.RowsAffected()
153 if err != nil {
154 return fmt.Errorf("failed to get rows affected: %w", err)
155 }
156
157 if rows == 0 {
158 return ErrEventNotFound
159 }
160
161 return nil
162}
163
164// compressJSON compresses JSON bytes using zstd.
165func compressJSON(data []byte) ([]byte, error) {
166 encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
167 if err != nil {
168 return nil, err
169 }
170 defer encoder.Close()
171
172 return encoder.EncodeAll(data, nil), nil
173}
174
175// decompressJSON decompresses zstd-compressed JSON bytes.
176func decompressJSON(data []byte) ([]byte, error) {
177 decoder, err := zstd.NewReader(nil)
178 if err != nil {
179 return nil, err
180 }
181 defer decoder.Close()
182
183 return decoder.DecodeAll(data, nil)
184}
185
186// marshalTags converts protobuf tags to JSON string.
187func marshalTags(tags []*pb.Tag) (string, error) {
188 if len(tags) == 0 {
189 return "[]", nil
190 }
191
192 // Convert to [][]string for JSON serialization
193 tagArrays := make([][]string, len(tags))
194 for i, tag := range tags {
195 tagArrays[i] = tag.Values
196 }
197
198 data, err := json.Marshal(tagArrays)
199 if err != nil {
200 return "", err
201 }
202
203 return string(data), nil
204}
205
206// isDuplicateError checks if the error is a unique constraint violation.
207func isDuplicateError(err error) bool {
208 if err == nil {
209 return false
210 }
211 // SQLite error messages for UNIQUE constraint violations
212 msg := err.Error()
213 return msg == "UNIQUE constraint failed: events.id" ||
214 msg == "constraint failed: UNIQUE constraint failed: events.id" ||
215 msg == "constraint failed: UNIQUE constraint failed: events.id (1555)"
216}