aboutsummaryrefslogtreecommitdiffstats
path: root/relay/storage/storage.go
diff options
context:
space:
mode:
authorbndw <ben@bdw.to>2026-03-09 08:01:02 -0700
committerbndw <ben@bdw.to>2026-03-09 08:01:02 -0700
commit61a85baf87d89fcc09f9469a113a2ddc982b0a24 (patch)
treed8359ce5cbcbb9402ba92c617c4ebd702adf33e9 /relay/storage/storage.go
parentce684848e25fed3aabdde4ffba6d2d8c40afa030 (diff)
feat: phase 2 relay implementation
Implement the Axon relay as relay/ (module axon/relay). Includes: - WebSocket framing (RFC 6455, no external deps) in relay/websocket/ - Per-connection auth: challenge/response with ed25519 + allowlist check - Ingest pipeline: sig verify, dedup, ephemeral fanout, SQLite persistence - Subscription manager with prefix-matching filter fanout in relay/subscription/ - SQLite storage with WAL/cache config and UNION query builder in relay/storage/ - Graceful shutdown on SIGINT/SIGTERM - Filter/TagFilter types added to axon core package (required by relay)
Diffstat (limited to 'relay/storage/storage.go')
-rw-r--r--relay/storage/storage.go86
1 files changed, 86 insertions, 0 deletions
diff --git a/relay/storage/storage.go b/relay/storage/storage.go
new file mode 100644
index 0000000..95b278d
--- /dev/null
+++ b/relay/storage/storage.go
@@ -0,0 +1,86 @@
1// Package storage provides SQLite-backed event persistence for the Axon relay.
2package storage
3
4import (
5 "context"
6 "database/sql"
7 "fmt"
8
9 _ "modernc.org/sqlite"
10)
11
12// Storage wraps a SQLite database for Axon event persistence.
13type Storage struct {
14 db *sql.DB
15}
16
17// New opens (or creates) the SQLite database at dbPath, applies WAL pragmas,
18// and initialises the schema. Call Close when done.
19func New(dbPath string) (*Storage, error) {
20 db, err := sql.Open("sqlite", dbPath)
21 if err != nil {
22 return nil, fmt.Errorf("storage: open db: %w", err)
23 }
24
25 // SQLite works best with a single writer.
26 db.SetMaxOpenConns(1)
27 db.SetMaxIdleConns(1)
28 db.SetConnMaxLifetime(0)
29
30 pragmas := []string{
31 "PRAGMA journal_mode=WAL",
32 "PRAGMA synchronous=NORMAL",
33 "PRAGMA cache_size=-40960", // ~40 MB (negative = kibibytes)
34 "PRAGMA temp_store=MEMORY",
35 "PRAGMA mmap_size=268435456", // 256 MB
36 "PRAGMA page_size=4096",
37 "PRAGMA foreign_keys=ON",
38 "PRAGMA busy_timeout=5000",
39 }
40
41 for _, p := range pragmas {
42 if _, err := db.Exec(p); err != nil {
43 db.Close()
44 return nil, fmt.Errorf("storage: set pragma %q: %w", p, err)
45 }
46 }
47
48 s := &Storage{db: db}
49 if err := s.initSchema(context.Background()); err != nil {
50 db.Close()
51 return nil, fmt.Errorf("storage: init schema: %w", err)
52 }
53 return s, nil
54}
55
56// Close closes the underlying database connection.
57func (s *Storage) Close() error {
58 return s.db.Close()
59}
60
61const schema = `
62CREATE TABLE IF NOT EXISTS events (
63 id BLOB PRIMARY KEY,
64 pubkey BLOB NOT NULL,
65 created_at INTEGER NOT NULL,
66 kind INTEGER NOT NULL,
67 envelope_bytes BLOB NOT NULL
68) STRICT;
69
70CREATE TABLE IF NOT EXISTS tags (
71 event_id BLOB NOT NULL REFERENCES events(id),
72 name TEXT NOT NULL,
73 value TEXT NOT NULL
74) STRICT;
75
76CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey, created_at DESC);
77CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind, created_at DESC);
78CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at DESC);
79CREATE INDEX IF NOT EXISTS idx_tags_name_value ON tags(name, value);
80CREATE INDEX IF NOT EXISTS idx_tags_event_id ON tags(event_id);
81`
82
83func (s *Storage) initSchema(ctx context.Context) error {
84 _, err := s.db.ExecContext(ctx, schema)
85 return err
86}