summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbndw <ben@bdw.to>2026-02-07 21:22:51 -0800
committerbndw <ben@bdw.to>2026-02-07 21:22:51 -0800
commite79f9ad89556000521b43ce5ff4eb59dd00768b0 (patch)
tree9f8750e3c5431faf43fa672cb78a733a85d9dcfe
parentd4fd2467d691a69a0ba75348086424b9fb33a297 (diff)
refactor: race-safe Subscribe/Fetch API with channel-based Publish
- Add mutex-guarded send/stop on Subscription to prevent send-on-closed-channel panics and data races - Split Subscribe (streams after EOSE) and Fetch (closes on EOSE) per NIP-01 - Rewrite Publish to use channel-based OK dispatch instead of calling Receive directly, which raced with the auto-started Listen goroutine - Clean up all subscriptions when Listen exits so range loops terminate - Update tests and examples for new API
-rw-r--r--example_test.go35
-rw-r--r--examples/basic/main.go39
-rw-r--r--relay.go186
-rw-r--r--relay_test.go51
4 files changed, 176 insertions, 135 deletions
diff --git a/example_test.go b/example_test.go
index 90dae0f..6d10ced 100644
--- a/example_test.go
+++ b/example_test.go
@@ -53,8 +53,7 @@ func Example_basic() {
53// ExampleRelay demonstrates connecting to a relay (requires network). 53// ExampleRelay demonstrates connecting to a relay (requires network).
54// This is a documentation example - run with: go test -v -run ExampleRelay 54// This is a documentation example - run with: go test -v -run ExampleRelay
55func ExampleRelay() { 55func ExampleRelay() {
56 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 56 ctx := context.Background()
57 defer cancel()
58 57
59 // Connect to a public relay 58 // Connect to a public relay
60 relay, err := nostr.Connect(ctx, "wss://relay.damus.io") 59 relay, err := nostr.Connect(ctx, "wss://relay.damus.io")
@@ -66,35 +65,21 @@ func ExampleRelay() {
66 65
67 fmt.Println("Connected to relay!") 66 fmt.Println("Connected to relay!")
68 67
69 // Subscribe to recent text notes 68 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
69 defer cancel()
70
71 // Fetch recent text notes (closes on EOSE)
70 since := time.Now().Add(-1 * time.Hour).Unix() 72 since := time.Now().Add(-1 * time.Hour).Unix()
71 sub, err := relay.Subscribe(ctx, "my-sub", nostr.Filter{ 73 sub := relay.Fetch(ctx, nostr.Filter{
72 Kinds: []int{nostr.KindTextNote}, 74 Kinds: []int{nostr.KindTextNote},
73 Since: &since, 75 Since: &since,
74 Limit: 5, 76 Limit: 5,
75 }) 77 })
76 if err != nil {
77 fmt.Printf("Failed to subscribe: %v\n", err)
78 return
79 }
80
81 // Listen for events in the background
82 go relay.Listen(ctx)
83 78
84 // Collect events until EOSE
85 eventCount := 0 79 eventCount := 0
86 for { 80 for event := range sub.Events {
87 select { 81 eventCount++
88 case event := <-sub.Events: 82 fmt.Printf("Received event from %s...\n", event.PubKey[:8])
89 eventCount++
90 fmt.Printf("Received event from %s...\n", event.PubKey[:8])
91 case <-sub.EOSE:
92 fmt.Printf("Received %d events before EOSE\n", eventCount)
93 sub.Close(ctx)
94 return
95 case <-ctx.Done():
96 fmt.Println("Timeout")
97 return
98 }
99 } 83 }
84 fmt.Printf("Received %d events\n", eventCount)
100} 85}
diff --git a/examples/basic/main.go b/examples/basic/main.go
index 0c99dd9..1a4061a 100644
--- a/examples/basic/main.go
+++ b/examples/basic/main.go
@@ -53,11 +53,8 @@ func main() {
53 ExampleRelay() 53 ExampleRelay()
54} 54}
55 55
56// ExampleRelay demonstrates connecting to a relay (requires network).
57// This is a documentation example - run with: go test -v -run ExampleRelay
58func ExampleRelay() { 56func ExampleRelay() {
59 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 57 ctx := context.Background()
60 defer cancel()
61 58
62 // Connect to a public relay 59 // Connect to a public relay
63 relay, err := nostr.Connect(ctx, "wss://relay.damus.io") 60 relay, err := nostr.Connect(ctx, "wss://relay.damus.io")
@@ -66,38 +63,16 @@ func ExampleRelay() {
66 return 63 return
67 } 64 }
68 defer relay.Close() 65 defer relay.Close()
69
70 fmt.Println("Connected to relay!") 66 fmt.Println("Connected to relay!")
71 67
72 // Subscribe to recent text notes 68 ctx, cancel := context.WithTimeout(ctx, 25*time.Second)
73 since := time.Now().Add(-1 * time.Hour).Unix() 69 defer cancel()
74 sub, err := relay.Subscribe(ctx, "my-sub", nostr.Filter{ 70
71 filter := nostr.Filter{
75 Kinds: []int{nostr.KindTextNote}, 72 Kinds: []int{nostr.KindTextNote},
76 Since: &since,
77 Limit: 5, 73 Limit: 5,
78 })
79 if err != nil {
80 fmt.Printf("Failed to subscribe: %v\n", err)
81 os.Exit(1)
82 } 74 }
83 75 for event := range relay.Fetch(ctx, filter).Events {
84 // Listen for events in the background 76 fmt.Printf("Received event from %s...\n", event)
85 go relay.Listen(ctx)
86
87 // Collect events until EOSE
88 eventCount := 0
89 for {
90 select {
91 case event := <-sub.Events:
92 eventCount++
93 fmt.Printf("Received event from %s...\n", event)
94 case <-sub.EOSE:
95 fmt.Printf("Received %d events before EOSE\n", eventCount)
96 sub.Close(ctx)
97 return
98 case <-ctx.Done():
99 fmt.Println("Timeout")
100 return
101 }
102 } 77 }
103} 78}
diff --git a/relay.go b/relay.go
index 45f6119..bda76af 100644
--- a/relay.go
+++ b/relay.go
@@ -2,6 +2,7 @@ package nostr
2 2
3import ( 3import (
4 "context" 4 "context"
5 "crypto/rand"
5 "fmt" 6 "fmt"
6 "sync" 7 "sync"
7 8
@@ -16,6 +17,9 @@ type Relay struct {
16 17
17 subscriptions map[string]*Subscription 18 subscriptions map[string]*Subscription
18 subscriptionsMu sync.RWMutex 19 subscriptionsMu sync.RWMutex
20
21 okChannels map[string]chan *OKEnvelope
22 okChannelsMu sync.Mutex
19} 23}
20 24
21// Connect establishes a WebSocket connection to the relay. 25// Connect establishes a WebSocket connection to the relay.
@@ -25,11 +29,16 @@ func Connect(ctx context.Context, url string) (*Relay, error) {
25 return nil, fmt.Errorf("failed to connect to relay: %w", err) 29 return nil, fmt.Errorf("failed to connect to relay: %w", err)
26 } 30 }
27 31
28 return &Relay{ 32 r := &Relay{
29 URL: url, 33 URL: url,
30 conn: conn, 34 conn: conn,
31 subscriptions: make(map[string]*Subscription), 35 subscriptions: make(map[string]*Subscription),
32 }, nil 36 okChannels: make(map[string]chan *OKEnvelope),
37 }
38
39 go r.Listen(ctx)
40
41 return r, nil
33} 42}
34 43
35// Close closes the WebSocket connection. 44// Close closes the WebSocket connection.
@@ -83,47 +92,64 @@ func (r *Relay) Receive(ctx context.Context) (Envelope, error) {
83 92
84// Publish sends an event to the relay and waits for the OK response. 93// Publish sends an event to the relay and waits for the OK response.
85func (r *Relay) Publish(ctx context.Context, event *Event) error { 94func (r *Relay) Publish(ctx context.Context, event *Event) error {
95 ch := make(chan *OKEnvelope, 1)
96
97 r.okChannelsMu.Lock()
98 r.okChannels[event.ID] = ch
99 r.okChannelsMu.Unlock()
100
101 defer func() {
102 r.okChannelsMu.Lock()
103 delete(r.okChannels, event.ID)
104 r.okChannelsMu.Unlock()
105 }()
106
86 env := EventEnvelope{Event: event} 107 env := EventEnvelope{Event: event}
87 if err := r.Send(ctx, env); err != nil { 108 if err := r.Send(ctx, env); err != nil {
88 return fmt.Errorf("failed to send event: %w", err) 109 return fmt.Errorf("failed to send event: %w", err)
89 } 110 }
90 111
91 // Wait for OK response 112 select {
92 for { 113 case ok := <-ch:
93 resp, err := r.Receive(ctx) 114 if !ok.OK {
94 if err != nil { 115 return fmt.Errorf("event rejected: %s", ok.Message)
95 return fmt.Errorf("failed to receive response: %w", err)
96 }
97
98 if ok, isOK := resp.(*OKEnvelope); isOK {
99 if ok.EventID == event.ID {
100 if !ok.OK {
101 return fmt.Errorf("event rejected: %s", ok.Message)
102 }
103 return nil
104 }
105 } 116 }
106 117 return nil
107 // Dispatch other messages to subscriptions 118 case <-ctx.Done():
108 r.dispatchEnvelope(resp) 119 return ctx.Err()
109 } 120 }
110} 121}
111 122
112// Subscribe creates a subscription with the given filters. 123func genID() string {
113func (r *Relay) Subscribe(ctx context.Context, id string, filters ...Filter) (*Subscription, error) { 124 buf := make([]byte, 5)
125 rand.Read(buf)
126 return fmt.Sprintf("%x", buf)
127}
128
129// subscribe is the internal implementation for Subscribe and Fetch.
130func (r *Relay) subscribe(ctx context.Context, closeOnEOSE bool, filters ...Filter) *Subscription {
131 id := genID()
132
114 sub := &Subscription{ 133 sub := &Subscription{
115 ID: id, 134 ID: id,
116 relay: r, 135 relay: r,
117 Filters: filters, 136 Filters: filters,
118 Events: make(chan *Event, 100), 137 Events: make(chan *Event, 100),
119 EOSE: make(chan struct{}, 1), 138 closeOnEOSE: closeOnEOSE,
120 closed: make(chan struct{}),
121 } 139 }
122 140
123 r.subscriptionsMu.Lock() 141 r.subscriptionsMu.Lock()
124 r.subscriptions[id] = sub 142 r.subscriptions[id] = sub
125 r.subscriptionsMu.Unlock() 143 r.subscriptionsMu.Unlock()
126 144
145 go func() {
146 <-ctx.Done()
147 sub.stop(ctx.Err())
148 r.subscriptionsMu.Lock()
149 delete(r.subscriptions, id)
150 r.subscriptionsMu.Unlock()
151 }()
152
127 env := ReqEnvelope{ 153 env := ReqEnvelope{
128 SubscriptionID: id, 154 SubscriptionID: id,
129 Filters: filters, 155 Filters: filters,
@@ -132,10 +158,24 @@ func (r *Relay) Subscribe(ctx context.Context, id string, filters ...Filter) (*S
132 r.subscriptionsMu.Lock() 158 r.subscriptionsMu.Lock()
133 delete(r.subscriptions, id) 159 delete(r.subscriptions, id)
134 r.subscriptionsMu.Unlock() 160 r.subscriptionsMu.Unlock()
135 return nil, fmt.Errorf("failed to send subscription request: %w", err) 161 sub.stop(fmt.Errorf("failed to send subscription request: %w", err))
136 } 162 }
137 163
138 return sub, nil 164 return sub
165}
166
167// Subscribe creates a subscription with the given filters.
168// Events are received on the Events channel until the context is cancelled.
169// After EOSE (end of stored events), the subscription continues to receive
170// real-time events per the Nostr protocol.
171func (r *Relay) Subscribe(ctx context.Context, filters ...Filter) *Subscription {
172 return r.subscribe(ctx, false, filters...)
173}
174
175// Fetch creates a subscription that closes automatically when EOSE is received.
176// Use this for one-shot queries where you only want stored events.
177func (r *Relay) Fetch(ctx context.Context, filters ...Filter) *Subscription {
178 return r.subscribe(ctx, true, filters...)
139} 179}
140 180
141// dispatchEnvelope routes incoming messages to the appropriate subscription. 181// dispatchEnvelope routes incoming messages to the appropriate subscription.
@@ -146,35 +186,57 @@ func (r *Relay) dispatchEnvelope(env Envelope) {
146 sub, ok := r.subscriptions[e.SubscriptionID] 186 sub, ok := r.subscriptions[e.SubscriptionID]
147 r.subscriptionsMu.RUnlock() 187 r.subscriptionsMu.RUnlock()
148 if ok { 188 if ok {
149 select { 189 sub.send(e.Event)
150 case sub.Events <- e.Event:
151 default:
152 // Channel full, drop event
153 }
154 } 190 }
155 case *EOSEEnvelope: 191 case *EOSEEnvelope:
156 r.subscriptionsMu.RLock() 192 r.subscriptionsMu.RLock()
157 sub, ok := r.subscriptions[e.SubscriptionID] 193 sub, ok := r.subscriptions[e.SubscriptionID]
158 r.subscriptionsMu.RUnlock() 194 r.subscriptionsMu.RUnlock()
159 if ok { 195 if ok && sub.closeOnEOSE {
160 select { 196 r.subscriptionsMu.Lock()
161 case sub.EOSE <- struct{}{}: 197 delete(r.subscriptions, e.SubscriptionID)
162 default: 198 r.subscriptionsMu.Unlock()
163 } 199 sub.stop(nil)
164 } 200 }
165 case *ClosedEnvelope: 201 case *ClosedEnvelope:
166 r.subscriptionsMu.Lock() 202 r.subscriptionsMu.Lock()
167 if sub, ok := r.subscriptions[e.SubscriptionID]; ok { 203 sub, ok := r.subscriptions[e.SubscriptionID]
168 close(sub.closed) 204 if ok {
169 delete(r.subscriptions, e.SubscriptionID) 205 delete(r.subscriptions, e.SubscriptionID)
170 } 206 }
171 r.subscriptionsMu.Unlock() 207 r.subscriptionsMu.Unlock()
208 if ok {
209 sub.stop(fmt.Errorf("subscription closed by relay: %s", e.Message))
210 }
211 case *OKEnvelope:
212 r.okChannelsMu.Lock()
213 ch, ok := r.okChannels[e.EventID]
214 r.okChannelsMu.Unlock()
215 if ok {
216 select {
217 case ch <- e:
218 default:
219 }
220 }
172 } 221 }
173} 222}
174 223
175// Listen reads messages from the relay and dispatches them to subscriptions. 224// Listen reads messages from the relay and dispatches them to subscriptions.
176// This should be called in a goroutine when using multiple subscriptions.
177func (r *Relay) Listen(ctx context.Context) error { 225func (r *Relay) Listen(ctx context.Context) error {
226 defer func() {
227 r.subscriptionsMu.Lock()
228 subs := make([]*Subscription, 0, len(r.subscriptions))
229 for id, sub := range r.subscriptions {
230 subs = append(subs, sub)
231 delete(r.subscriptions, id)
232 }
233 r.subscriptionsMu.Unlock()
234
235 for _, sub := range subs {
236 sub.stop(fmt.Errorf("connection closed"))
237 }
238 }()
239
178 for { 240 for {
179 select { 241 select {
180 case <-ctx.Done(): 242 case <-ctx.Done():
@@ -197,12 +259,43 @@ type Subscription struct {
197 relay *Relay 259 relay *Relay
198 Filters []Filter 260 Filters []Filter
199 Events chan *Event 261 Events chan *Event
200 EOSE chan struct{} 262 Err error
201 closed chan struct{} 263
264 closeOnEOSE bool
265 mu sync.Mutex
266 done bool
267}
268
269// send delivers an event to the subscription's Events channel.
270func (s *Subscription) send(ev *Event) {
271 s.mu.Lock()
272 defer s.mu.Unlock()
273 if s.done {
274 return
275 }
276 select {
277 case s.Events <- ev:
278 default:
279 }
280}
281
282// stop closes the subscription's Events channel and sets the error.
283// It is idempotent — only the first call has any effect.
284func (s *Subscription) stop(err error) {
285 s.mu.Lock()
286 defer s.mu.Unlock()
287 if s.done {
288 return
289 }
290 s.done = true
291 s.Err = err
292 close(s.Events)
202} 293}
203 294
204// Close unsubscribes from the relay. 295// Close unsubscribes from the relay.
205func (s *Subscription) Close(ctx context.Context) error { 296func (s *Subscription) Close(ctx context.Context) error {
297 s.stop(nil)
298
206 s.relay.subscriptionsMu.Lock() 299 s.relay.subscriptionsMu.Lock()
207 delete(s.relay.subscriptions, s.ID) 300 delete(s.relay.subscriptions, s.ID)
208 s.relay.subscriptionsMu.Unlock() 301 s.relay.subscriptionsMu.Unlock()
@@ -210,8 +303,3 @@ func (s *Subscription) Close(ctx context.Context) error {
210 env := CloseEnvelope{SubscriptionID: s.ID} 303 env := CloseEnvelope{SubscriptionID: s.ID}
211 return s.relay.Send(ctx, env) 304 return s.relay.Send(ctx, env)
212} 305}
213
214// Closed returns a channel that's closed when the subscription is terminated.
215func (s *Subscription) Closed() <-chan struct{} {
216 return s.closed
217}
diff --git a/relay_test.go b/relay_test.go
index 4ace956..b39aa06 100644
--- a/relay_test.go
+++ b/relay_test.go
@@ -76,9 +76,16 @@ func TestRelaySendReceive(t *testing.T) {
76 url := "ws" + strings.TrimPrefix(server.URL, "http") 76 url := "ws" + strings.TrimPrefix(server.URL, "http")
77 ctx := context.Background() 77 ctx := context.Background()
78 78
79 relay, err := Connect(ctx, url) 79 // Create relay without auto-Listen to test Send/Receive directly
80 conn, _, err := websocket.Dial(ctx, url, nil)
80 if err != nil { 81 if err != nil {
81 t.Fatalf("Connect() error = %v", err) 82 t.Fatalf("Dial() error = %v", err)
83 }
84 relay := &Relay{
85 URL: url,
86 conn: conn,
87 subscriptions: make(map[string]*Subscription),
88 okChannels: make(map[string]chan *OKEnvelope),
82 } 89 }
83 defer relay.Close() 90 defer relay.Close()
84 91
@@ -234,7 +241,8 @@ func TestRelaySubscribe(t *testing.T) {
234 defer server.Close() 241 defer server.Close()
235 242
236 url := "ws" + strings.TrimPrefix(server.URL, "http") 243 url := "ws" + strings.TrimPrefix(server.URL, "http")
237 ctx := context.Background() 244 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
245 defer cancel()
238 246
239 relay, err := Connect(ctx, url) 247 relay, err := Connect(ctx, url)
240 if err != nil { 248 if err != nil {
@@ -242,30 +250,18 @@ func TestRelaySubscribe(t *testing.T) {
242 } 250 }
243 defer relay.Close() 251 defer relay.Close()
244 252
245 sub, err := relay.Subscribe(ctx, "sub1", Filter{Kinds: []int{1}}) 253 sub := relay.Fetch(ctx, Filter{Kinds: []int{1}})
246 if err != nil {
247 t.Fatalf("Subscribe() error = %v", err)
248 }
249 254
250 // Start listening in background
251 go relay.Listen(ctx)
252
253 // Collect events
254 eventCount := 0 255 eventCount := 0
255 timeout := time.After(2 * time.Second) 256 for range sub.Events {
256 257 eventCount++
257 for { 258 }
258 select { 259
259 case <-sub.Events: 260 if eventCount != 3 {
260 eventCount++ 261 t.Errorf("Received %d events, want 3", eventCount)
261 case <-sub.EOSE: 262 }
262 if eventCount != 3 { 263 if sub.Err != nil {
263 t.Errorf("Received %d events, want 3", eventCount) 264 t.Errorf("Subscription.Err = %v, want nil", sub.Err)
264 }
265 return
266 case <-timeout:
267 t.Fatal("Timeout waiting for events")
268 }
269 } 265 }
270} 266}
271 267
@@ -322,10 +318,7 @@ func TestSubscriptionClose(t *testing.T) {
322 } 318 }
323 defer relay.Close() 319 defer relay.Close()
324 320
325 sub, err := relay.Subscribe(ctx, "sub1", Filter{Kinds: []int{1}}) 321 sub := relay.Subscribe(ctx, Filter{Kinds: []int{1}})
326 if err != nil {
327 t.Fatalf("Subscribe() error = %v", err)
328 }
329 322
330 if err := sub.Close(ctx); err != nil { 323 if err := sub.Close(ctx); err != nil {
331 t.Errorf("Subscription.Close() error = %v", err) 324 t.Errorf("Subscription.Close() error = %v", err)