From 28d6d0ea2f86d69ad003557656466a50545fc0c9 Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 17:58:29 -0800 Subject: feat: implement Subscribe with real-time event streaming Subscription manager: - Track active subscriptions across connections - Filter matching with full NIP-01 support - Event fan-out to matching subscribers Subscribe RPC: - Query stored events (past) - Stream them to client - Keep stream open for new events (real-time) - Auto-generate subscription ID if not provided PublishEvent now: - Stores event - Notifies all matching active subscriptions - Streams to gRPC clients in real-time 4 new tests, all 41 tests passing --- internal/subscription/manager.go | 190 ++++++++++++++++++++++++++++ internal/subscription/manager_test.go | 227 ++++++++++++++++++++++++++++++++++ 2 files changed, 417 insertions(+) create mode 100644 internal/subscription/manager.go create mode 100644 internal/subscription/manager_test.go (limited to 'internal/subscription') diff --git a/internal/subscription/manager.go b/internal/subscription/manager.go new file mode 100644 index 0000000..0e737d8 --- /dev/null +++ b/internal/subscription/manager.go @@ -0,0 +1,190 @@ +package subscription + +import ( + "sync" + + pb "northwest.io/nostr-grpc/api/nostr/v1" +) + +type Subscription struct { + ID string + Filters []*pb.Filter + Events chan *pb.Event + done chan struct{} + once sync.Once +} + +func (s *Subscription) InitDone() { + s.done = make(chan struct{}) +} + +func (s *Subscription) Done() <-chan struct{} { + return s.done +} + +func (s *Subscription) Close() { + s.once.Do(func() { + close(s.done) + close(s.Events) + }) +} + +func (s *Subscription) IsClosed() bool { + select { + case <-s.done: + return true + default: + return false + } +} + +type Manager struct { + mu sync.RWMutex + subscriptions map[string]*Subscription +} + +func NewManager() *Manager { + return &Manager{ + subscriptions: make(map[string]*Subscription), + } +} + +func (m *Manager) Add(sub *Subscription) { + m.mu.Lock() + defer m.mu.Unlock() + m.subscriptions[sub.ID] = sub +} + +func (m *Manager) Remove(id string) { + m.mu.Lock() + defer m.mu.Unlock() + + if sub, ok := m.subscriptions[id]; ok { + sub.Close() + delete(m.subscriptions, id) + } +} + +func (m *Manager) Get(id string) (*Subscription, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + sub, ok := m.subscriptions[id] + return sub, ok +} + +func (m *Manager) MatchAndFan(event *pb.Event) { + m.mu.RLock() + defer m.mu.RUnlock() + + for _, sub := range m.subscriptions { + if sub.IsClosed() { + continue + } + + if matchesAnyFilter(event, sub.Filters) { + select { + case sub.Events <- event: + case <-sub.done: + default: + } + } + } +} + +func matchesAnyFilter(event *pb.Event, filters []*pb.Filter) bool { + for _, filter := range filters { + if matchesFilter(event, filter) { + return true + } + } + return false +} + +func matchesFilter(event *pb.Event, filter *pb.Filter) bool { + if len(filter.Ids) > 0 { + if !matchesPrefix(event.Id, filter.Ids) { + return false + } + } + + if len(filter.Authors) > 0 { + if !matchesPrefix(event.Pubkey, filter.Authors) { + return false + } + } + + if len(filter.Kinds) > 0 { + found := false + for _, kind := range filter.Kinds { + if event.Kind == kind { + found = true + break + } + } + if !found { + return false + } + } + + if filter.Since != nil && *filter.Since > 0 { + if event.CreatedAt < *filter.Since { + return false + } + } + + if filter.Until != nil && *filter.Until > 0 { + if event.CreatedAt > *filter.Until { + return false + } + } + + if len(filter.ETags) > 0 { + if !hasTag(event, "e", filter.ETags) { + return false + } + } + + if len(filter.PTags) > 0 { + if !hasTag(event, "p", filter.PTags) { + return false + } + } + + for tagName, tagFilter := range filter.TagFilters { + if len(tagFilter.Values) > 0 { + if !hasTag(event, tagName, tagFilter.Values) { + return false + } + } + } + + return true +} + +func matchesPrefix(value string, prefixes []string) bool { + for _, prefix := range prefixes { + if len(prefix) == len(value) { + if value == prefix { + return true + } + } else if len(value) > len(prefix) { + if value[:len(prefix)] == prefix { + return true + } + } + } + return false +} + +func hasTag(event *pb.Event, tagName string, values []string) bool { + for _, tag := range event.Tags { + if len(tag.Values) >= 2 && tag.Values[0] == tagName { + for _, val := range values { + if tag.Values[1] == val { + return true + } + } + } + } + return false +} diff --git a/internal/subscription/manager_test.go b/internal/subscription/manager_test.go new file mode 100644 index 0000000..d816fcd --- /dev/null +++ b/internal/subscription/manager_test.go @@ -0,0 +1,227 @@ +package subscription + +import ( + "testing" + "time" + + pb "northwest.io/nostr-grpc/api/nostr/v1" +) + +func TestManagerAddRemove(t *testing.T) { + mgr := NewManager() + + sub := &Subscription{ + ID: "sub1", + Filters: []*pb.Filter{}, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + mgr.Add(sub) + + retrieved, ok := mgr.Get("sub1") + if !ok { + t.Fatal("expected to find subscription") + } + if retrieved.ID != "sub1" { + t.Errorf("expected ID sub1, got %s", retrieved.ID) + } + + mgr.Remove("sub1") + + _, ok = mgr.Get("sub1") + if ok { + t.Error("expected subscription to be removed") + } +} + +func TestMatchesFilter(t *testing.T) { + tests := []struct { + name string + event *pb.Event + filter *pb.Filter + matches bool + }{ + { + name: "matches kind", + event: &pb.Event{ + Id: "test1", + Pubkey: "pubkey1", + CreatedAt: 1000, + Kind: 1, + }, + filter: &pb.Filter{ + Kinds: []int32{1, 2}, + }, + matches: true, + }, + { + name: "does not match kind", + event: &pb.Event{ + Kind: 3, + }, + filter: &pb.Filter{ + Kinds: []int32{1, 2}, + }, + matches: false, + }, + { + name: "matches author prefix", + event: &pb.Event{ + Pubkey: "abcdef123456", + }, + filter: &pb.Filter{ + Authors: []string{"abcd"}, + }, + matches: true, + }, + { + name: "matches author exact", + event: &pb.Event{ + Pubkey: "exact", + }, + filter: &pb.Filter{ + Authors: []string{"exact"}, + }, + matches: true, + }, + { + name: "does not match author", + event: &pb.Event{ + Pubkey: "different", + }, + filter: &pb.Filter{ + Authors: []string{"other"}, + }, + matches: false, + }, + { + name: "matches since", + event: &pb.Event{ + CreatedAt: 2000, + }, + filter: &pb.Filter{ + Since: ptr[int64](1000), + }, + matches: true, + }, + { + name: "does not match since", + event: &pb.Event{ + CreatedAt: 500, + }, + filter: &pb.Filter{ + Since: ptr[int64](1000), + }, + matches: false, + }, + { + name: "matches e tag", + event: &pb.Event{ + Tags: []*pb.Tag{ + {Values: []string{"e", "event123"}}, + }, + }, + filter: &pb.Filter{ + ETags: []string{"event123"}, + }, + matches: true, + }, + { + name: "does not match e tag", + event: &pb.Event{ + Tags: []*pb.Tag{ + {Values: []string{"e", "event456"}}, + }, + }, + filter: &pb.Filter{ + ETags: []string{"event123"}, + }, + matches: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matchesFilter(tt.event, tt.filter) + if result != tt.matches { + t.Errorf("expected %v, got %v", tt.matches, result) + } + }) + } +} + +func TestMatchAndFan(t *testing.T) { + mgr := NewManager() + + sub1 := &Subscription{ + ID: "sub1", + Filters: []*pb.Filter{ + {Kinds: []int32{1}}, + }, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + sub2 := &Subscription{ + ID: "sub2", + Filters: []*pb.Filter{ + {Kinds: []int32{2}}, + }, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + mgr.Add(sub1) + mgr.Add(sub2) + + event := &pb.Event{ + Id: "test1", + Pubkey: "pubkey1", + CreatedAt: 1000, + Kind: 1, + Content: "test", + } + + mgr.MatchAndFan(event) + + select { + case e := <-sub1.Events: + if e.Id != "test1" { + t.Errorf("expected event test1, got %s", e.Id) + } + case <-time.After(100 * time.Millisecond): + t.Error("expected event on sub1") + } + + select { + case <-sub2.Events: + t.Error("did not expect event on sub2") + case <-time.After(50 * time.Millisecond): + } +} + +func TestSubscriptionClose(t *testing.T) { + sub := &Subscription{ + ID: "test", + Filters: []*pb.Filter{}, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + if sub.IsClosed() { + t.Error("subscription should not be closed initially") + } + + sub.Close() + + if !sub.IsClosed() { + t.Error("subscription should be closed") + } + + sub.Close() +} + +func ptr[T any](v T) *T { + return &v +} -- cgit v1.2.3