From 28d6d0ea2f86d69ad003557656466a50545fc0c9 Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 17:58:29 -0800 Subject: feat: implement Subscribe with real-time event streaming Subscription manager: - Track active subscriptions across connections - Filter matching with full NIP-01 support - Event fan-out to matching subscribers Subscribe RPC: - Query stored events (past) - Stream them to client - Keep stream open for new events (real-time) - Auto-generate subscription ID if not provided PublishEvent now: - Stores event - Notifies all matching active subscriptions - Streams to gRPC clients in real-time 4 new tests, all 41 tests passing --- internal/handler/grpc/server.go | 74 ++++++++++- internal/subscription/manager.go | 190 ++++++++++++++++++++++++++++ internal/subscription/manager_test.go | 227 ++++++++++++++++++++++++++++++++++ 3 files changed, 488 insertions(+), 3 deletions(-) create mode 100644 internal/subscription/manager.go create mode 100644 internal/subscription/manager_test.go (limited to 'internal') diff --git a/internal/handler/grpc/server.go b/internal/handler/grpc/server.go index a3a3175..74857f6 100644 --- a/internal/handler/grpc/server.go +++ b/internal/handler/grpc/server.go @@ -2,10 +2,12 @@ package grpc import ( "context" + "crypto/rand" "fmt" pb "northwest.io/nostr-grpc/api/nostr/v1" "northwest.io/nostr-grpc/internal/storage" + "northwest.io/nostr-grpc/internal/subscription" ) type EventStore interface { @@ -16,10 +18,14 @@ type EventStore interface { type Server struct { pb.UnimplementedNostrRelayServer store EventStore + subs *subscription.Manager } func NewServer(store EventStore) *Server { - return &Server{store: store} + return &Server{ + store: store, + subs: subscription.NewManager(), + } } func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) { @@ -65,6 +71,8 @@ func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) return nil, fmt.Errorf("failed to store event: %w", err) } + s.subs.MatchAndFan(req.Event) + return &pb.PublishEventResponse{ Accepted: true, Message: "success", @@ -120,9 +128,69 @@ func (s *Server) PublishBatch(ctx context.Context, req *pb.PublishBatchRequest) } func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error { - return fmt.Errorf("not implemented yet") + subID := req.SubscriptionId + if subID == "" { + subID = generateSubID() + } + + sub := &subscription.Subscription{ + ID: subID, + Filters: req.Filters, + Events: make(chan *pb.Event, 100), + } + sub.InitDone() + + s.subs.Add(sub) + defer s.subs.Remove(subID) + + opts := &storage.QueryOptions{ + IncludeCanonical: req.IncludeCanonicalJson, + Limit: 0, + } + + storedEvents, err := s.store.QueryEvents(stream.Context(), req.Filters, opts) + if err != nil { + return fmt.Errorf("query failed: %w", err) + } + + for _, event := range storedEvents { + if err := stream.Send(event); err != nil { + return err + } + } + + for { + select { + case event, ok := <-sub.Events: + if !ok { + return nil + } + + eventToSend := event + if req.IncludeCanonicalJson && event.CanonicalJson == nil { + return fmt.Errorf("canonical JSON requested but not available") + } + + if err := stream.Send(eventToSend); err != nil { + return err + } + + case <-stream.Context().Done(): + return stream.Context().Err() + + case <-sub.Done(): + return nil + } + } } func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) { - return nil, fmt.Errorf("not implemented yet") + s.subs.Remove(req.SubscriptionId) + return &pb.Empty{}, nil +} + +func generateSubID() string { + b := make([]byte, 8) + rand.Read(b) + return fmt.Sprintf("%x", b) } diff --git a/internal/subscription/manager.go b/internal/subscription/manager.go new file mode 100644 index 0000000..0e737d8 --- /dev/null +++ b/internal/subscription/manager.go @@ -0,0 +1,190 @@ +package subscription + +import ( + "sync" + + pb "northwest.io/nostr-grpc/api/nostr/v1" +) + +type Subscription struct { + ID string + Filters []*pb.Filter + Events chan *pb.Event + done chan struct{} + once sync.Once +} + +func (s *Subscription) InitDone() { + s.done = make(chan struct{}) +} + +func (s *Subscription) Done() <-chan struct{} { + return s.done +} + +func (s *Subscription) Close() { + s.once.Do(func() { + close(s.done) + close(s.Events) + }) +} + +func (s *Subscription) IsClosed() bool { + select { + case <-s.done: + return true + default: + return false + } +} + +type Manager struct { + mu sync.RWMutex + subscriptions map[string]*Subscription +} + +func NewManager() *Manager { + return &Manager{ + subscriptions: make(map[string]*Subscription), + } +} + +func (m *Manager) Add(sub *Subscription) { + m.mu.Lock() + defer m.mu.Unlock() + m.subscriptions[sub.ID] = sub +} + +func (m *Manager) Remove(id string) { + m.mu.Lock() + defer m.mu.Unlock() + + if sub, ok := m.subscriptions[id]; ok { + sub.Close() + delete(m.subscriptions, id) + } +} + +func (m *Manager) Get(id string) (*Subscription, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + sub, ok := m.subscriptions[id] + return sub, ok +} + +func (m *Manager) MatchAndFan(event *pb.Event) { + m.mu.RLock() + defer m.mu.RUnlock() + + for _, sub := range m.subscriptions { + if sub.IsClosed() { + continue + } + + if matchesAnyFilter(event, sub.Filters) { + select { + case sub.Events <- event: + case <-sub.done: + default: + } + } + } +} + +func matchesAnyFilter(event *pb.Event, filters []*pb.Filter) bool { + for _, filter := range filters { + if matchesFilter(event, filter) { + return true + } + } + return false +} + +func matchesFilter(event *pb.Event, filter *pb.Filter) bool { + if len(filter.Ids) > 0 { + if !matchesPrefix(event.Id, filter.Ids) { + return false + } + } + + if len(filter.Authors) > 0 { + if !matchesPrefix(event.Pubkey, filter.Authors) { + return false + } + } + + if len(filter.Kinds) > 0 { + found := false + for _, kind := range filter.Kinds { + if event.Kind == kind { + found = true + break + } + } + if !found { + return false + } + } + + if filter.Since != nil && *filter.Since > 0 { + if event.CreatedAt < *filter.Since { + return false + } + } + + if filter.Until != nil && *filter.Until > 0 { + if event.CreatedAt > *filter.Until { + return false + } + } + + if len(filter.ETags) > 0 { + if !hasTag(event, "e", filter.ETags) { + return false + } + } + + if len(filter.PTags) > 0 { + if !hasTag(event, "p", filter.PTags) { + return false + } + } + + for tagName, tagFilter := range filter.TagFilters { + if len(tagFilter.Values) > 0 { + if !hasTag(event, tagName, tagFilter.Values) { + return false + } + } + } + + return true +} + +func matchesPrefix(value string, prefixes []string) bool { + for _, prefix := range prefixes { + if len(prefix) == len(value) { + if value == prefix { + return true + } + } else if len(value) > len(prefix) { + if value[:len(prefix)] == prefix { + return true + } + } + } + return false +} + +func hasTag(event *pb.Event, tagName string, values []string) bool { + for _, tag := range event.Tags { + if len(tag.Values) >= 2 && tag.Values[0] == tagName { + for _, val := range values { + if tag.Values[1] == val { + return true + } + } + } + } + return false +} diff --git a/internal/subscription/manager_test.go b/internal/subscription/manager_test.go new file mode 100644 index 0000000..d816fcd --- /dev/null +++ b/internal/subscription/manager_test.go @@ -0,0 +1,227 @@ +package subscription + +import ( + "testing" + "time" + + pb "northwest.io/nostr-grpc/api/nostr/v1" +) + +func TestManagerAddRemove(t *testing.T) { + mgr := NewManager() + + sub := &Subscription{ + ID: "sub1", + Filters: []*pb.Filter{}, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + mgr.Add(sub) + + retrieved, ok := mgr.Get("sub1") + if !ok { + t.Fatal("expected to find subscription") + } + if retrieved.ID != "sub1" { + t.Errorf("expected ID sub1, got %s", retrieved.ID) + } + + mgr.Remove("sub1") + + _, ok = mgr.Get("sub1") + if ok { + t.Error("expected subscription to be removed") + } +} + +func TestMatchesFilter(t *testing.T) { + tests := []struct { + name string + event *pb.Event + filter *pb.Filter + matches bool + }{ + { + name: "matches kind", + event: &pb.Event{ + Id: "test1", + Pubkey: "pubkey1", + CreatedAt: 1000, + Kind: 1, + }, + filter: &pb.Filter{ + Kinds: []int32{1, 2}, + }, + matches: true, + }, + { + name: "does not match kind", + event: &pb.Event{ + Kind: 3, + }, + filter: &pb.Filter{ + Kinds: []int32{1, 2}, + }, + matches: false, + }, + { + name: "matches author prefix", + event: &pb.Event{ + Pubkey: "abcdef123456", + }, + filter: &pb.Filter{ + Authors: []string{"abcd"}, + }, + matches: true, + }, + { + name: "matches author exact", + event: &pb.Event{ + Pubkey: "exact", + }, + filter: &pb.Filter{ + Authors: []string{"exact"}, + }, + matches: true, + }, + { + name: "does not match author", + event: &pb.Event{ + Pubkey: "different", + }, + filter: &pb.Filter{ + Authors: []string{"other"}, + }, + matches: false, + }, + { + name: "matches since", + event: &pb.Event{ + CreatedAt: 2000, + }, + filter: &pb.Filter{ + Since: ptr[int64](1000), + }, + matches: true, + }, + { + name: "does not match since", + event: &pb.Event{ + CreatedAt: 500, + }, + filter: &pb.Filter{ + Since: ptr[int64](1000), + }, + matches: false, + }, + { + name: "matches e tag", + event: &pb.Event{ + Tags: []*pb.Tag{ + {Values: []string{"e", "event123"}}, + }, + }, + filter: &pb.Filter{ + ETags: []string{"event123"}, + }, + matches: true, + }, + { + name: "does not match e tag", + event: &pb.Event{ + Tags: []*pb.Tag{ + {Values: []string{"e", "event456"}}, + }, + }, + filter: &pb.Filter{ + ETags: []string{"event123"}, + }, + matches: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matchesFilter(tt.event, tt.filter) + if result != tt.matches { + t.Errorf("expected %v, got %v", tt.matches, result) + } + }) + } +} + +func TestMatchAndFan(t *testing.T) { + mgr := NewManager() + + sub1 := &Subscription{ + ID: "sub1", + Filters: []*pb.Filter{ + {Kinds: []int32{1}}, + }, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + sub2 := &Subscription{ + ID: "sub2", + Filters: []*pb.Filter{ + {Kinds: []int32{2}}, + }, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + mgr.Add(sub1) + mgr.Add(sub2) + + event := &pb.Event{ + Id: "test1", + Pubkey: "pubkey1", + CreatedAt: 1000, + Kind: 1, + Content: "test", + } + + mgr.MatchAndFan(event) + + select { + case e := <-sub1.Events: + if e.Id != "test1" { + t.Errorf("expected event test1, got %s", e.Id) + } + case <-time.After(100 * time.Millisecond): + t.Error("expected event on sub1") + } + + select { + case <-sub2.Events: + t.Error("did not expect event on sub2") + case <-time.After(50 * time.Millisecond): + } +} + +func TestSubscriptionClose(t *testing.T) { + sub := &Subscription{ + ID: "test", + Filters: []*pb.Filter{}, + Events: make(chan *pb.Event, 10), + done: make(chan struct{}), + } + + if sub.IsClosed() { + t.Error("subscription should not be closed initially") + } + + sub.Close() + + if !sub.IsClosed() { + t.Error("subscription should be closed") + } + + sub.Close() +} + +func ptr[T any](v T) *T { + return &v +} -- cgit v1.2.3