From 5fcf5bd1fa5b2e707cea82c4652ea65c3c113c1a Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 17:43:37 -0800 Subject: feat: add query layer with Nostr filter to SQL conversion Query implementation: - QueryEvents method with filter support - Full NIP-01 filter support (ids, authors, kinds, tags, since, until, limit) - ID and pubkey prefix matching - Tag filtering using SQLite JSON functions - Multiple filter UNION support - DESC ordering by created_at - Optional canonical JSON inclusion 23 tests passing, 1322 total lines --- internal/storage/query.go | 185 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 internal/storage/query.go (limited to 'internal/storage/query.go') diff --git a/internal/storage/query.go b/internal/storage/query.go new file mode 100644 index 0000000..29a2a4c --- /dev/null +++ b/internal/storage/query.go @@ -0,0 +1,185 @@ +package storage + +import ( + "context" + "fmt" + "strings" + + "google.golang.org/protobuf/proto" + + pb "northwest.io/nostr-grpc/api/nostr/v1" +) + +type QueryOptions struct { + IncludeCanonical bool + Limit int32 +} + +func (s *Storage) QueryEvents(ctx context.Context, filters []*pb.Filter, opts *QueryOptions) ([]*pb.Event, error) { + if len(filters) == 0 { + return nil, nil + } + + if opts == nil { + opts = &QueryOptions{Limit: 100} + } + + query, args := buildQuery(filters, opts) + + rows, err := s.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("query failed: %w", err) + } + defer rows.Close() + + var events []*pb.Event + for rows.Next() { + var eventBytes []byte + var compressedJSON []byte + var createdAt int64 + + if opts.IncludeCanonical { + if err := rows.Scan(&eventBytes, &compressedJSON, &createdAt); err != nil { + return nil, fmt.Errorf("scan failed: %w", err) + } + } else { + if err := rows.Scan(&eventBytes, &createdAt); err != nil { + return nil, fmt.Errorf("scan failed: %w", err) + } + } + + event := &pb.Event{} + if err := proto.Unmarshal(eventBytes, event); err != nil { + return nil, fmt.Errorf("unmarshal failed: %w", err) + } + + if opts.IncludeCanonical && compressedJSON != nil { + canonicalJSON, err := decompressJSON(compressedJSON) + if err != nil { + return nil, fmt.Errorf("decompress failed: %w", err) + } + event.CanonicalJson = canonicalJSON + } + + events = append(events, event) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("rows iteration failed: %w", err) + } + + return events, nil +} + +func buildQuery(filters []*pb.Filter, opts *QueryOptions) (string, []interface{}) { + selectFields := "event_data, created_at" + if opts.IncludeCanonical { + selectFields = "event_data, canonical_json, created_at" + } + + var unions []string + var args []interface{} + + for _, filter := range filters { + clause, filterArgs := buildWhereClause(filter) + subQuery := fmt.Sprintf("SELECT %s FROM events WHERE deleted = 0 AND (%s)", selectFields, clause) + unions = append(unions, subQuery) + args = append(args, filterArgs...) + } + + query := strings.Join(unions, " UNION ") + query += " ORDER BY created_at DESC" + + if opts.Limit > 0 { + query += fmt.Sprintf(" LIMIT %d", opts.Limit) + } + + return query, args +} + +func buildWhereClause(filter *pb.Filter) (string, []interface{}) { + var conditions []string + var args []interface{} + + if len(filter.Ids) > 0 { + conditions = append(conditions, buildPrefixCondition("id", filter.Ids, &args)) + } + + if len(filter.Authors) > 0 { + conditions = append(conditions, buildPrefixCondition("pubkey", filter.Authors, &args)) + } + + if len(filter.Kinds) > 0 { + placeholders := make([]string, len(filter.Kinds)) + for i, kind := range filter.Kinds { + placeholders[i] = "?" + args = append(args, kind) + } + conditions = append(conditions, fmt.Sprintf("kind IN (%s)", strings.Join(placeholders, ","))) + } + + if filter.Since != nil && *filter.Since > 0 { + conditions = append(conditions, "created_at >= ?") + args = append(args, *filter.Since) + } + + if filter.Until != nil && *filter.Until > 0 { + conditions = append(conditions, "created_at <= ?") + args = append(args, *filter.Until) + } + + if len(filter.ETags) > 0 { + conditions = append(conditions, buildTagCondition("e", filter.ETags, &args)) + } + + if len(filter.PTags) > 0 { + conditions = append(conditions, buildTagCondition("p", filter.PTags, &args)) + } + + for tagName, tagFilter := range filter.TagFilters { + if len(tagFilter.Values) > 0 { + conditions = append(conditions, buildTagCondition(tagName, tagFilter.Values, &args)) + } + } + + if len(conditions) == 0 { + return "1=1", args + } + + return strings.Join(conditions, " AND "), args +} + +func buildPrefixCondition(column string, values []string, args *[]interface{}) string { + var orConditions []string + + for _, val := range values { + if len(val) == 64 { + orConditions = append(orConditions, column+" = ?") + *args = append(*args, val) + } else { + orConditions = append(orConditions, column+" LIKE ?") + *args = append(*args, val+"%") + } + } + + if len(orConditions) == 1 { + return orConditions[0] + } + + return "(" + strings.Join(orConditions, " OR ") + ")" +} + +func buildTagCondition(tagName string, values []string, args *[]interface{}) string { + var orConditions []string + + for _, val := range values { + orConditions = append(orConditions, "EXISTS (SELECT 1 FROM json_each(tags) WHERE json_extract(value, '$[0]') = ? AND json_extract(value, '$[1]') = ?)") + *args = append(*args, tagName, val) + } + + if len(orConditions) == 1 { + return orConditions[0] + } + + return "(" + strings.Join(orConditions, " OR ") + ")" +} -- cgit v1.2.3