summaryrefslogtreecommitdiffstats
path: root/relay.go
diff options
context:
space:
mode:
Diffstat (limited to 'relay.go')
-rw-r--r--relay.go186
1 files changed, 137 insertions, 49 deletions
diff --git a/relay.go b/relay.go
index 45f6119..bda76af 100644
--- a/relay.go
+++ b/relay.go
@@ -2,6 +2,7 @@ package nostr
2 2
3import ( 3import (
4 "context" 4 "context"
5 "crypto/rand"
5 "fmt" 6 "fmt"
6 "sync" 7 "sync"
7 8
@@ -16,6 +17,9 @@ type Relay struct {
16 17
17 subscriptions map[string]*Subscription 18 subscriptions map[string]*Subscription
18 subscriptionsMu sync.RWMutex 19 subscriptionsMu sync.RWMutex
20
21 okChannels map[string]chan *OKEnvelope
22 okChannelsMu sync.Mutex
19} 23}
20 24
21// Connect establishes a WebSocket connection to the relay. 25// Connect establishes a WebSocket connection to the relay.
@@ -25,11 +29,16 @@ func Connect(ctx context.Context, url string) (*Relay, error) {
25 return nil, fmt.Errorf("failed to connect to relay: %w", err) 29 return nil, fmt.Errorf("failed to connect to relay: %w", err)
26 } 30 }
27 31
28 return &Relay{ 32 r := &Relay{
29 URL: url, 33 URL: url,
30 conn: conn, 34 conn: conn,
31 subscriptions: make(map[string]*Subscription), 35 subscriptions: make(map[string]*Subscription),
32 }, nil 36 okChannels: make(map[string]chan *OKEnvelope),
37 }
38
39 go r.Listen(ctx)
40
41 return r, nil
33} 42}
34 43
35// Close closes the WebSocket connection. 44// Close closes the WebSocket connection.
@@ -83,47 +92,64 @@ func (r *Relay) Receive(ctx context.Context) (Envelope, error) {
83 92
84// Publish sends an event to the relay and waits for the OK response. 93// Publish sends an event to the relay and waits for the OK response.
85func (r *Relay) Publish(ctx context.Context, event *Event) error { 94func (r *Relay) Publish(ctx context.Context, event *Event) error {
95 ch := make(chan *OKEnvelope, 1)
96
97 r.okChannelsMu.Lock()
98 r.okChannels[event.ID] = ch
99 r.okChannelsMu.Unlock()
100
101 defer func() {
102 r.okChannelsMu.Lock()
103 delete(r.okChannels, event.ID)
104 r.okChannelsMu.Unlock()
105 }()
106
86 env := EventEnvelope{Event: event} 107 env := EventEnvelope{Event: event}
87 if err := r.Send(ctx, env); err != nil { 108 if err := r.Send(ctx, env); err != nil {
88 return fmt.Errorf("failed to send event: %w", err) 109 return fmt.Errorf("failed to send event: %w", err)
89 } 110 }
90 111
91 // Wait for OK response 112 select {
92 for { 113 case ok := <-ch:
93 resp, err := r.Receive(ctx) 114 if !ok.OK {
94 if err != nil { 115 return fmt.Errorf("event rejected: %s", ok.Message)
95 return fmt.Errorf("failed to receive response: %w", err)
96 }
97
98 if ok, isOK := resp.(*OKEnvelope); isOK {
99 if ok.EventID == event.ID {
100 if !ok.OK {
101 return fmt.Errorf("event rejected: %s", ok.Message)
102 }
103 return nil
104 }
105 } 116 }
106 117 return nil
107 // Dispatch other messages to subscriptions 118 case <-ctx.Done():
108 r.dispatchEnvelope(resp) 119 return ctx.Err()
109 } 120 }
110} 121}
111 122
112// Subscribe creates a subscription with the given filters. 123func genID() string {
113func (r *Relay) Subscribe(ctx context.Context, id string, filters ...Filter) (*Subscription, error) { 124 buf := make([]byte, 5)
125 rand.Read(buf)
126 return fmt.Sprintf("%x", buf)
127}
128
129// subscribe is the internal implementation for Subscribe and Fetch.
130func (r *Relay) subscribe(ctx context.Context, closeOnEOSE bool, filters ...Filter) *Subscription {
131 id := genID()
132
114 sub := &Subscription{ 133 sub := &Subscription{
115 ID: id, 134 ID: id,
116 relay: r, 135 relay: r,
117 Filters: filters, 136 Filters: filters,
118 Events: make(chan *Event, 100), 137 Events: make(chan *Event, 100),
119 EOSE: make(chan struct{}, 1), 138 closeOnEOSE: closeOnEOSE,
120 closed: make(chan struct{}),
121 } 139 }
122 140
123 r.subscriptionsMu.Lock() 141 r.subscriptionsMu.Lock()
124 r.subscriptions[id] = sub 142 r.subscriptions[id] = sub
125 r.subscriptionsMu.Unlock() 143 r.subscriptionsMu.Unlock()
126 144
145 go func() {
146 <-ctx.Done()
147 sub.stop(ctx.Err())
148 r.subscriptionsMu.Lock()
149 delete(r.subscriptions, id)
150 r.subscriptionsMu.Unlock()
151 }()
152
127 env := ReqEnvelope{ 153 env := ReqEnvelope{
128 SubscriptionID: id, 154 SubscriptionID: id,
129 Filters: filters, 155 Filters: filters,
@@ -132,10 +158,24 @@ func (r *Relay) Subscribe(ctx context.Context, id string, filters ...Filter) (*S
132 r.subscriptionsMu.Lock() 158 r.subscriptionsMu.Lock()
133 delete(r.subscriptions, id) 159 delete(r.subscriptions, id)
134 r.subscriptionsMu.Unlock() 160 r.subscriptionsMu.Unlock()
135 return nil, fmt.Errorf("failed to send subscription request: %w", err) 161 sub.stop(fmt.Errorf("failed to send subscription request: %w", err))
136 } 162 }
137 163
138 return sub, nil 164 return sub
165}
166
167// Subscribe creates a subscription with the given filters.
168// Events are received on the Events channel until the context is cancelled.
169// After EOSE (end of stored events), the subscription continues to receive
170// real-time events per the Nostr protocol.
171func (r *Relay) Subscribe(ctx context.Context, filters ...Filter) *Subscription {
172 return r.subscribe(ctx, false, filters...)
173}
174
175// Fetch creates a subscription that closes automatically when EOSE is received.
176// Use this for one-shot queries where you only want stored events.
177func (r *Relay) Fetch(ctx context.Context, filters ...Filter) *Subscription {
178 return r.subscribe(ctx, true, filters...)
139} 179}
140 180
141// dispatchEnvelope routes incoming messages to the appropriate subscription. 181// dispatchEnvelope routes incoming messages to the appropriate subscription.
@@ -146,35 +186,57 @@ func (r *Relay) dispatchEnvelope(env Envelope) {
146 sub, ok := r.subscriptions[e.SubscriptionID] 186 sub, ok := r.subscriptions[e.SubscriptionID]
147 r.subscriptionsMu.RUnlock() 187 r.subscriptionsMu.RUnlock()
148 if ok { 188 if ok {
149 select { 189 sub.send(e.Event)
150 case sub.Events <- e.Event:
151 default:
152 // Channel full, drop event
153 }
154 } 190 }
155 case *EOSEEnvelope: 191 case *EOSEEnvelope:
156 r.subscriptionsMu.RLock() 192 r.subscriptionsMu.RLock()
157 sub, ok := r.subscriptions[e.SubscriptionID] 193 sub, ok := r.subscriptions[e.SubscriptionID]
158 r.subscriptionsMu.RUnlock() 194 r.subscriptionsMu.RUnlock()
159 if ok { 195 if ok && sub.closeOnEOSE {
160 select { 196 r.subscriptionsMu.Lock()
161 case sub.EOSE <- struct{}{}: 197 delete(r.subscriptions, e.SubscriptionID)
162 default: 198 r.subscriptionsMu.Unlock()
163 } 199 sub.stop(nil)
164 } 200 }
165 case *ClosedEnvelope: 201 case *ClosedEnvelope:
166 r.subscriptionsMu.Lock() 202 r.subscriptionsMu.Lock()
167 if sub, ok := r.subscriptions[e.SubscriptionID]; ok { 203 sub, ok := r.subscriptions[e.SubscriptionID]
168 close(sub.closed) 204 if ok {
169 delete(r.subscriptions, e.SubscriptionID) 205 delete(r.subscriptions, e.SubscriptionID)
170 } 206 }
171 r.subscriptionsMu.Unlock() 207 r.subscriptionsMu.Unlock()
208 if ok {
209 sub.stop(fmt.Errorf("subscription closed by relay: %s", e.Message))
210 }
211 case *OKEnvelope:
212 r.okChannelsMu.Lock()
213 ch, ok := r.okChannels[e.EventID]
214 r.okChannelsMu.Unlock()
215 if ok {
216 select {
217 case ch <- e:
218 default:
219 }
220 }
172 } 221 }
173} 222}
174 223
175// Listen reads messages from the relay and dispatches them to subscriptions. 224// Listen reads messages from the relay and dispatches them to subscriptions.
176// This should be called in a goroutine when using multiple subscriptions.
177func (r *Relay) Listen(ctx context.Context) error { 225func (r *Relay) Listen(ctx context.Context) error {
226 defer func() {
227 r.subscriptionsMu.Lock()
228 subs := make([]*Subscription, 0, len(r.subscriptions))
229 for id, sub := range r.subscriptions {
230 subs = append(subs, sub)
231 delete(r.subscriptions, id)
232 }
233 r.subscriptionsMu.Unlock()
234
235 for _, sub := range subs {
236 sub.stop(fmt.Errorf("connection closed"))
237 }
238 }()
239
178 for { 240 for {
179 select { 241 select {
180 case <-ctx.Done(): 242 case <-ctx.Done():
@@ -197,12 +259,43 @@ type Subscription struct {
197 relay *Relay 259 relay *Relay
198 Filters []Filter 260 Filters []Filter
199 Events chan *Event 261 Events chan *Event
200 EOSE chan struct{} 262 Err error
201 closed chan struct{} 263
264 closeOnEOSE bool
265 mu sync.Mutex
266 done bool
267}
268
269// send delivers an event to the subscription's Events channel.
270func (s *Subscription) send(ev *Event) {
271 s.mu.Lock()
272 defer s.mu.Unlock()
273 if s.done {
274 return
275 }
276 select {
277 case s.Events <- ev:
278 default:
279 }
280}
281
282// stop closes the subscription's Events channel and sets the error.
283// It is idempotent — only the first call has any effect.
284func (s *Subscription) stop(err error) {
285 s.mu.Lock()
286 defer s.mu.Unlock()
287 if s.done {
288 return
289 }
290 s.done = true
291 s.Err = err
292 close(s.Events)
202} 293}
203 294
204// Close unsubscribes from the relay. 295// Close unsubscribes from the relay.
205func (s *Subscription) Close(ctx context.Context) error { 296func (s *Subscription) Close(ctx context.Context) error {
297 s.stop(nil)
298
206 s.relay.subscriptionsMu.Lock() 299 s.relay.subscriptionsMu.Lock()
207 delete(s.relay.subscriptions, s.ID) 300 delete(s.relay.subscriptions, s.ID)
208 s.relay.subscriptionsMu.Unlock() 301 s.relay.subscriptionsMu.Unlock()
@@ -210,8 +303,3 @@ func (s *Subscription) Close(ctx context.Context) error {
210 env := CloseEnvelope{SubscriptionID: s.ID} 303 env := CloseEnvelope{SubscriptionID: s.ID}
211 return s.relay.Send(ctx, env) 304 return s.relay.Send(ctx, env)
212} 305}
213
214// Closed returns a channel that's closed when the subscription is terminated.
215func (s *Subscription) Closed() <-chan struct{} {
216 return s.closed
217}