aboutsummaryrefslogtreecommitdiffstats
path: root/relay/storage
diff options
context:
space:
mode:
Diffstat (limited to 'relay/storage')
-rw-r--r--relay/storage/events.go192
-rw-r--r--relay/storage/storage.go86
2 files changed, 278 insertions, 0 deletions
diff --git a/relay/storage/events.go b/relay/storage/events.go
new file mode 100644
index 0000000..cf10097
--- /dev/null
+++ b/relay/storage/events.go
@@ -0,0 +1,192 @@
1package storage
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "strings"
9
10 "axon"
11)
12
13// ErrDuplicate is returned by StoreEvent when the event ID already exists.
14var ErrDuplicate = errors.New("storage: duplicate event")
15
16// StoreEvent persists an event and its tags to the database in a single
17// transaction. envelopeBytes is the verbatim msgpack representation used for
18// zero-copy fanout.
19func (s *Storage) StoreEvent(ctx context.Context, event *axon.Event, envelopeBytes []byte) error {
20 tx, err := s.db.BeginTx(ctx, nil)
21 if err != nil {
22 return fmt.Errorf("storage: begin tx: %w", err)
23 }
24 defer tx.Rollback()
25
26 _, err = tx.ExecContext(ctx,
27 `INSERT INTO events (id, pubkey, created_at, kind, envelope_bytes) VALUES (?, ?, ?, ?, ?)`,
28 event.ID, event.PubKey, event.CreatedAt, event.Kind, envelopeBytes,
29 )
30 if err != nil {
31 if isDuplicateError(err) {
32 return ErrDuplicate
33 }
34 return fmt.Errorf("storage: insert event: %w", err)
35 }
36
37 for _, tag := range event.Tags {
38 if len(tag.Values) == 0 {
39 continue
40 }
41 _, err = tx.ExecContext(ctx,
42 `INSERT INTO tags (event_id, name, value) VALUES (?, ?, ?)`,
43 event.ID, tag.Name, tag.Values[0],
44 )
45 if err != nil {
46 return fmt.Errorf("storage: insert tag: %w", err)
47 }
48 }
49
50 return tx.Commit()
51}
52
53// ExistsByID returns true if an event with the given ID is already stored.
54func (s *Storage) ExistsByID(ctx context.Context, id []byte) (bool, error) {
55 var n int
56 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM events WHERE id = ?`, id).Scan(&n)
57 if err != nil && err != sql.ErrNoRows {
58 return false, fmt.Errorf("storage: exists: %w", err)
59 }
60 return n > 0, nil
61}
62
63// QueryEvents executes the given filters against the database using a UNION
64// query and returns matching event envelope bytes in descending created_at
65// order. The effective LIMIT is the minimum non-zero Limit across all filters.
66func (s *Storage) QueryEvents(ctx context.Context, filters []axon.Filter) ([][]byte, error) {
67 if len(filters) == 0 {
68 return nil, nil
69 }
70
71 var unions []string
72 var args []interface{}
73 var effectiveLimit int32
74
75 for _, f := range filters {
76 var filterArgs []interface{}
77 clause := buildWhereClause(f, &filterArgs)
78 sub := fmt.Sprintf(
79 "SELECT e.envelope_bytes, e.created_at FROM events e WHERE %s", clause)
80 unions = append(unions, sub)
81 args = append(args, filterArgs...)
82 if f.Limit > 0 && (effectiveLimit == 0 || f.Limit < effectiveLimit) {
83 effectiveLimit = f.Limit
84 }
85 }
86
87 query := strings.Join(unions, " UNION ") + " ORDER BY created_at DESC"
88 if effectiveLimit > 0 {
89 query += fmt.Sprintf(" LIMIT %d", effectiveLimit)
90 }
91
92 rows, err := s.db.QueryContext(ctx, query, args...)
93 if err != nil {
94 return nil, fmt.Errorf("storage: query: %w", err)
95 }
96 defer rows.Close()
97
98 var results [][]byte
99 for rows.Next() {
100 var envelope []byte
101 var createdAt int64
102 if err := rows.Scan(&envelope, &createdAt); err != nil {
103 return nil, fmt.Errorf("storage: scan: %w", err)
104 }
105 results = append(results, envelope)
106 }
107 return results, rows.Err()
108}
109
110// buildWhereClause builds the SQL WHERE clause for a single filter, appending
111// bind parameters to args.
112func buildWhereClause(f axon.Filter, args *[]interface{}) string {
113 var conditions []string
114
115 if len(f.IDs) > 0 {
116 conditions = append(conditions, buildBlobPrefixCondition("e.id", f.IDs, args))
117 }
118
119 if len(f.Authors) > 0 {
120 conditions = append(conditions, buildBlobPrefixCondition("e.pubkey", f.Authors, args))
121 }
122
123 if len(f.Kinds) > 0 {
124 placeholders := make([]string, len(f.Kinds))
125 for i, k := range f.Kinds {
126 placeholders[i] = "?"
127 *args = append(*args, k)
128 }
129 conditions = append(conditions, "e.kind IN ("+strings.Join(placeholders, ",")+")")
130 }
131
132 if f.Since != 0 {
133 conditions = append(conditions, "e.created_at >= ?")
134 *args = append(*args, f.Since)
135 }
136
137 if f.Until != 0 {
138 conditions = append(conditions, "e.created_at <= ?")
139 *args = append(*args, f.Until)
140 }
141
142 for _, tf := range f.Tags {
143 conditions = append(conditions, buildTagJoinCondition(tf, args))
144 }
145
146 if len(conditions) == 0 {
147 return "1=1"
148 }
149 return strings.Join(conditions, " AND ")
150}
151
152// buildBlobPrefixCondition builds an OR condition for prefix-matching a BLOB
153// column. Prefix slices of exactly 32 bytes use equality; shorter slices use
154// hex(column) LIKE 'HEX%'.
155func buildBlobPrefixCondition(column string, prefixes [][]byte, args *[]interface{}) string {
156 var orConds []string
157 for _, prefix := range prefixes {
158 if len(prefix) == 32 {
159 orConds = append(orConds, column+" = ?")
160 *args = append(*args, prefix)
161 } else {
162 hexPrefix := fmt.Sprintf("%X", prefix)
163 orConds = append(orConds, fmt.Sprintf("hex(%s) LIKE ?", column))
164 *args = append(*args, hexPrefix+"%")
165 }
166 }
167 if len(orConds) == 1 {
168 return orConds[0]
169 }
170 return "(" + strings.Join(orConds, " OR ") + ")"
171}
172
173// buildTagJoinCondition builds an EXISTS sub-select for a TagFilter.
174func buildTagJoinCondition(tf axon.TagFilter, args *[]interface{}) string {
175 if len(tf.Values) == 0 {
176 *args = append(*args, tf.Name)
177 return "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ?)"
178 }
179 var orConds []string
180 for _, v := range tf.Values {
181 orConds = append(orConds, "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ? AND t.value = ?)")
182 *args = append(*args, tf.Name, v)
183 }
184 if len(orConds) == 1 {
185 return orConds[0]
186 }
187 return "(" + strings.Join(orConds, " OR ") + ")"
188}
189
190func isDuplicateError(err error) bool {
191 return err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed")
192}
diff --git a/relay/storage/storage.go b/relay/storage/storage.go
new file mode 100644
index 0000000..95b278d
--- /dev/null
+++ b/relay/storage/storage.go
@@ -0,0 +1,86 @@
1// Package storage provides SQLite-backed event persistence for the Axon relay.
2package storage
3
4import (
5 "context"
6 "database/sql"
7 "fmt"
8
9 _ "modernc.org/sqlite"
10)
11
12// Storage wraps a SQLite database for Axon event persistence.
13type Storage struct {
14 db *sql.DB
15}
16
17// New opens (or creates) the SQLite database at dbPath, applies WAL pragmas,
18// and initialises the schema. Call Close when done.
19func New(dbPath string) (*Storage, error) {
20 db, err := sql.Open("sqlite", dbPath)
21 if err != nil {
22 return nil, fmt.Errorf("storage: open db: %w", err)
23 }
24
25 // SQLite works best with a single writer.
26 db.SetMaxOpenConns(1)
27 db.SetMaxIdleConns(1)
28 db.SetConnMaxLifetime(0)
29
30 pragmas := []string{
31 "PRAGMA journal_mode=WAL",
32 "PRAGMA synchronous=NORMAL",
33 "PRAGMA cache_size=-40960", // ~40 MB (negative = kibibytes)
34 "PRAGMA temp_store=MEMORY",
35 "PRAGMA mmap_size=268435456", // 256 MB
36 "PRAGMA page_size=4096",
37 "PRAGMA foreign_keys=ON",
38 "PRAGMA busy_timeout=5000",
39 }
40
41 for _, p := range pragmas {
42 if _, err := db.Exec(p); err != nil {
43 db.Close()
44 return nil, fmt.Errorf("storage: set pragma %q: %w", p, err)
45 }
46 }
47
48 s := &Storage{db: db}
49 if err := s.initSchema(context.Background()); err != nil {
50 db.Close()
51 return nil, fmt.Errorf("storage: init schema: %w", err)
52 }
53 return s, nil
54}
55
56// Close closes the underlying database connection.
57func (s *Storage) Close() error {
58 return s.db.Close()
59}
60
61const schema = `
62CREATE TABLE IF NOT EXISTS events (
63 id BLOB PRIMARY KEY,
64 pubkey BLOB NOT NULL,
65 created_at INTEGER NOT NULL,
66 kind INTEGER NOT NULL,
67 envelope_bytes BLOB NOT NULL
68) STRICT;
69
70CREATE TABLE IF NOT EXISTS tags (
71 event_id BLOB NOT NULL REFERENCES events(id),
72 name TEXT NOT NULL,
73 value TEXT NOT NULL
74) STRICT;
75
76CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey, created_at DESC);
77CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind, created_at DESC);
78CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at DESC);
79CREATE INDEX IF NOT EXISTS idx_tags_name_value ON tags(name, value);
80CREATE INDEX IF NOT EXISTS idx_tags_event_id ON tags(event_id);
81`
82
83func (s *Storage) initSchema(ctx context.Context) error {
84 _, err := s.db.ExecContext(ctx, schema)
85 return err
86}