From 61a85baf87d89fcc09f9469a113a2ddc982b0a24 Mon Sep 17 00:00:00 2001 From: bndw Date: Mon, 9 Mar 2026 08:01:02 -0700 Subject: feat: phase 2 relay implementation Implement the Axon relay as relay/ (module axon/relay). Includes: - WebSocket framing (RFC 6455, no external deps) in relay/websocket/ - Per-connection auth: challenge/response with ed25519 + allowlist check - Ingest pipeline: sig verify, dedup, ephemeral fanout, SQLite persistence - Subscription manager with prefix-matching filter fanout in relay/subscription/ - SQLite storage with WAL/cache config and UNION query builder in relay/storage/ - Graceful shutdown on SIGINT/SIGTERM - Filter/TagFilter types added to axon core package (required by relay) --- relay/storage/storage.go | 86 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 relay/storage/storage.go (limited to 'relay/storage/storage.go') diff --git a/relay/storage/storage.go b/relay/storage/storage.go new file mode 100644 index 0000000..95b278d --- /dev/null +++ b/relay/storage/storage.go @@ -0,0 +1,86 @@ +// Package storage provides SQLite-backed event persistence for the Axon relay. +package storage + +import ( + "context" + "database/sql" + "fmt" + + _ "modernc.org/sqlite" +) + +// Storage wraps a SQLite database for Axon event persistence. +type Storage struct { + db *sql.DB +} + +// New opens (or creates) the SQLite database at dbPath, applies WAL pragmas, +// and initialises the schema. Call Close when done. +func New(dbPath string) (*Storage, error) { + db, err := sql.Open("sqlite", dbPath) + if err != nil { + return nil, fmt.Errorf("storage: open db: %w", err) + } + + // SQLite works best with a single writer. + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + db.SetConnMaxLifetime(0) + + pragmas := []string{ + "PRAGMA journal_mode=WAL", + "PRAGMA synchronous=NORMAL", + "PRAGMA cache_size=-40960", // ~40 MB (negative = kibibytes) + "PRAGMA temp_store=MEMORY", + "PRAGMA mmap_size=268435456", // 256 MB + "PRAGMA page_size=4096", + "PRAGMA foreign_keys=ON", + "PRAGMA busy_timeout=5000", + } + + for _, p := range pragmas { + if _, err := db.Exec(p); err != nil { + db.Close() + return nil, fmt.Errorf("storage: set pragma %q: %w", p, err) + } + } + + s := &Storage{db: db} + if err := s.initSchema(context.Background()); err != nil { + db.Close() + return nil, fmt.Errorf("storage: init schema: %w", err) + } + return s, nil +} + +// Close closes the underlying database connection. +func (s *Storage) Close() error { + return s.db.Close() +} + +const schema = ` +CREATE TABLE IF NOT EXISTS events ( + id BLOB PRIMARY KEY, + pubkey BLOB NOT NULL, + created_at INTEGER NOT NULL, + kind INTEGER NOT NULL, + envelope_bytes BLOB NOT NULL +) STRICT; + +CREATE TABLE IF NOT EXISTS tags ( + event_id BLOB NOT NULL REFERENCES events(id), + name TEXT NOT NULL, + value TEXT NOT NULL +) STRICT; + +CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at DESC); +CREATE INDEX IF NOT EXISTS idx_tags_name_value ON tags(name, value); +CREATE INDEX IF NOT EXISTS idx_tags_event_id ON tags(event_id); +` + +func (s *Storage) initSchema(ctx context.Context) error { + _, err := s.db.ExecContext(ctx, schema) + return err +} -- cgit v1.2.3