summaryrefslogtreecommitdiffstats
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/storage/query.go185
-rw-r--r--internal/storage/query_test.go437
2 files changed, 622 insertions, 0 deletions
diff --git a/internal/storage/query.go b/internal/storage/query.go
new file mode 100644
index 0000000..29a2a4c
--- /dev/null
+++ b/internal/storage/query.go
@@ -0,0 +1,185 @@
1package storage
2
3import (
4 "context"
5 "fmt"
6 "strings"
7
8 "google.golang.org/protobuf/proto"
9
10 pb "northwest.io/nostr-grpc/api/nostr/v1"
11)
12
13type QueryOptions struct {
14 IncludeCanonical bool
15 Limit int32
16}
17
18func (s *Storage) QueryEvents(ctx context.Context, filters []*pb.Filter, opts *QueryOptions) ([]*pb.Event, error) {
19 if len(filters) == 0 {
20 return nil, nil
21 }
22
23 if opts == nil {
24 opts = &QueryOptions{Limit: 100}
25 }
26
27 query, args := buildQuery(filters, opts)
28
29 rows, err := s.db.QueryContext(ctx, query, args...)
30 if err != nil {
31 return nil, fmt.Errorf("query failed: %w", err)
32 }
33 defer rows.Close()
34
35 var events []*pb.Event
36 for rows.Next() {
37 var eventBytes []byte
38 var compressedJSON []byte
39 var createdAt int64
40
41 if opts.IncludeCanonical {
42 if err := rows.Scan(&eventBytes, &compressedJSON, &createdAt); err != nil {
43 return nil, fmt.Errorf("scan failed: %w", err)
44 }
45 } else {
46 if err := rows.Scan(&eventBytes, &createdAt); err != nil {
47 return nil, fmt.Errorf("scan failed: %w", err)
48 }
49 }
50
51 event := &pb.Event{}
52 if err := proto.Unmarshal(eventBytes, event); err != nil {
53 return nil, fmt.Errorf("unmarshal failed: %w", err)
54 }
55
56 if opts.IncludeCanonical && compressedJSON != nil {
57 canonicalJSON, err := decompressJSON(compressedJSON)
58 if err != nil {
59 return nil, fmt.Errorf("decompress failed: %w", err)
60 }
61 event.CanonicalJson = canonicalJSON
62 }
63
64 events = append(events, event)
65 }
66
67 if err := rows.Err(); err != nil {
68 return nil, fmt.Errorf("rows iteration failed: %w", err)
69 }
70
71 return events, nil
72}
73
74func buildQuery(filters []*pb.Filter, opts *QueryOptions) (string, []interface{}) {
75 selectFields := "event_data, created_at"
76 if opts.IncludeCanonical {
77 selectFields = "event_data, canonical_json, created_at"
78 }
79
80 var unions []string
81 var args []interface{}
82
83 for _, filter := range filters {
84 clause, filterArgs := buildWhereClause(filter)
85 subQuery := fmt.Sprintf("SELECT %s FROM events WHERE deleted = 0 AND (%s)", selectFields, clause)
86 unions = append(unions, subQuery)
87 args = append(args, filterArgs...)
88 }
89
90 query := strings.Join(unions, " UNION ")
91 query += " ORDER BY created_at DESC"
92
93 if opts.Limit > 0 {
94 query += fmt.Sprintf(" LIMIT %d", opts.Limit)
95 }
96
97 return query, args
98}
99
100func buildWhereClause(filter *pb.Filter) (string, []interface{}) {
101 var conditions []string
102 var args []interface{}
103
104 if len(filter.Ids) > 0 {
105 conditions = append(conditions, buildPrefixCondition("id", filter.Ids, &args))
106 }
107
108 if len(filter.Authors) > 0 {
109 conditions = append(conditions, buildPrefixCondition("pubkey", filter.Authors, &args))
110 }
111
112 if len(filter.Kinds) > 0 {
113 placeholders := make([]string, len(filter.Kinds))
114 for i, kind := range filter.Kinds {
115 placeholders[i] = "?"
116 args = append(args, kind)
117 }
118 conditions = append(conditions, fmt.Sprintf("kind IN (%s)", strings.Join(placeholders, ",")))
119 }
120
121 if filter.Since != nil && *filter.Since > 0 {
122 conditions = append(conditions, "created_at >= ?")
123 args = append(args, *filter.Since)
124 }
125
126 if filter.Until != nil && *filter.Until > 0 {
127 conditions = append(conditions, "created_at <= ?")
128 args = append(args, *filter.Until)
129 }
130
131 if len(filter.ETags) > 0 {
132 conditions = append(conditions, buildTagCondition("e", filter.ETags, &args))
133 }
134
135 if len(filter.PTags) > 0 {
136 conditions = append(conditions, buildTagCondition("p", filter.PTags, &args))
137 }
138
139 for tagName, tagFilter := range filter.TagFilters {
140 if len(tagFilter.Values) > 0 {
141 conditions = append(conditions, buildTagCondition(tagName, tagFilter.Values, &args))
142 }
143 }
144
145 if len(conditions) == 0 {
146 return "1=1", args
147 }
148
149 return strings.Join(conditions, " AND "), args
150}
151
152func buildPrefixCondition(column string, values []string, args *[]interface{}) string {
153 var orConditions []string
154
155 for _, val := range values {
156 if len(val) == 64 {
157 orConditions = append(orConditions, column+" = ?")
158 *args = append(*args, val)
159 } else {
160 orConditions = append(orConditions, column+" LIKE ?")
161 *args = append(*args, val+"%")
162 }
163 }
164
165 if len(orConditions) == 1 {
166 return orConditions[0]
167 }
168
169 return "(" + strings.Join(orConditions, " OR ") + ")"
170}
171
172func buildTagCondition(tagName string, values []string, args *[]interface{}) string {
173 var orConditions []string
174
175 for _, val := range values {
176 orConditions = append(orConditions, "EXISTS (SELECT 1 FROM json_each(tags) WHERE json_extract(value, '$[0]') = ? AND json_extract(value, '$[1]') = ?)")
177 *args = append(*args, tagName, val)
178 }
179
180 if len(orConditions) == 1 {
181 return orConditions[0]
182 }
183
184 return "(" + strings.Join(orConditions, " OR ") + ")"
185}
diff --git a/internal/storage/query_test.go b/internal/storage/query_test.go
new file mode 100644
index 0000000..b1f2fc7
--- /dev/null
+++ b/internal/storage/query_test.go
@@ -0,0 +1,437 @@
1package storage
2
3import (
4 "context"
5 "testing"
6
7 pb "northwest.io/nostr-grpc/api/nostr/v1"
8)
9
10func setupTestEvents(t *testing.T, store *Storage) {
11 ctx := context.Background()
12
13 events := []*EventData{
14 {
15 Event: &pb.Event{
16 Id: "aaaa1111",
17 Pubkey: "pubkey1",
18 CreatedAt: 1000,
19 Kind: 1,
20 Tags: []*pb.Tag{{Values: []string{"e", "event123"}}},
21 Content: "test event 1",
22 Sig: "sig1",
23 },
24 CanonicalJSON: []byte(`[0,"pubkey1",1000,1,[["e","event123"]],"test event 1"]`),
25 },
26 {
27 Event: &pb.Event{
28 Id: "bbbb2222",
29 Pubkey: "pubkey2",
30 CreatedAt: 2000,
31 Kind: 1,
32 Tags: []*pb.Tag{{Values: []string{"p", "pubkey1"}}},
33 Content: "test event 2",
34 Sig: "sig2",
35 },
36 CanonicalJSON: []byte(`[0,"pubkey2",2000,1,[["p","pubkey1"]],"test event 2"]`),
37 },
38 {
39 Event: &pb.Event{
40 Id: "cccc3333",
41 Pubkey: "pubkey1",
42 CreatedAt: 3000,
43 Kind: 3,
44 Tags: []*pb.Tag{},
45 Content: "test event 3",
46 Sig: "sig3",
47 },
48 CanonicalJSON: []byte(`[0,"pubkey1",3000,3,[],"test event 3"]`),
49 },
50 {
51 Event: &pb.Event{
52 Id: "dddd4444",
53 Pubkey: "pubkey3",
54 CreatedAt: 4000,
55 Kind: 1,
56 Tags: []*pb.Tag{
57 {Values: []string{"e", "event123"}},
58 {Values: []string{"p", "pubkey2"}},
59 },
60 Content: "test event 4",
61 Sig: "sig4",
62 },
63 CanonicalJSON: []byte(`[0,"pubkey3",4000,1,[["e","event123"],["p","pubkey2"]],"test event 4"]`),
64 },
65 }
66
67 for _, data := range events {
68 if err := store.StoreEvent(ctx, data); err != nil {
69 t.Fatalf("failed to store event %s: %v", data.Event.Id, err)
70 }
71 }
72}
73
74func TestQueryEventsByID(t *testing.T) {
75 store, err := New(":memory:")
76 if err != nil {
77 t.Fatalf("failed to create storage: %v", err)
78 }
79 defer store.Close()
80
81 setupTestEvents(t, store)
82 ctx := context.Background()
83
84 filters := []*pb.Filter{
85 {Ids: []string{"aaaa1111"}},
86 }
87
88 events, err := store.QueryEvents(ctx, filters, nil)
89 if err != nil {
90 t.Fatalf("query failed: %v", err)
91 }
92
93 if len(events) != 1 {
94 t.Fatalf("expected 1 event, got %d", len(events))
95 }
96
97 if events[0].Id != "aaaa1111" {
98 t.Errorf("expected ID aaaa1111, got %s", events[0].Id)
99 }
100}
101
102func TestQueryEventsByIDPrefix(t *testing.T) {
103 store, err := New(":memory:")
104 if err != nil {
105 t.Fatalf("failed to create storage: %v", err)
106 }
107 defer store.Close()
108
109 setupTestEvents(t, store)
110 ctx := context.Background()
111
112 filters := []*pb.Filter{
113 {Ids: []string{"aaaa"}},
114 }
115
116 events, err := store.QueryEvents(ctx, filters, nil)
117 if err != nil {
118 t.Fatalf("query failed: %v", err)
119 }
120
121 if len(events) != 1 {
122 t.Fatalf("expected 1 event, got %d", len(events))
123 }
124
125 if events[0].Id != "aaaa1111" {
126 t.Errorf("expected ID aaaa1111, got %s", events[0].Id)
127 }
128}
129
130func TestQueryEventsByAuthor(t *testing.T) {
131 store, err := New(":memory:")
132 if err != nil {
133 t.Fatalf("failed to create storage: %v", err)
134 }
135 defer store.Close()
136
137 setupTestEvents(t, store)
138 ctx := context.Background()
139
140 filters := []*pb.Filter{
141 {Authors: []string{"pubkey1"}},
142 }
143
144 events, err := store.QueryEvents(ctx, filters, nil)
145 if err != nil {
146 t.Fatalf("query failed: %v", err)
147 }
148
149 if len(events) != 2 {
150 t.Fatalf("expected 2 events, got %d", len(events))
151 }
152
153 for _, event := range events {
154 if event.Pubkey != "pubkey1" {
155 t.Errorf("expected pubkey pubkey1, got %s", event.Pubkey)
156 }
157 }
158}
159
160func TestQueryEventsByKind(t *testing.T) {
161 store, err := New(":memory:")
162 if err != nil {
163 t.Fatalf("failed to create storage: %v", err)
164 }
165 defer store.Close()
166
167 setupTestEvents(t, store)
168 ctx := context.Background()
169
170 filters := []*pb.Filter{
171 {Kinds: []int32{1}},
172 }
173
174 events, err := store.QueryEvents(ctx, filters, nil)
175 if err != nil {
176 t.Fatalf("query failed: %v", err)
177 }
178
179 if len(events) != 3 {
180 t.Fatalf("expected 3 events, got %d", len(events))
181 }
182
183 for _, event := range events {
184 if event.Kind != 1 {
185 t.Errorf("expected kind 1, got %d", event.Kind)
186 }
187 }
188}
189
190func TestQueryEventsByETag(t *testing.T) {
191 store, err := New(":memory:")
192 if err != nil {
193 t.Fatalf("failed to create storage: %v", err)
194 }
195 defer store.Close()
196
197 setupTestEvents(t, store)
198 ctx := context.Background()
199
200 filters := []*pb.Filter{
201 {ETags: []string{"event123"}},
202 }
203
204 events, err := store.QueryEvents(ctx, filters, nil)
205 if err != nil {
206 t.Fatalf("query failed: %v", err)
207 }
208
209 if len(events) != 2 {
210 t.Fatalf("expected 2 events, got %d", len(events))
211 }
212}
213
214func TestQueryEventsByPTag(t *testing.T) {
215 store, err := New(":memory:")
216 if err != nil {
217 t.Fatalf("failed to create storage: %v", err)
218 }
219 defer store.Close()
220
221 setupTestEvents(t, store)
222 ctx := context.Background()
223
224 filters := []*pb.Filter{
225 {PTags: []string{"pubkey1"}},
226 }
227
228 events, err := store.QueryEvents(ctx, filters, nil)
229 if err != nil {
230 t.Fatalf("query failed: %v", err)
231 }
232
233 if len(events) != 1 {
234 t.Fatalf("expected 1 event, got %d", len(events))
235 }
236
237 if events[0].Id != "bbbb2222" {
238 t.Errorf("expected ID bbbb2222, got %s", events[0].Id)
239 }
240}
241
242func TestQueryEventsSince(t *testing.T) {
243 store, err := New(":memory:")
244 if err != nil {
245 t.Fatalf("failed to create storage: %v", err)
246 }
247 defer store.Close()
248
249 setupTestEvents(t, store)
250 ctx := context.Background()
251
252 since := int64(2500)
253 filters := []*pb.Filter{
254 {Since: &since},
255 }
256
257 events, err := store.QueryEvents(ctx, filters, nil)
258 if err != nil {
259 t.Fatalf("query failed: %v", err)
260 }
261
262 if len(events) != 2 {
263 t.Fatalf("expected 2 events, got %d", len(events))
264 }
265
266 for _, event := range events {
267 if event.CreatedAt < since {
268 t.Errorf("event %s has timestamp %d, expected >= %d", event.Id, event.CreatedAt, since)
269 }
270 }
271}
272
273func TestQueryEventsUntil(t *testing.T) {
274 store, err := New(":memory:")
275 if err != nil {
276 t.Fatalf("failed to create storage: %v", err)
277 }
278 defer store.Close()
279
280 setupTestEvents(t, store)
281 ctx := context.Background()
282
283 until := int64(2500)
284 filters := []*pb.Filter{
285 {Until: &until},
286 }
287
288 events, err := store.QueryEvents(ctx, filters, nil)
289 if err != nil {
290 t.Fatalf("query failed: %v", err)
291 }
292
293 if len(events) != 2 {
294 t.Fatalf("expected 2 events, got %d", len(events))
295 }
296
297 for _, event := range events {
298 if event.CreatedAt > until {
299 t.Errorf("event %s has timestamp %d, expected <= %d", event.Id, event.CreatedAt, until)
300 }
301 }
302}
303
304func TestQueryEventsWithLimit(t *testing.T) {
305 store, err := New(":memory:")
306 if err != nil {
307 t.Fatalf("failed to create storage: %v", err)
308 }
309 defer store.Close()
310
311 setupTestEvents(t, store)
312 ctx := context.Background()
313
314 filters := []*pb.Filter{{}}
315 opts := &QueryOptions{Limit: 2}
316
317 events, err := store.QueryEvents(ctx, filters, opts)
318 if err != nil {
319 t.Fatalf("query failed: %v", err)
320 }
321
322 if len(events) != 2 {
323 t.Fatalf("expected 2 events due to limit, got %d", len(events))
324 }
325}
326
327func TestQueryEventsWithCanonical(t *testing.T) {
328 store, err := New(":memory:")
329 if err != nil {
330 t.Fatalf("failed to create storage: %v", err)
331 }
332 defer store.Close()
333
334 setupTestEvents(t, store)
335 ctx := context.Background()
336
337 filters := []*pb.Filter{
338 {Ids: []string{"aaaa1111"}},
339 }
340 opts := &QueryOptions{IncludeCanonical: true}
341
342 events, err := store.QueryEvents(ctx, filters, opts)
343 if err != nil {
344 t.Fatalf("query failed: %v", err)
345 }
346
347 if len(events) != 1 {
348 t.Fatalf("expected 1 event, got %d", len(events))
349 }
350
351 if len(events[0].CanonicalJson) == 0 {
352 t.Error("expected canonical JSON to be populated")
353 }
354}
355
356func TestQueryMultipleFilters(t *testing.T) {
357 store, err := New(":memory:")
358 if err != nil {
359 t.Fatalf("failed to create storage: %v", err)
360 }
361 defer store.Close()
362
363 setupTestEvents(t, store)
364 ctx := context.Background()
365
366 filters := []*pb.Filter{
367 {Ids: []string{"aaaa1111"}},
368 {Kinds: []int32{3}},
369 }
370
371 events, err := store.QueryEvents(ctx, filters, nil)
372 if err != nil {
373 t.Fatalf("query failed: %v", err)
374 }
375
376 if len(events) != 2 {
377 t.Fatalf("expected 2 events (UNION), got %d", len(events))
378 }
379}
380
381func TestQueryEventsOrdering(t *testing.T) {
382 store, err := New(":memory:")
383 if err != nil {
384 t.Fatalf("failed to create storage: %v", err)
385 }
386 defer store.Close()
387
388 setupTestEvents(t, store)
389 ctx := context.Background()
390
391 filters := []*pb.Filter{{}}
392
393 events, err := store.QueryEvents(ctx, filters, nil)
394 if err != nil {
395 t.Fatalf("query failed: %v", err)
396 }
397
398 for i := 1; i < len(events); i++ {
399 if events[i].CreatedAt > events[i-1].CreatedAt {
400 t.Errorf("events not ordered by created_at DESC: %d > %d",
401 events[i].CreatedAt, events[i-1].CreatedAt)
402 }
403 }
404}
405
406func TestQueryEventsComplex(t *testing.T) {
407 store, err := New(":memory:")
408 if err != nil {
409 t.Fatalf("failed to create storage: %v", err)
410 }
411 defer store.Close()
412
413 setupTestEvents(t, store)
414 ctx := context.Background()
415
416 since := int64(1500)
417 filters := []*pb.Filter{
418 {
419 Authors: []string{"pubkey1"},
420 Kinds: []int32{1, 3},
421 Since: &since,
422 },
423 }
424
425 events, err := store.QueryEvents(ctx, filters, nil)
426 if err != nil {
427 t.Fatalf("query failed: %v", err)
428 }
429
430 if len(events) != 1 {
431 t.Fatalf("expected 1 event, got %d", len(events))
432 }
433
434 if events[0].Id != "cccc3333" {
435 t.Errorf("expected event cccc3333, got %s", events[0].Id)
436 }
437}