package storage import ( "context" "database/sql" "errors" "fmt" "strings" "code.northwest.io/axon" ) // ErrDuplicate is returned by StoreEvent when the event ID already exists. var ErrDuplicate = errors.New("storage: duplicate event") // StoreEvent persists an event and its tags to the database in a single // transaction. envelopeBytes is the verbatim msgpack representation used for // zero-copy fanout. func (s *Storage) StoreEvent(ctx context.Context, event *axon.Event, envelopeBytes []byte) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("storage: begin tx: %w", err) } defer tx.Rollback() _, err = tx.ExecContext(ctx, `INSERT INTO events (id, pubkey, created_at, kind, envelope_bytes) VALUES (?, ?, ?, ?, ?)`, event.ID, event.PubKey, event.CreatedAt, event.Kind, envelopeBytes, ) if err != nil { if isDuplicateError(err) { return ErrDuplicate } return fmt.Errorf("storage: insert event: %w", err) } for _, tag := range event.Tags { if len(tag.Values) == 0 { continue } _, err = tx.ExecContext(ctx, `INSERT INTO tags (event_id, name, value) VALUES (?, ?, ?)`, event.ID, tag.Name, tag.Values[0], ) if err != nil { return fmt.Errorf("storage: insert tag: %w", err) } } return tx.Commit() } // ExistsByID returns true if an event with the given ID is already stored. func (s *Storage) ExistsByID(ctx context.Context, id []byte) (bool, error) { var n int err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM events WHERE id = ?`, id).Scan(&n) if err != nil && err != sql.ErrNoRows { return false, fmt.Errorf("storage: exists: %w", err) } return n > 0, nil } // QueryEvents executes the given filters against the database using a UNION // query and returns matching event envelope bytes in descending created_at // order. The effective LIMIT is the minimum non-zero Limit across all filters. func (s *Storage) QueryEvents(ctx context.Context, filters []axon.Filter) ([][]byte, error) { if len(filters) == 0 { return nil, nil } var unions []string var args []interface{} var effectiveLimit int32 for _, f := range filters { var filterArgs []interface{} clause := buildWhereClause(f, &filterArgs) sub := fmt.Sprintf( "SELECT e.envelope_bytes, e.created_at FROM events e WHERE %s", clause) unions = append(unions, sub) args = append(args, filterArgs...) if f.Limit > 0 && (effectiveLimit == 0 || f.Limit < effectiveLimit) { effectiveLimit = f.Limit } } query := strings.Join(unions, " UNION ") + " ORDER BY created_at DESC" if effectiveLimit > 0 { query += fmt.Sprintf(" LIMIT %d", effectiveLimit) } rows, err := s.db.QueryContext(ctx, query, args...) if err != nil { return nil, fmt.Errorf("storage: query: %w", err) } defer rows.Close() var results [][]byte for rows.Next() { var envelope []byte var createdAt int64 if err := rows.Scan(&envelope, &createdAt); err != nil { return nil, fmt.Errorf("storage: scan: %w", err) } results = append(results, envelope) } return results, rows.Err() } // buildWhereClause builds the SQL WHERE clause for a single filter, appending // bind parameters to args. func buildWhereClause(f axon.Filter, args *[]interface{}) string { var conditions []string if len(f.IDs) > 0 { conditions = append(conditions, buildBlobPrefixCondition("e.id", f.IDs, args)) } if len(f.Authors) > 0 { conditions = append(conditions, buildBlobPrefixCondition("e.pubkey", f.Authors, args)) } if len(f.Kinds) > 0 { placeholders := make([]string, len(f.Kinds)) for i, k := range f.Kinds { placeholders[i] = "?" *args = append(*args, k) } conditions = append(conditions, "e.kind IN ("+strings.Join(placeholders, ",")+")") } if f.Since != 0 { conditions = append(conditions, "e.created_at >= ?") *args = append(*args, f.Since) } if f.Until != 0 { conditions = append(conditions, "e.created_at <= ?") *args = append(*args, f.Until) } for _, tf := range f.Tags { conditions = append(conditions, buildTagJoinCondition(tf, args)) } if len(conditions) == 0 { return "1=1" } return strings.Join(conditions, " AND ") } // buildBlobPrefixCondition builds an OR condition for prefix-matching a BLOB // column. Prefix slices of exactly 32 bytes use equality; shorter slices use // hex(column) LIKE 'HEX%'. func buildBlobPrefixCondition(column string, prefixes [][]byte, args *[]interface{}) string { var orConds []string for _, prefix := range prefixes { if len(prefix) == 32 { orConds = append(orConds, column+" = ?") *args = append(*args, prefix) } else { hexPrefix := fmt.Sprintf("%X", prefix) orConds = append(orConds, fmt.Sprintf("hex(%s) LIKE ?", column)) *args = append(*args, hexPrefix+"%") } } if len(orConds) == 1 { return orConds[0] } return "(" + strings.Join(orConds, " OR ") + ")" } // buildTagJoinCondition builds an EXISTS sub-select for a TagFilter. func buildTagJoinCondition(tf axon.TagFilter, args *[]interface{}) string { if len(tf.Values) == 0 { *args = append(*args, tf.Name) return "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ?)" } var orConds []string for _, v := range tf.Values { orConds = append(orConds, "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ? AND t.value = ?)") *args = append(*args, tf.Name, v) } if len(orConds) == 1 { return orConds[0] } return "(" + strings.Join(orConds, " OR ") + ")" } func isDuplicateError(err error) bool { return err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed") }