aboutsummaryrefslogtreecommitdiffstats
path: root/relay/storage/events.go
diff options
context:
space:
mode:
Diffstat (limited to 'relay/storage/events.go')
-rw-r--r--relay/storage/events.go192
1 files changed, 192 insertions, 0 deletions
diff --git a/relay/storage/events.go b/relay/storage/events.go
new file mode 100644
index 0000000..cf10097
--- /dev/null
+++ b/relay/storage/events.go
@@ -0,0 +1,192 @@
1package storage
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "strings"
9
10 "axon"
11)
12
13// ErrDuplicate is returned by StoreEvent when the event ID already exists.
14var ErrDuplicate = errors.New("storage: duplicate event")
15
16// StoreEvent persists an event and its tags to the database in a single
17// transaction. envelopeBytes is the verbatim msgpack representation used for
18// zero-copy fanout.
19func (s *Storage) StoreEvent(ctx context.Context, event *axon.Event, envelopeBytes []byte) error {
20 tx, err := s.db.BeginTx(ctx, nil)
21 if err != nil {
22 return fmt.Errorf("storage: begin tx: %w", err)
23 }
24 defer tx.Rollback()
25
26 _, err = tx.ExecContext(ctx,
27 `INSERT INTO events (id, pubkey, created_at, kind, envelope_bytes) VALUES (?, ?, ?, ?, ?)`,
28 event.ID, event.PubKey, event.CreatedAt, event.Kind, envelopeBytes,
29 )
30 if err != nil {
31 if isDuplicateError(err) {
32 return ErrDuplicate
33 }
34 return fmt.Errorf("storage: insert event: %w", err)
35 }
36
37 for _, tag := range event.Tags {
38 if len(tag.Values) == 0 {
39 continue
40 }
41 _, err = tx.ExecContext(ctx,
42 `INSERT INTO tags (event_id, name, value) VALUES (?, ?, ?)`,
43 event.ID, tag.Name, tag.Values[0],
44 )
45 if err != nil {
46 return fmt.Errorf("storage: insert tag: %w", err)
47 }
48 }
49
50 return tx.Commit()
51}
52
53// ExistsByID returns true if an event with the given ID is already stored.
54func (s *Storage) ExistsByID(ctx context.Context, id []byte) (bool, error) {
55 var n int
56 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM events WHERE id = ?`, id).Scan(&n)
57 if err != nil && err != sql.ErrNoRows {
58 return false, fmt.Errorf("storage: exists: %w", err)
59 }
60 return n > 0, nil
61}
62
63// QueryEvents executes the given filters against the database using a UNION
64// query and returns matching event envelope bytes in descending created_at
65// order. The effective LIMIT is the minimum non-zero Limit across all filters.
66func (s *Storage) QueryEvents(ctx context.Context, filters []axon.Filter) ([][]byte, error) {
67 if len(filters) == 0 {
68 return nil, nil
69 }
70
71 var unions []string
72 var args []interface{}
73 var effectiveLimit int32
74
75 for _, f := range filters {
76 var filterArgs []interface{}
77 clause := buildWhereClause(f, &filterArgs)
78 sub := fmt.Sprintf(
79 "SELECT e.envelope_bytes, e.created_at FROM events e WHERE %s", clause)
80 unions = append(unions, sub)
81 args = append(args, filterArgs...)
82 if f.Limit > 0 && (effectiveLimit == 0 || f.Limit < effectiveLimit) {
83 effectiveLimit = f.Limit
84 }
85 }
86
87 query := strings.Join(unions, " UNION ") + " ORDER BY created_at DESC"
88 if effectiveLimit > 0 {
89 query += fmt.Sprintf(" LIMIT %d", effectiveLimit)
90 }
91
92 rows, err := s.db.QueryContext(ctx, query, args...)
93 if err != nil {
94 return nil, fmt.Errorf("storage: query: %w", err)
95 }
96 defer rows.Close()
97
98 var results [][]byte
99 for rows.Next() {
100 var envelope []byte
101 var createdAt int64
102 if err := rows.Scan(&envelope, &createdAt); err != nil {
103 return nil, fmt.Errorf("storage: scan: %w", err)
104 }
105 results = append(results, envelope)
106 }
107 return results, rows.Err()
108}
109
110// buildWhereClause builds the SQL WHERE clause for a single filter, appending
111// bind parameters to args.
112func buildWhereClause(f axon.Filter, args *[]interface{}) string {
113 var conditions []string
114
115 if len(f.IDs) > 0 {
116 conditions = append(conditions, buildBlobPrefixCondition("e.id", f.IDs, args))
117 }
118
119 if len(f.Authors) > 0 {
120 conditions = append(conditions, buildBlobPrefixCondition("e.pubkey", f.Authors, args))
121 }
122
123 if len(f.Kinds) > 0 {
124 placeholders := make([]string, len(f.Kinds))
125 for i, k := range f.Kinds {
126 placeholders[i] = "?"
127 *args = append(*args, k)
128 }
129 conditions = append(conditions, "e.kind IN ("+strings.Join(placeholders, ",")+")")
130 }
131
132 if f.Since != 0 {
133 conditions = append(conditions, "e.created_at >= ?")
134 *args = append(*args, f.Since)
135 }
136
137 if f.Until != 0 {
138 conditions = append(conditions, "e.created_at <= ?")
139 *args = append(*args, f.Until)
140 }
141
142 for _, tf := range f.Tags {
143 conditions = append(conditions, buildTagJoinCondition(tf, args))
144 }
145
146 if len(conditions) == 0 {
147 return "1=1"
148 }
149 return strings.Join(conditions, " AND ")
150}
151
152// buildBlobPrefixCondition builds an OR condition for prefix-matching a BLOB
153// column. Prefix slices of exactly 32 bytes use equality; shorter slices use
154// hex(column) LIKE 'HEX%'.
155func buildBlobPrefixCondition(column string, prefixes [][]byte, args *[]interface{}) string {
156 var orConds []string
157 for _, prefix := range prefixes {
158 if len(prefix) == 32 {
159 orConds = append(orConds, column+" = ?")
160 *args = append(*args, prefix)
161 } else {
162 hexPrefix := fmt.Sprintf("%X", prefix)
163 orConds = append(orConds, fmt.Sprintf("hex(%s) LIKE ?", column))
164 *args = append(*args, hexPrefix+"%")
165 }
166 }
167 if len(orConds) == 1 {
168 return orConds[0]
169 }
170 return "(" + strings.Join(orConds, " OR ") + ")"
171}
172
173// buildTagJoinCondition builds an EXISTS sub-select for a TagFilter.
174func buildTagJoinCondition(tf axon.TagFilter, args *[]interface{}) string {
175 if len(tf.Values) == 0 {
176 *args = append(*args, tf.Name)
177 return "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ?)"
178 }
179 var orConds []string
180 for _, v := range tf.Values {
181 orConds = append(orConds, "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ? AND t.value = ?)")
182 *args = append(*args, tf.Name, v)
183 }
184 if len(orConds) == 1 {
185 return orConds[0]
186 }
187 return "(" + strings.Join(orConds, " OR ") + ")"
188}
189
190func isDuplicateError(err error) bool {
191 return err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed")
192}