From 28d6d0ea2f86d69ad003557656466a50545fc0c9 Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 17:58:29 -0800 Subject: feat: implement Subscribe with real-time event streaming Subscription manager: - Track active subscriptions across connections - Filter matching with full NIP-01 support - Event fan-out to matching subscribers Subscribe RPC: - Query stored events (past) - Stream them to client - Keep stream open for new events (real-time) - Auto-generate subscription ID if not provided PublishEvent now: - Stores event - Notifies all matching active subscriptions - Streams to gRPC clients in real-time 4 new tests, all 41 tests passing --- internal/subscription/manager.go | 190 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 internal/subscription/manager.go (limited to 'internal/subscription/manager.go') diff --git a/internal/subscription/manager.go b/internal/subscription/manager.go new file mode 100644 index 0000000..0e737d8 --- /dev/null +++ b/internal/subscription/manager.go @@ -0,0 +1,190 @@ +package subscription + +import ( + "sync" + + pb "northwest.io/nostr-grpc/api/nostr/v1" +) + +type Subscription struct { + ID string + Filters []*pb.Filter + Events chan *pb.Event + done chan struct{} + once sync.Once +} + +func (s *Subscription) InitDone() { + s.done = make(chan struct{}) +} + +func (s *Subscription) Done() <-chan struct{} { + return s.done +} + +func (s *Subscription) Close() { + s.once.Do(func() { + close(s.done) + close(s.Events) + }) +} + +func (s *Subscription) IsClosed() bool { + select { + case <-s.done: + return true + default: + return false + } +} + +type Manager struct { + mu sync.RWMutex + subscriptions map[string]*Subscription +} + +func NewManager() *Manager { + return &Manager{ + subscriptions: make(map[string]*Subscription), + } +} + +func (m *Manager) Add(sub *Subscription) { + m.mu.Lock() + defer m.mu.Unlock() + m.subscriptions[sub.ID] = sub +} + +func (m *Manager) Remove(id string) { + m.mu.Lock() + defer m.mu.Unlock() + + if sub, ok := m.subscriptions[id]; ok { + sub.Close() + delete(m.subscriptions, id) + } +} + +func (m *Manager) Get(id string) (*Subscription, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + sub, ok := m.subscriptions[id] + return sub, ok +} + +func (m *Manager) MatchAndFan(event *pb.Event) { + m.mu.RLock() + defer m.mu.RUnlock() + + for _, sub := range m.subscriptions { + if sub.IsClosed() { + continue + } + + if matchesAnyFilter(event, sub.Filters) { + select { + case sub.Events <- event: + case <-sub.done: + default: + } + } + } +} + +func matchesAnyFilter(event *pb.Event, filters []*pb.Filter) bool { + for _, filter := range filters { + if matchesFilter(event, filter) { + return true + } + } + return false +} + +func matchesFilter(event *pb.Event, filter *pb.Filter) bool { + if len(filter.Ids) > 0 { + if !matchesPrefix(event.Id, filter.Ids) { + return false + } + } + + if len(filter.Authors) > 0 { + if !matchesPrefix(event.Pubkey, filter.Authors) { + return false + } + } + + if len(filter.Kinds) > 0 { + found := false + for _, kind := range filter.Kinds { + if event.Kind == kind { + found = true + break + } + } + if !found { + return false + } + } + + if filter.Since != nil && *filter.Since > 0 { + if event.CreatedAt < *filter.Since { + return false + } + } + + if filter.Until != nil && *filter.Until > 0 { + if event.CreatedAt > *filter.Until { + return false + } + } + + if len(filter.ETags) > 0 { + if !hasTag(event, "e", filter.ETags) { + return false + } + } + + if len(filter.PTags) > 0 { + if !hasTag(event, "p", filter.PTags) { + return false + } + } + + for tagName, tagFilter := range filter.TagFilters { + if len(tagFilter.Values) > 0 { + if !hasTag(event, tagName, tagFilter.Values) { + return false + } + } + } + + return true +} + +func matchesPrefix(value string, prefixes []string) bool { + for _, prefix := range prefixes { + if len(prefix) == len(value) { + if value == prefix { + return true + } + } else if len(value) > len(prefix) { + if value[:len(prefix)] == prefix { + return true + } + } + } + return false +} + +func hasTag(event *pb.Event, tagName string, values []string) bool { + for _, tag := range event.Tags { + if len(tag.Values) >= 2 && tag.Values[0] == tagName { + for _, val := range values { + if tag.Values[1] == val { + return true + } + } + } + } + return false +} -- cgit v1.2.3