From 28d6d0ea2f86d69ad003557656466a50545fc0c9 Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 17:58:29 -0800 Subject: feat: implement Subscribe with real-time event streaming Subscription manager: - Track active subscriptions across connections - Filter matching with full NIP-01 support - Event fan-out to matching subscribers Subscribe RPC: - Query stored events (past) - Stream them to client - Keep stream open for new events (real-time) - Auto-generate subscription ID if not provided PublishEvent now: - Stores event - Notifies all matching active subscriptions - Streams to gRPC clients in real-time 4 new tests, all 41 tests passing --- internal/handler/grpc/server.go | 74 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 71 insertions(+), 3 deletions(-) (limited to 'internal/handler/grpc/server.go') diff --git a/internal/handler/grpc/server.go b/internal/handler/grpc/server.go index a3a3175..74857f6 100644 --- a/internal/handler/grpc/server.go +++ b/internal/handler/grpc/server.go @@ -2,10 +2,12 @@ package grpc import ( "context" + "crypto/rand" "fmt" pb "northwest.io/nostr-grpc/api/nostr/v1" "northwest.io/nostr-grpc/internal/storage" + "northwest.io/nostr-grpc/internal/subscription" ) type EventStore interface { @@ -16,10 +18,14 @@ type EventStore interface { type Server struct { pb.UnimplementedNostrRelayServer store EventStore + subs *subscription.Manager } func NewServer(store EventStore) *Server { - return &Server{store: store} + return &Server{ + store: store, + subs: subscription.NewManager(), + } } func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) { @@ -65,6 +71,8 @@ func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) return nil, fmt.Errorf("failed to store event: %w", err) } + s.subs.MatchAndFan(req.Event) + return &pb.PublishEventResponse{ Accepted: true, Message: "success", @@ -120,9 +128,69 @@ func (s *Server) PublishBatch(ctx context.Context, req *pb.PublishBatchRequest) } func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error { - return fmt.Errorf("not implemented yet") + subID := req.SubscriptionId + if subID == "" { + subID = generateSubID() + } + + sub := &subscription.Subscription{ + ID: subID, + Filters: req.Filters, + Events: make(chan *pb.Event, 100), + } + sub.InitDone() + + s.subs.Add(sub) + defer s.subs.Remove(subID) + + opts := &storage.QueryOptions{ + IncludeCanonical: req.IncludeCanonicalJson, + Limit: 0, + } + + storedEvents, err := s.store.QueryEvents(stream.Context(), req.Filters, opts) + if err != nil { + return fmt.Errorf("query failed: %w", err) + } + + for _, event := range storedEvents { + if err := stream.Send(event); err != nil { + return err + } + } + + for { + select { + case event, ok := <-sub.Events: + if !ok { + return nil + } + + eventToSend := event + if req.IncludeCanonicalJson && event.CanonicalJson == nil { + return fmt.Errorf("canonical JSON requested but not available") + } + + if err := stream.Send(eventToSend); err != nil { + return err + } + + case <-stream.Context().Done(): + return stream.Context().Err() + + case <-sub.Done(): + return nil + } + } } func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) { - return nil, fmt.Errorf("not implemented yet") + s.subs.Remove(req.SubscriptionId) + return &pb.Empty{}, nil +} + +func generateSubID() string { + b := make([]byte, 8) + rand.Read(b) + return fmt.Sprintf("%x", b) } -- cgit v1.2.3