From 28d6d0ea2f86d69ad003557656466a50545fc0c9 Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 17:58:29 -0800 Subject: feat: implement Subscribe with real-time event streaming Subscription manager: - Track active subscriptions across connections - Filter matching with full NIP-01 support - Event fan-out to matching subscribers Subscribe RPC: - Query stored events (past) - Stream them to client - Keep stream open for new events (real-time) - Auto-generate subscription ID if not provided PublishEvent now: - Stores event - Notifies all matching active subscriptions - Streams to gRPC clients in real-time 4 new tests, all 41 tests passing --- internal/subscription/manager_test.go | 227 ++++++++++++++++++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 internal/subscription/manager_test.go (limited to 'internal/subscription/manager_test.go') diff --git a/internal/subscription/manager_test.go b/internal/subscription/manager_test.go new file mode 100644 index 0000000..d816fcd --- /dev/null +++ b/internal/subscription/manager_test.go @@ -0,0 +1,227 @@ +package subscription + +import ( + "testing" + "time" + + pb "northwest.io/nostr-grpc/api/nostr/v1" +) + +func TestManagerAddRemove(t *testing.T) { + mgr := NewManager() + + sub := &Subscription{ + ID: "sub1", + Filters: []*pb.Filter{}, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + mgr.Add(sub) + + retrieved, ok := mgr.Get("sub1") + if !ok { + t.Fatal("expected to find subscription") + } + if retrieved.ID != "sub1" { + t.Errorf("expected ID sub1, got %s", retrieved.ID) + } + + mgr.Remove("sub1") + + _, ok = mgr.Get("sub1") + if ok { + t.Error("expected subscription to be removed") + } +} + +func TestMatchesFilter(t *testing.T) { + tests := []struct { + name string + event *pb.Event + filter *pb.Filter + matches bool + }{ + { + name: "matches kind", + event: &pb.Event{ + Id: "test1", + Pubkey: "pubkey1", + CreatedAt: 1000, + Kind: 1, + }, + filter: &pb.Filter{ + Kinds: []int32{1, 2}, + }, + matches: true, + }, + { + name: "does not match kind", + event: &pb.Event{ + Kind: 3, + }, + filter: &pb.Filter{ + Kinds: []int32{1, 2}, + }, + matches: false, + }, + { + name: "matches author prefix", + event: &pb.Event{ + Pubkey: "abcdef123456", + }, + filter: &pb.Filter{ + Authors: []string{"abcd"}, + }, + matches: true, + }, + { + name: "matches author exact", + event: &pb.Event{ + Pubkey: "exact", + }, + filter: &pb.Filter{ + Authors: []string{"exact"}, + }, + matches: true, + }, + { + name: "does not match author", + event: &pb.Event{ + Pubkey: "different", + }, + filter: &pb.Filter{ + Authors: []string{"other"}, + }, + matches: false, + }, + { + name: "matches since", + event: &pb.Event{ + CreatedAt: 2000, + }, + filter: &pb.Filter{ + Since: ptr[int64](1000), + }, + matches: true, + }, + { + name: "does not match since", + event: &pb.Event{ + CreatedAt: 500, + }, + filter: &pb.Filter{ + Since: ptr[int64](1000), + }, + matches: false, + }, + { + name: "matches e tag", + event: &pb.Event{ + Tags: []*pb.Tag{ + {Values: []string{"e", "event123"}}, + }, + }, + filter: &pb.Filter{ + ETags: []string{"event123"}, + }, + matches: true, + }, + { + name: "does not match e tag", + event: &pb.Event{ + Tags: []*pb.Tag{ + {Values: []string{"e", "event456"}}, + }, + }, + filter: &pb.Filter{ + ETags: []string{"event123"}, + }, + matches: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matchesFilter(tt.event, tt.filter) + if result != tt.matches { + t.Errorf("expected %v, got %v", tt.matches, result) + } + }) + } +} + +func TestMatchAndFan(t *testing.T) { + mgr := NewManager() + + sub1 := &Subscription{ + ID: "sub1", + Filters: []*pb.Filter{ + {Kinds: []int32{1}}, + }, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + sub2 := &Subscription{ + ID: "sub2", + Filters: []*pb.Filter{ + {Kinds: []int32{2}}, + }, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + mgr.Add(sub1) + mgr.Add(sub2) + + event := &pb.Event{ + Id: "test1", + Pubkey: "pubkey1", + CreatedAt: 1000, + Kind: 1, + Content: "test", + } + + mgr.MatchAndFan(event) + + select { + case e := <-sub1.Events: + if e.Id != "test1" { + t.Errorf("expected event test1, got %s", e.Id) + } + case <-time.After(100 * time.Millisecond): + t.Error("expected event on sub1") + } + + select { + case <-sub2.Events: + t.Error("did not expect event on sub2") + case <-time.After(50 * time.Millisecond): + } +} + +func TestSubscriptionClose(t *testing.T) { + sub := &Subscription{ + ID: "test", + Filters: []*pb.Filter{}, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + if sub.IsClosed() { + t.Error("subscription should not be closed initially") + } + + sub.Close() + + if !sub.IsClosed() { + t.Error("subscription should be closed") + } + + sub.Close() +} + +func ptr[T any](v T) *T { + return &v +} -- cgit v1.2.3