From 61a85baf87d89fcc09f9469a113a2ddc982b0a24 Mon Sep 17 00:00:00 2001 From: bndw Date: Mon, 9 Mar 2026 08:01:02 -0700 Subject: feat: phase 2 relay implementation Implement the Axon relay as relay/ (module axon/relay). Includes: - WebSocket framing (RFC 6455, no external deps) in relay/websocket/ - Per-connection auth: challenge/response with ed25519 + allowlist check - Ingest pipeline: sig verify, dedup, ephemeral fanout, SQLite persistence - Subscription manager with prefix-matching filter fanout in relay/subscription/ - SQLite storage with WAL/cache config and UNION query builder in relay/storage/ - Graceful shutdown on SIGINT/SIGTERM - Filter/TagFilter types added to axon core package (required by relay) --- relay/storage/events.go | 192 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 relay/storage/events.go (limited to 'relay/storage/events.go') diff --git a/relay/storage/events.go b/relay/storage/events.go new file mode 100644 index 0000000..cf10097 --- /dev/null +++ b/relay/storage/events.go @@ -0,0 +1,192 @@ +package storage + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + + "axon" +) + +// ErrDuplicate is returned by StoreEvent when the event ID already exists. +var ErrDuplicate = errors.New("storage: duplicate event") + +// StoreEvent persists an event and its tags to the database in a single +// transaction. envelopeBytes is the verbatim msgpack representation used for +// zero-copy fanout. +func (s *Storage) StoreEvent(ctx context.Context, event *axon.Event, envelopeBytes []byte) error { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("storage: begin tx: %w", err) + } + defer tx.Rollback() + + _, err = tx.ExecContext(ctx, + `INSERT INTO events (id, pubkey, created_at, kind, envelope_bytes) VALUES (?, ?, ?, ?, ?)`, + event.ID, event.PubKey, event.CreatedAt, event.Kind, envelopeBytes, + ) + if err != nil { + if isDuplicateError(err) { + return ErrDuplicate + } + return fmt.Errorf("storage: insert event: %w", err) + } + + for _, tag := range event.Tags { + if len(tag.Values) == 0 { + continue + } + _, err = tx.ExecContext(ctx, + `INSERT INTO tags (event_id, name, value) VALUES (?, ?, ?)`, + event.ID, tag.Name, tag.Values[0], + ) + if err != nil { + return fmt.Errorf("storage: insert tag: %w", err) + } + } + + return tx.Commit() +} + +// ExistsByID returns true if an event with the given ID is already stored. +func (s *Storage) ExistsByID(ctx context.Context, id []byte) (bool, error) { + var n int + err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM events WHERE id = ?`, id).Scan(&n) + if err != nil && err != sql.ErrNoRows { + return false, fmt.Errorf("storage: exists: %w", err) + } + return n > 0, nil +} + +// QueryEvents executes the given filters against the database using a UNION +// query and returns matching event envelope bytes in descending created_at +// order. The effective LIMIT is the minimum non-zero Limit across all filters. +func (s *Storage) QueryEvents(ctx context.Context, filters []axon.Filter) ([][]byte, error) { + if len(filters) == 0 { + return nil, nil + } + + var unions []string + var args []interface{} + var effectiveLimit int32 + + for _, f := range filters { + var filterArgs []interface{} + clause := buildWhereClause(f, &filterArgs) + sub := fmt.Sprintf( + "SELECT e.envelope_bytes, e.created_at FROM events e WHERE %s", clause) + unions = append(unions, sub) + args = append(args, filterArgs...) + if f.Limit > 0 && (effectiveLimit == 0 || f.Limit < effectiveLimit) { + effectiveLimit = f.Limit + } + } + + query := strings.Join(unions, " UNION ") + " ORDER BY created_at DESC" + if effectiveLimit > 0 { + query += fmt.Sprintf(" LIMIT %d", effectiveLimit) + } + + rows, err := s.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("storage: query: %w", err) + } + defer rows.Close() + + var results [][]byte + for rows.Next() { + var envelope []byte + var createdAt int64 + if err := rows.Scan(&envelope, &createdAt); err != nil { + return nil, fmt.Errorf("storage: scan: %w", err) + } + results = append(results, envelope) + } + return results, rows.Err() +} + +// buildWhereClause builds the SQL WHERE clause for a single filter, appending +// bind parameters to args. +func buildWhereClause(f axon.Filter, args *[]interface{}) string { + var conditions []string + + if len(f.IDs) > 0 { + conditions = append(conditions, buildBlobPrefixCondition("e.id", f.IDs, args)) + } + + if len(f.Authors) > 0 { + conditions = append(conditions, buildBlobPrefixCondition("e.pubkey", f.Authors, args)) + } + + if len(f.Kinds) > 0 { + placeholders := make([]string, len(f.Kinds)) + for i, k := range f.Kinds { + placeholders[i] = "?" + *args = append(*args, k) + } + conditions = append(conditions, "e.kind IN ("+strings.Join(placeholders, ",")+")") + } + + if f.Since != 0 { + conditions = append(conditions, "e.created_at >= ?") + *args = append(*args, f.Since) + } + + if f.Until != 0 { + conditions = append(conditions, "e.created_at <= ?") + *args = append(*args, f.Until) + } + + for _, tf := range f.Tags { + conditions = append(conditions, buildTagJoinCondition(tf, args)) + } + + if len(conditions) == 0 { + return "1=1" + } + return strings.Join(conditions, " AND ") +} + +// buildBlobPrefixCondition builds an OR condition for prefix-matching a BLOB +// column. Prefix slices of exactly 32 bytes use equality; shorter slices use +// hex(column) LIKE 'HEX%'. +func buildBlobPrefixCondition(column string, prefixes [][]byte, args *[]interface{}) string { + var orConds []string + for _, prefix := range prefixes { + if len(prefix) == 32 { + orConds = append(orConds, column+" = ?") + *args = append(*args, prefix) + } else { + hexPrefix := fmt.Sprintf("%X", prefix) + orConds = append(orConds, fmt.Sprintf("hex(%s) LIKE ?", column)) + *args = append(*args, hexPrefix+"%") + } + } + if len(orConds) == 1 { + return orConds[0] + } + return "(" + strings.Join(orConds, " OR ") + ")" +} + +// buildTagJoinCondition builds an EXISTS sub-select for a TagFilter. +func buildTagJoinCondition(tf axon.TagFilter, args *[]interface{}) string { + if len(tf.Values) == 0 { + *args = append(*args, tf.Name) + return "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ?)" + } + var orConds []string + for _, v := range tf.Values { + orConds = append(orConds, "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ? AND t.value = ?)") + *args = append(*args, tf.Name, v) + } + if len(orConds) == 1 { + return orConds[0] + } + return "(" + strings.Join(orConds, " OR ") + ")" +} + +func isDuplicateError(err error) bool { + return err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed") +} -- cgit v1.2.3