summaryrefslogtreecommitdiffstats
path: root/relay.go
diff options
context:
space:
mode:
authorbndw <ben@bdw.to>2026-02-07 15:20:57 -0800
committerbndw <ben@bdw.to>2026-02-07 15:20:57 -0800
commitd4fd2467d691a69a0ba75348086424b9fb33a297 (patch)
tree51bae6f1579e3248843a01053ccdea336f2730b2 /relay.go
wip
Diffstat (limited to 'relay.go')
-rw-r--r--relay.go217
1 files changed, 217 insertions, 0 deletions
diff --git a/relay.go b/relay.go
new file mode 100644
index 0000000..45f6119
--- /dev/null
+++ b/relay.go
@@ -0,0 +1,217 @@
1package nostr
2
3import (
4 "context"
5 "fmt"
6 "sync"
7
8 "github.com/coder/websocket"
9)
10
11// Relay represents a connection to a Nostr relay.
12type Relay struct {
13 URL string
14 conn *websocket.Conn
15 mu sync.Mutex
16
17 subscriptions map[string]*Subscription
18 subscriptionsMu sync.RWMutex
19}
20
21// Connect establishes a WebSocket connection to the relay.
22func Connect(ctx context.Context, url string) (*Relay, error) {
23 conn, _, err := websocket.Dial(ctx, url, nil)
24 if err != nil {
25 return nil, fmt.Errorf("failed to connect to relay: %w", err)
26 }
27
28 return &Relay{
29 URL: url,
30 conn: conn,
31 subscriptions: make(map[string]*Subscription),
32 }, nil
33}
34
35// Close closes the WebSocket connection.
36func (r *Relay) Close() error {
37 r.mu.Lock()
38 defer r.mu.Unlock()
39
40 if r.conn == nil {
41 return nil
42 }
43
44 err := r.conn.Close(websocket.StatusNormalClosure, "")
45 r.conn = nil
46 return err
47}
48
49// Send sends an envelope to the relay.
50func (r *Relay) Send(ctx context.Context, env Envelope) error {
51 data, err := env.MarshalJSON()
52 if err != nil {
53 return fmt.Errorf("failed to marshal envelope: %w", err)
54 }
55
56 r.mu.Lock()
57 defer r.mu.Unlock()
58
59 if r.conn == nil {
60 return fmt.Errorf("connection closed")
61 }
62
63 return r.conn.Write(ctx, websocket.MessageText, data)
64}
65
66// Receive reads the next envelope from the relay.
67func (r *Relay) Receive(ctx context.Context) (Envelope, error) {
68 r.mu.Lock()
69 conn := r.conn
70 r.mu.Unlock()
71
72 if conn == nil {
73 return nil, fmt.Errorf("connection closed")
74 }
75
76 _, data, err := conn.Read(ctx)
77 if err != nil {
78 return nil, fmt.Errorf("failed to read message: %w", err)
79 }
80
81 return ParseEnvelope(data)
82}
83
84// Publish sends an event to the relay and waits for the OK response.
85func (r *Relay) Publish(ctx context.Context, event *Event) error {
86 env := EventEnvelope{Event: event}
87 if err := r.Send(ctx, env); err != nil {
88 return fmt.Errorf("failed to send event: %w", err)
89 }
90
91 // Wait for OK response
92 for {
93 resp, err := r.Receive(ctx)
94 if err != nil {
95 return fmt.Errorf("failed to receive response: %w", err)
96 }
97
98 if ok, isOK := resp.(*OKEnvelope); isOK {
99 if ok.EventID == event.ID {
100 if !ok.OK {
101 return fmt.Errorf("event rejected: %s", ok.Message)
102 }
103 return nil
104 }
105 }
106
107 // Dispatch other messages to subscriptions
108 r.dispatchEnvelope(resp)
109 }
110}
111
112// Subscribe creates a subscription with the given filters.
113func (r *Relay) Subscribe(ctx context.Context, id string, filters ...Filter) (*Subscription, error) {
114 sub := &Subscription{
115 ID: id,
116 relay: r,
117 Filters: filters,
118 Events: make(chan *Event, 100),
119 EOSE: make(chan struct{}, 1),
120 closed: make(chan struct{}),
121 }
122
123 r.subscriptionsMu.Lock()
124 r.subscriptions[id] = sub
125 r.subscriptionsMu.Unlock()
126
127 env := ReqEnvelope{
128 SubscriptionID: id,
129 Filters: filters,
130 }
131 if err := r.Send(ctx, env); err != nil {
132 r.subscriptionsMu.Lock()
133 delete(r.subscriptions, id)
134 r.subscriptionsMu.Unlock()
135 return nil, fmt.Errorf("failed to send subscription request: %w", err)
136 }
137
138 return sub, nil
139}
140
141// dispatchEnvelope routes incoming messages to the appropriate subscription.
142func (r *Relay) dispatchEnvelope(env Envelope) {
143 switch e := env.(type) {
144 case *EventEnvelope:
145 r.subscriptionsMu.RLock()
146 sub, ok := r.subscriptions[e.SubscriptionID]
147 r.subscriptionsMu.RUnlock()
148 if ok {
149 select {
150 case sub.Events <- e.Event:
151 default:
152 // Channel full, drop event
153 }
154 }
155 case *EOSEEnvelope:
156 r.subscriptionsMu.RLock()
157 sub, ok := r.subscriptions[e.SubscriptionID]
158 r.subscriptionsMu.RUnlock()
159 if ok {
160 select {
161 case sub.EOSE <- struct{}{}:
162 default:
163 }
164 }
165 case *ClosedEnvelope:
166 r.subscriptionsMu.Lock()
167 if sub, ok := r.subscriptions[e.SubscriptionID]; ok {
168 close(sub.closed)
169 delete(r.subscriptions, e.SubscriptionID)
170 }
171 r.subscriptionsMu.Unlock()
172 }
173}
174
175// Listen reads messages from the relay and dispatches them to subscriptions.
176// This should be called in a goroutine when using multiple subscriptions.
177func (r *Relay) Listen(ctx context.Context) error {
178 for {
179 select {
180 case <-ctx.Done():
181 return ctx.Err()
182 default:
183 }
184
185 env, err := r.Receive(ctx)
186 if err != nil {
187 return err
188 }
189
190 r.dispatchEnvelope(env)
191 }
192}
193
194// Subscription represents an active subscription to a relay.
195type Subscription struct {
196 ID string
197 relay *Relay
198 Filters []Filter
199 Events chan *Event
200 EOSE chan struct{}
201 closed chan struct{}
202}
203
204// Close unsubscribes from the relay.
205func (s *Subscription) Close(ctx context.Context) error {
206 s.relay.subscriptionsMu.Lock()
207 delete(s.relay.subscriptions, s.ID)
208 s.relay.subscriptionsMu.Unlock()
209
210 env := CloseEnvelope{SubscriptionID: s.ID}
211 return s.relay.Send(ctx, env)
212}
213
214// Closed returns a channel that's closed when the subscription is terminated.
215func (s *Subscription) Closed() <-chan struct{} {
216 return s.closed
217}