summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbndw <ben@bdw.to>2026-02-13 17:58:29 -0800
committerbndw <ben@bdw.to>2026-02-13 17:58:29 -0800
commit28d6d0ea2f86d69ad003557656466a50545fc0c9 (patch)
tree7fba1ecd6d1733cd5fbdec12d9e45318613a0249
parentfcba12d7ae3cdb361c6321519fdaf5a537a6a871 (diff)
feat: implement Subscribe with real-time event streaming
Subscription manager: - Track active subscriptions across connections - Filter matching with full NIP-01 support - Event fan-out to matching subscribers Subscribe RPC: - Query stored events (past) - Stream them to client - Keep stream open for new events (real-time) - Auto-generate subscription ID if not provided PublishEvent now: - Stores event - Notifies all matching active subscriptions - Streams to gRPC clients in real-time 4 new tests, all 41 tests passing
-rw-r--r--internal/handler/grpc/server.go74
-rw-r--r--internal/subscription/manager.go190
-rw-r--r--internal/subscription/manager_test.go227
3 files changed, 488 insertions, 3 deletions
diff --git a/internal/handler/grpc/server.go b/internal/handler/grpc/server.go
index a3a3175..74857f6 100644
--- a/internal/handler/grpc/server.go
+++ b/internal/handler/grpc/server.go
@@ -2,10 +2,12 @@ package grpc
2 2
3import ( 3import (
4 "context" 4 "context"
5 "crypto/rand"
5 "fmt" 6 "fmt"
6 7
7 pb "northwest.io/nostr-grpc/api/nostr/v1" 8 pb "northwest.io/nostr-grpc/api/nostr/v1"
8 "northwest.io/nostr-grpc/internal/storage" 9 "northwest.io/nostr-grpc/internal/storage"
10 "northwest.io/nostr-grpc/internal/subscription"
9) 11)
10 12
11type EventStore interface { 13type EventStore interface {
@@ -16,10 +18,14 @@ type EventStore interface {
16type Server struct { 18type Server struct {
17 pb.UnimplementedNostrRelayServer 19 pb.UnimplementedNostrRelayServer
18 store EventStore 20 store EventStore
21 subs *subscription.Manager
19} 22}
20 23
21func NewServer(store EventStore) *Server { 24func NewServer(store EventStore) *Server {
22 return &Server{store: store} 25 return &Server{
26 store: store,
27 subs: subscription.NewManager(),
28 }
23} 29}
24 30
25func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) { 31func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) {
@@ -65,6 +71,8 @@ func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest)
65 return nil, fmt.Errorf("failed to store event: %w", err) 71 return nil, fmt.Errorf("failed to store event: %w", err)
66 } 72 }
67 73
74 s.subs.MatchAndFan(req.Event)
75
68 return &pb.PublishEventResponse{ 76 return &pb.PublishEventResponse{
69 Accepted: true, 77 Accepted: true,
70 Message: "success", 78 Message: "success",
@@ -120,9 +128,69 @@ func (s *Server) PublishBatch(ctx context.Context, req *pb.PublishBatchRequest)
120} 128}
121 129
122func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error { 130func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error {
123 return fmt.Errorf("not implemented yet") 131 subID := req.SubscriptionId
132 if subID == "" {
133 subID = generateSubID()
134 }
135
136 sub := &subscription.Subscription{
137 ID: subID,
138 Filters: req.Filters,
139 Events: make(chan *pb.Event, 100),
140 }
141 sub.InitDone()
142
143 s.subs.Add(sub)
144 defer s.subs.Remove(subID)
145
146 opts := &storage.QueryOptions{
147 IncludeCanonical: req.IncludeCanonicalJson,
148 Limit: 0,
149 }
150
151 storedEvents, err := s.store.QueryEvents(stream.Context(), req.Filters, opts)
152 if err != nil {
153 return fmt.Errorf("query failed: %w", err)
154 }
155
156 for _, event := range storedEvents {
157 if err := stream.Send(event); err != nil {
158 return err
159 }
160 }
161
162 for {
163 select {
164 case event, ok := <-sub.Events:
165 if !ok {
166 return nil
167 }
168
169 eventToSend := event
170 if req.IncludeCanonicalJson && event.CanonicalJson == nil {
171 return fmt.Errorf("canonical JSON requested but not available")
172 }
173
174 if err := stream.Send(eventToSend); err != nil {
175 return err
176 }
177
178 case <-stream.Context().Done():
179 return stream.Context().Err()
180
181 case <-sub.Done():
182 return nil
183 }
184 }
124} 185}
125 186
126func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) { 187func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) {
127 return nil, fmt.Errorf("not implemented yet") 188 s.subs.Remove(req.SubscriptionId)
189 return &pb.Empty{}, nil
190}
191
192func generateSubID() string {
193 b := make([]byte, 8)
194 rand.Read(b)
195 return fmt.Sprintf("%x", b)
128} 196}
diff --git a/internal/subscription/manager.go b/internal/subscription/manager.go
new file mode 100644
index 0000000..0e737d8
--- /dev/null
+++ b/internal/subscription/manager.go
@@ -0,0 +1,190 @@
1package subscription
2
3import (
4 "sync"
5
6 pb "northwest.io/nostr-grpc/api/nostr/v1"
7)
8
9type Subscription struct {
10 ID string
11 Filters []*pb.Filter
12 Events chan *pb.Event
13 done chan struct{}
14 once sync.Once
15}
16
17func (s *Subscription) InitDone() {
18 s.done = make(chan struct{})
19}
20
21func (s *Subscription) Done() <-chan struct{} {
22 return s.done
23}
24
25func (s *Subscription) Close() {
26 s.once.Do(func() {
27 close(s.done)
28 close(s.Events)
29 })
30}
31
32func (s *Subscription) IsClosed() bool {
33 select {
34 case <-s.done:
35 return true
36 default:
37 return false
38 }
39}
40
41type Manager struct {
42 mu sync.RWMutex
43 subscriptions map[string]*Subscription
44}
45
46func NewManager() *Manager {
47 return &Manager{
48 subscriptions: make(map[string]*Subscription),
49 }
50}
51
52func (m *Manager) Add(sub *Subscription) {
53 m.mu.Lock()
54 defer m.mu.Unlock()
55 m.subscriptions[sub.ID] = sub
56}
57
58func (m *Manager) Remove(id string) {
59 m.mu.Lock()
60 defer m.mu.Unlock()
61
62 if sub, ok := m.subscriptions[id]; ok {
63 sub.Close()
64 delete(m.subscriptions, id)
65 }
66}
67
68func (m *Manager) Get(id string) (*Subscription, bool) {
69 m.mu.RLock()
70 defer m.mu.RUnlock()
71 sub, ok := m.subscriptions[id]
72 return sub, ok
73}
74
75func (m *Manager) MatchAndFan(event *pb.Event) {
76 m.mu.RLock()
77 defer m.mu.RUnlock()
78
79 for _, sub := range m.subscriptions {
80 if sub.IsClosed() {
81 continue
82 }
83
84 if matchesAnyFilter(event, sub.Filters) {
85 select {
86 case sub.Events <- event:
87 case <-sub.done:
88 default:
89 }
90 }
91 }
92}
93
94func matchesAnyFilter(event *pb.Event, filters []*pb.Filter) bool {
95 for _, filter := range filters {
96 if matchesFilter(event, filter) {
97 return true
98 }
99 }
100 return false
101}
102
103func matchesFilter(event *pb.Event, filter *pb.Filter) bool {
104 if len(filter.Ids) > 0 {
105 if !matchesPrefix(event.Id, filter.Ids) {
106 return false
107 }
108 }
109
110 if len(filter.Authors) > 0 {
111 if !matchesPrefix(event.Pubkey, filter.Authors) {
112 return false
113 }
114 }
115
116 if len(filter.Kinds) > 0 {
117 found := false
118 for _, kind := range filter.Kinds {
119 if event.Kind == kind {
120 found = true
121 break
122 }
123 }
124 if !found {
125 return false
126 }
127 }
128
129 if filter.Since != nil && *filter.Since > 0 {
130 if event.CreatedAt < *filter.Since {
131 return false
132 }
133 }
134
135 if filter.Until != nil && *filter.Until > 0 {
136 if event.CreatedAt > *filter.Until {
137 return false
138 }
139 }
140
141 if len(filter.ETags) > 0 {
142 if !hasTag(event, "e", filter.ETags) {
143 return false
144 }
145 }
146
147 if len(filter.PTags) > 0 {
148 if !hasTag(event, "p", filter.PTags) {
149 return false
150 }
151 }
152
153 for tagName, tagFilter := range filter.TagFilters {
154 if len(tagFilter.Values) > 0 {
155 if !hasTag(event, tagName, tagFilter.Values) {
156 return false
157 }
158 }
159 }
160
161 return true
162}
163
164func matchesPrefix(value string, prefixes []string) bool {
165 for _, prefix := range prefixes {
166 if len(prefix) == len(value) {
167 if value == prefix {
168 return true
169 }
170 } else if len(value) > len(prefix) {
171 if value[:len(prefix)] == prefix {
172 return true
173 }
174 }
175 }
176 return false
177}
178
179func hasTag(event *pb.Event, tagName string, values []string) bool {
180 for _, tag := range event.Tags {
181 if len(tag.Values) >= 2 && tag.Values[0] == tagName {
182 for _, val := range values {
183 if tag.Values[1] == val {
184 return true
185 }
186 }
187 }
188 }
189 return false
190}
diff --git a/internal/subscription/manager_test.go b/internal/subscription/manager_test.go
new file mode 100644
index 0000000..d816fcd
--- /dev/null
+++ b/internal/subscription/manager_test.go
@@ -0,0 +1,227 @@
1package subscription
2
3import (
4 "testing"
5 "time"
6
7 pb "northwest.io/nostr-grpc/api/nostr/v1"
8)
9
10func TestManagerAddRemove(t *testing.T) {
11 mgr := NewManager()
12
13 sub := &Subscription{
14 ID: "sub1",
15 Filters: []*pb.Filter{},
16 Events: make(chan *pb.Event, 10),
17 done: make(chan struct{}),
18 }
19
20 mgr.Add(sub)
21
22 retrieved, ok := mgr.Get("sub1")
23 if !ok {
24 t.Fatal("expected to find subscription")
25 }
26 if retrieved.ID != "sub1" {
27 t.Errorf("expected ID sub1, got %s", retrieved.ID)
28 }
29
30 mgr.Remove("sub1")
31
32 _, ok = mgr.Get("sub1")
33 if ok {
34 t.Error("expected subscription to be removed")
35 }
36}
37
38func TestMatchesFilter(t *testing.T) {
39 tests := []struct {
40 name string
41 event *pb.Event
42 filter *pb.Filter
43 matches bool
44 }{
45 {
46 name: "matches kind",
47 event: &pb.Event{
48 Id: "test1",
49 Pubkey: "pubkey1",
50 CreatedAt: 1000,
51 Kind: 1,
52 },
53 filter: &pb.Filter{
54 Kinds: []int32{1, 2},
55 },
56 matches: true,
57 },
58 {
59 name: "does not match kind",
60 event: &pb.Event{
61 Kind: 3,
62 },
63 filter: &pb.Filter{
64 Kinds: []int32{1, 2},
65 },
66 matches: false,
67 },
68 {
69 name: "matches author prefix",
70 event: &pb.Event{
71 Pubkey: "abcdef123456",
72 },
73 filter: &pb.Filter{
74 Authors: []string{"abcd"},
75 },
76 matches: true,
77 },
78 {
79 name: "matches author exact",
80 event: &pb.Event{
81 Pubkey: "exact",
82 },
83 filter: &pb.Filter{
84 Authors: []string{"exact"},
85 },
86 matches: true,
87 },
88 {
89 name: "does not match author",
90 event: &pb.Event{
91 Pubkey: "different",
92 },
93 filter: &pb.Filter{
94 Authors: []string{"other"},
95 },
96 matches: false,
97 },
98 {
99 name: "matches since",
100 event: &pb.Event{
101 CreatedAt: 2000,
102 },
103 filter: &pb.Filter{
104 Since: ptr[int64](1000),
105 },
106 matches: true,
107 },
108 {
109 name: "does not match since",
110 event: &pb.Event{
111 CreatedAt: 500,
112 },
113 filter: &pb.Filter{
114 Since: ptr[int64](1000),
115 },
116 matches: false,
117 },
118 {
119 name: "matches e tag",
120 event: &pb.Event{
121 Tags: []*pb.Tag{
122 {Values: []string{"e", "event123"}},
123 },
124 },
125 filter: &pb.Filter{
126 ETags: []string{"event123"},
127 },
128 matches: true,
129 },
130 {
131 name: "does not match e tag",
132 event: &pb.Event{
133 Tags: []*pb.Tag{
134 {Values: []string{"e", "event456"}},
135 },
136 },
137 filter: &pb.Filter{
138 ETags: []string{"event123"},
139 },
140 matches: false,
141 },
142 }
143
144 for _, tt := range tests {
145 t.Run(tt.name, func(t *testing.T) {
146 result := matchesFilter(tt.event, tt.filter)
147 if result != tt.matches {
148 t.Errorf("expected %v, got %v", tt.matches, result)
149 }
150 })
151 }
152}
153
154func TestMatchAndFan(t *testing.T) {
155 mgr := NewManager()
156
157 sub1 := &Subscription{
158 ID: "sub1",
159 Filters: []*pb.Filter{
160 {Kinds: []int32{1}},
161 },
162 Events: make(chan *pb.Event, 10),
163 done: make(chan struct{}),
164 }
165
166 sub2 := &Subscription{
167 ID: "sub2",
168 Filters: []*pb.Filter{
169 {Kinds: []int32{2}},
170 },
171 Events: make(chan *pb.Event, 10),
172 done: make(chan struct{}),
173 }
174
175 mgr.Add(sub1)
176 mgr.Add(sub2)
177
178 event := &pb.Event{
179 Id: "test1",
180 Pubkey: "pubkey1",
181 CreatedAt: 1000,
182 Kind: 1,
183 Content: "test",
184 }
185
186 mgr.MatchAndFan(event)
187
188 select {
189 case e := <-sub1.Events:
190 if e.Id != "test1" {
191 t.Errorf("expected event test1, got %s", e.Id)
192 }
193 case <-time.After(100 * time.Millisecond):
194 t.Error("expected event on sub1")
195 }
196
197 select {
198 case <-sub2.Events:
199 t.Error("did not expect event on sub2")
200 case <-time.After(50 * time.Millisecond):
201 }
202}
203
204func TestSubscriptionClose(t *testing.T) {
205 sub := &Subscription{
206 ID: "test",
207 Filters: []*pb.Filter{},
208 Events: make(chan *pb.Event, 10),
209 done: make(chan struct{}),
210 }
211
212 if sub.IsClosed() {
213 t.Error("subscription should not be closed initially")
214 }
215
216 sub.Close()
217
218 if !sub.IsClosed() {
219 t.Error("subscription should be closed")
220 }
221
222 sub.Close()
223}
224
225func ptr[T any](v T) *T {
226 return &v
227}