summaryrefslogtreecommitdiffstats
path: root/internal/nostr/relay.go
diff options
context:
space:
mode:
authorbndw <ben@bdw.to>2026-02-13 17:35:32 -0800
committerbndw <ben@bdw.to>2026-02-13 17:35:32 -0800
commit581ceecbf046f99b39885c74e2780a5320e5b15e (patch)
treec82dcaddb4f555d5051684221881e36f7fe3f718 /internal/nostr/relay.go
parent06b9b13274825f797523935494a1b5225f0e0862 (diff)
feat: add Nostr protocol implementation (internal/nostr, internal/websocket)
Diffstat (limited to 'internal/nostr/relay.go')
-rw-r--r--internal/nostr/relay.go305
1 files changed, 305 insertions, 0 deletions
diff --git a/internal/nostr/relay.go b/internal/nostr/relay.go
new file mode 100644
index 0000000..2b156e0
--- /dev/null
+++ b/internal/nostr/relay.go
@@ -0,0 +1,305 @@
1package nostr
2
3import (
4 "context"
5 "crypto/rand"
6 "fmt"
7 "sync"
8
9 "northwest.io/nostr-grpc/internal/websocket"
10)
11
12// Relay represents a connection to a Nostr relay.
13type Relay struct {
14 URL string
15 conn *websocket.Conn
16 mu sync.Mutex
17
18 subscriptions map[string]*Subscription
19 subscriptionsMu sync.RWMutex
20
21 okChannels map[string]chan *OKEnvelope
22 okChannelsMu sync.Mutex
23}
24
25// Connect establishes a WebSocket connection to the relay.
26func Connect(ctx context.Context, url string) (*Relay, error) {
27 conn, err := websocket.Dial(ctx, url)
28 if err != nil {
29 return nil, fmt.Errorf("failed to connect to relay: %w", err)
30 }
31
32 r := &Relay{
33 URL: url,
34 conn: conn,
35 subscriptions: make(map[string]*Subscription),
36 okChannels: make(map[string]chan *OKEnvelope),
37 }
38
39 go r.Listen(ctx)
40
41 return r, nil
42}
43
44// Close closes the WebSocket connection.
45func (r *Relay) Close() error {
46 r.mu.Lock()
47 defer r.mu.Unlock()
48
49 if r.conn == nil {
50 return nil
51 }
52
53 err := r.conn.Close(websocket.StatusNormalClosure, "")
54 r.conn = nil
55 return err
56}
57
58// Send sends an envelope to the relay.
59func (r *Relay) Send(ctx context.Context, env Envelope) error {
60 data, err := env.MarshalJSON()
61 if err != nil {
62 return fmt.Errorf("failed to marshal envelope: %w", err)
63 }
64
65 r.mu.Lock()
66 defer r.mu.Unlock()
67
68 if r.conn == nil {
69 return fmt.Errorf("connection closed")
70 }
71
72 return r.conn.Write(ctx, websocket.MessageText, data)
73}
74
75// Receive reads the next envelope from the relay.
76func (r *Relay) Receive(ctx context.Context) (Envelope, error) {
77 r.mu.Lock()
78 conn := r.conn
79 r.mu.Unlock()
80
81 if conn == nil {
82 return nil, fmt.Errorf("connection closed")
83 }
84
85 _, data, err := conn.Read(ctx)
86 if err != nil {
87 return nil, fmt.Errorf("failed to read message: %w", err)
88 }
89
90 return ParseEnvelope(data)
91}
92
93// Publish sends an event to the relay and waits for the OK response.
94func (r *Relay) Publish(ctx context.Context, event *Event) error {
95 ch := make(chan *OKEnvelope, 1)
96
97 r.okChannelsMu.Lock()
98 r.okChannels[event.ID] = ch
99 r.okChannelsMu.Unlock()
100
101 defer func() {
102 r.okChannelsMu.Lock()
103 delete(r.okChannels, event.ID)
104 r.okChannelsMu.Unlock()
105 }()
106
107 env := EventEnvelope{Event: event}
108 if err := r.Send(ctx, env); err != nil {
109 return fmt.Errorf("failed to send event: %w", err)
110 }
111
112 select {
113 case ok := <-ch:
114 if !ok.OK {
115 return fmt.Errorf("event rejected: %s", ok.Message)
116 }
117 return nil
118 case <-ctx.Done():
119 return ctx.Err()
120 }
121}
122
123func genID() string {
124 buf := make([]byte, 5)
125 rand.Read(buf)
126 return fmt.Sprintf("%x", buf)
127}
128
129// subscribe is the internal implementation for Subscribe and Fetch.
130func (r *Relay) subscribe(ctx context.Context, closeOnEOSE bool, filters ...Filter) *Subscription {
131 id := genID()
132
133 sub := &Subscription{
134 ID: id,
135 relay: r,
136 Filters: filters,
137 Events: make(chan *Event, 100),
138 closeOnEOSE: closeOnEOSE,
139 }
140
141 r.subscriptionsMu.Lock()
142 r.subscriptions[id] = sub
143 r.subscriptionsMu.Unlock()
144
145 go func() {
146 <-ctx.Done()
147 sub.stop(ctx.Err())
148 r.subscriptionsMu.Lock()
149 delete(r.subscriptions, id)
150 r.subscriptionsMu.Unlock()
151 }()
152
153 env := ReqEnvelope{
154 SubscriptionID: id,
155 Filters: filters,
156 }
157 if err := r.Send(ctx, env); err != nil {
158 r.subscriptionsMu.Lock()
159 delete(r.subscriptions, id)
160 r.subscriptionsMu.Unlock()
161 sub.stop(fmt.Errorf("failed to send subscription request: %w", err))
162 }
163
164 return sub
165}
166
167// Subscribe creates a subscription with the given filters.
168// Events are received on the Events channel until the context is cancelled.
169// After EOSE (end of stored events), the subscription continues to receive
170// real-time events per the Nostr protocol.
171func (r *Relay) Subscribe(ctx context.Context, filters ...Filter) *Subscription {
172 return r.subscribe(ctx, false, filters...)
173}
174
175// Fetch creates a subscription that closes automatically when EOSE is received.
176// Use this for one-shot queries where you only want stored events.
177func (r *Relay) Fetch(ctx context.Context, filters ...Filter) *Subscription {
178 return r.subscribe(ctx, true, filters...)
179}
180
181// dispatchEnvelope routes incoming messages to the appropriate subscription.
182func (r *Relay) dispatchEnvelope(env Envelope) {
183 switch e := env.(type) {
184 case *EventEnvelope:
185 r.subscriptionsMu.RLock()
186 sub, ok := r.subscriptions[e.SubscriptionID]
187 r.subscriptionsMu.RUnlock()
188 if ok {
189 sub.send(e.Event)
190 }
191 case *EOSEEnvelope:
192 r.subscriptionsMu.RLock()
193 sub, ok := r.subscriptions[e.SubscriptionID]
194 r.subscriptionsMu.RUnlock()
195 if ok && sub.closeOnEOSE {
196 r.subscriptionsMu.Lock()
197 delete(r.subscriptions, e.SubscriptionID)
198 r.subscriptionsMu.Unlock()
199 sub.stop(nil)
200 }
201 case *ClosedEnvelope:
202 r.subscriptionsMu.Lock()
203 sub, ok := r.subscriptions[e.SubscriptionID]
204 if ok {
205 delete(r.subscriptions, e.SubscriptionID)
206 }
207 r.subscriptionsMu.Unlock()
208 if ok {
209 sub.stop(fmt.Errorf("subscription closed by relay: %s", e.Message))
210 }
211 case *OKEnvelope:
212 r.okChannelsMu.Lock()
213 ch, ok := r.okChannels[e.EventID]
214 r.okChannelsMu.Unlock()
215 if ok {
216 select {
217 case ch <- e:
218 default:
219 }
220 }
221 }
222}
223
224// Listen reads messages from the relay and dispatches them to subscriptions.
225func (r *Relay) Listen(ctx context.Context) error {
226 defer func() {
227 r.subscriptionsMu.Lock()
228 subs := make([]*Subscription, 0, len(r.subscriptions))
229 for id, sub := range r.subscriptions {
230 subs = append(subs, sub)
231 delete(r.subscriptions, id)
232 }
233 r.subscriptionsMu.Unlock()
234
235 for _, sub := range subs {
236 sub.stop(fmt.Errorf("connection closed"))
237 }
238 }()
239
240 for {
241 select {
242 case <-ctx.Done():
243 return ctx.Err()
244 default:
245 }
246
247 env, err := r.Receive(ctx)
248 if err != nil {
249 return err
250 }
251
252 r.dispatchEnvelope(env)
253 }
254}
255
256// Subscription represents an active subscription to a relay.
257type Subscription struct {
258 ID string
259 relay *Relay
260 Filters []Filter
261 Events chan *Event
262 Err error
263
264 closeOnEOSE bool
265 mu sync.Mutex
266 done bool
267}
268
269// send delivers an event to the subscription's Events channel.
270func (s *Subscription) send(ev *Event) {
271 s.mu.Lock()
272 defer s.mu.Unlock()
273 if s.done {
274 return
275 }
276 select {
277 case s.Events <- ev:
278 default:
279 }
280}
281
282// stop closes the subscription's Events channel and sets the error.
283// It is idempotent — only the first call has any effect.
284func (s *Subscription) stop(err error) {
285 s.mu.Lock()
286 defer s.mu.Unlock()
287 if s.done {
288 return
289 }
290 s.done = true
291 s.Err = err
292 close(s.Events)
293}
294
295// Close unsubscribes from the relay.
296func (s *Subscription) Close(ctx context.Context) error {
297 s.stop(nil)
298
299 s.relay.subscriptionsMu.Lock()
300 delete(s.relay.subscriptions, s.ID)
301 s.relay.subscriptionsMu.Unlock()
302
303 env := CloseEnvelope{SubscriptionID: s.ID}
304 return s.relay.Send(ctx, env)
305}