diff options
Diffstat (limited to 'relay.go')
| -rw-r--r-- | relay.go | 217 |
1 files changed, 217 insertions, 0 deletions
diff --git a/relay.go b/relay.go new file mode 100644 index 0000000..45f6119 --- /dev/null +++ b/relay.go | |||
| @@ -0,0 +1,217 @@ | |||
| 1 | package nostr | ||
| 2 | |||
| 3 | import ( | ||
| 4 | "context" | ||
| 5 | "fmt" | ||
| 6 | "sync" | ||
| 7 | |||
| 8 | "github.com/coder/websocket" | ||
| 9 | ) | ||
| 10 | |||
| 11 | // Relay represents a connection to a Nostr relay. | ||
| 12 | type Relay struct { | ||
| 13 | URL string | ||
| 14 | conn *websocket.Conn | ||
| 15 | mu sync.Mutex | ||
| 16 | |||
| 17 | subscriptions map[string]*Subscription | ||
| 18 | subscriptionsMu sync.RWMutex | ||
| 19 | } | ||
| 20 | |||
| 21 | // Connect establishes a WebSocket connection to the relay. | ||
| 22 | func Connect(ctx context.Context, url string) (*Relay, error) { | ||
| 23 | conn, _, err := websocket.Dial(ctx, url, nil) | ||
| 24 | if err != nil { | ||
| 25 | return nil, fmt.Errorf("failed to connect to relay: %w", err) | ||
| 26 | } | ||
| 27 | |||
| 28 | return &Relay{ | ||
| 29 | URL: url, | ||
| 30 | conn: conn, | ||
| 31 | subscriptions: make(map[string]*Subscription), | ||
| 32 | }, nil | ||
| 33 | } | ||
| 34 | |||
| 35 | // Close closes the WebSocket connection. | ||
| 36 | func (r *Relay) Close() error { | ||
| 37 | r.mu.Lock() | ||
| 38 | defer r.mu.Unlock() | ||
| 39 | |||
| 40 | if r.conn == nil { | ||
| 41 | return nil | ||
| 42 | } | ||
| 43 | |||
| 44 | err := r.conn.Close(websocket.StatusNormalClosure, "") | ||
| 45 | r.conn = nil | ||
| 46 | return err | ||
| 47 | } | ||
| 48 | |||
| 49 | // Send sends an envelope to the relay. | ||
| 50 | func (r *Relay) Send(ctx context.Context, env Envelope) error { | ||
| 51 | data, err := env.MarshalJSON() | ||
| 52 | if err != nil { | ||
| 53 | return fmt.Errorf("failed to marshal envelope: %w", err) | ||
| 54 | } | ||
| 55 | |||
| 56 | r.mu.Lock() | ||
| 57 | defer r.mu.Unlock() | ||
| 58 | |||
| 59 | if r.conn == nil { | ||
| 60 | return fmt.Errorf("connection closed") | ||
| 61 | } | ||
| 62 | |||
| 63 | return r.conn.Write(ctx, websocket.MessageText, data) | ||
| 64 | } | ||
| 65 | |||
| 66 | // Receive reads the next envelope from the relay. | ||
| 67 | func (r *Relay) Receive(ctx context.Context) (Envelope, error) { | ||
| 68 | r.mu.Lock() | ||
| 69 | conn := r.conn | ||
| 70 | r.mu.Unlock() | ||
| 71 | |||
| 72 | if conn == nil { | ||
| 73 | return nil, fmt.Errorf("connection closed") | ||
| 74 | } | ||
| 75 | |||
| 76 | _, data, err := conn.Read(ctx) | ||
| 77 | if err != nil { | ||
| 78 | return nil, fmt.Errorf("failed to read message: %w", err) | ||
| 79 | } | ||
| 80 | |||
| 81 | return ParseEnvelope(data) | ||
| 82 | } | ||
| 83 | |||
| 84 | // Publish sends an event to the relay and waits for the OK response. | ||
| 85 | func (r *Relay) Publish(ctx context.Context, event *Event) error { | ||
| 86 | env := EventEnvelope{Event: event} | ||
| 87 | if err := r.Send(ctx, env); err != nil { | ||
| 88 | return fmt.Errorf("failed to send event: %w", err) | ||
| 89 | } | ||
| 90 | |||
| 91 | // Wait for OK response | ||
| 92 | for { | ||
| 93 | resp, err := r.Receive(ctx) | ||
| 94 | if err != nil { | ||
| 95 | return fmt.Errorf("failed to receive response: %w", err) | ||
| 96 | } | ||
| 97 | |||
| 98 | if ok, isOK := resp.(*OKEnvelope); isOK { | ||
| 99 | if ok.EventID == event.ID { | ||
| 100 | if !ok.OK { | ||
| 101 | return fmt.Errorf("event rejected: %s", ok.Message) | ||
| 102 | } | ||
| 103 | return nil | ||
| 104 | } | ||
| 105 | } | ||
| 106 | |||
| 107 | // Dispatch other messages to subscriptions | ||
| 108 | r.dispatchEnvelope(resp) | ||
| 109 | } | ||
| 110 | } | ||
| 111 | |||
| 112 | // Subscribe creates a subscription with the given filters. | ||
| 113 | func (r *Relay) Subscribe(ctx context.Context, id string, filters ...Filter) (*Subscription, error) { | ||
| 114 | sub := &Subscription{ | ||
| 115 | ID: id, | ||
| 116 | relay: r, | ||
| 117 | Filters: filters, | ||
| 118 | Events: make(chan *Event, 100), | ||
| 119 | EOSE: make(chan struct{}, 1), | ||
| 120 | closed: make(chan struct{}), | ||
| 121 | } | ||
| 122 | |||
| 123 | r.subscriptionsMu.Lock() | ||
| 124 | r.subscriptions[id] = sub | ||
| 125 | r.subscriptionsMu.Unlock() | ||
| 126 | |||
| 127 | env := ReqEnvelope{ | ||
| 128 | SubscriptionID: id, | ||
| 129 | Filters: filters, | ||
| 130 | } | ||
| 131 | if err := r.Send(ctx, env); err != nil { | ||
| 132 | r.subscriptionsMu.Lock() | ||
| 133 | delete(r.subscriptions, id) | ||
| 134 | r.subscriptionsMu.Unlock() | ||
| 135 | return nil, fmt.Errorf("failed to send subscription request: %w", err) | ||
| 136 | } | ||
| 137 | |||
| 138 | return sub, nil | ||
| 139 | } | ||
| 140 | |||
| 141 | // dispatchEnvelope routes incoming messages to the appropriate subscription. | ||
| 142 | func (r *Relay) dispatchEnvelope(env Envelope) { | ||
| 143 | switch e := env.(type) { | ||
| 144 | case *EventEnvelope: | ||
| 145 | r.subscriptionsMu.RLock() | ||
| 146 | sub, ok := r.subscriptions[e.SubscriptionID] | ||
| 147 | r.subscriptionsMu.RUnlock() | ||
| 148 | if ok { | ||
| 149 | select { | ||
| 150 | case sub.Events <- e.Event: | ||
| 151 | default: | ||
| 152 | // Channel full, drop event | ||
| 153 | } | ||
| 154 | } | ||
| 155 | case *EOSEEnvelope: | ||
| 156 | r.subscriptionsMu.RLock() | ||
| 157 | sub, ok := r.subscriptions[e.SubscriptionID] | ||
| 158 | r.subscriptionsMu.RUnlock() | ||
| 159 | if ok { | ||
| 160 | select { | ||
| 161 | case sub.EOSE <- struct{}{}: | ||
| 162 | default: | ||
| 163 | } | ||
| 164 | } | ||
| 165 | case *ClosedEnvelope: | ||
| 166 | r.subscriptionsMu.Lock() | ||
| 167 | if sub, ok := r.subscriptions[e.SubscriptionID]; ok { | ||
| 168 | close(sub.closed) | ||
| 169 | delete(r.subscriptions, e.SubscriptionID) | ||
| 170 | } | ||
| 171 | r.subscriptionsMu.Unlock() | ||
| 172 | } | ||
| 173 | } | ||
| 174 | |||
| 175 | // Listen reads messages from the relay and dispatches them to subscriptions. | ||
| 176 | // This should be called in a goroutine when using multiple subscriptions. | ||
| 177 | func (r *Relay) Listen(ctx context.Context) error { | ||
| 178 | for { | ||
| 179 | select { | ||
| 180 | case <-ctx.Done(): | ||
| 181 | return ctx.Err() | ||
| 182 | default: | ||
| 183 | } | ||
| 184 | |||
| 185 | env, err := r.Receive(ctx) | ||
| 186 | if err != nil { | ||
| 187 | return err | ||
| 188 | } | ||
| 189 | |||
| 190 | r.dispatchEnvelope(env) | ||
| 191 | } | ||
| 192 | } | ||
| 193 | |||
| 194 | // Subscription represents an active subscription to a relay. | ||
| 195 | type Subscription struct { | ||
| 196 | ID string | ||
| 197 | relay *Relay | ||
| 198 | Filters []Filter | ||
| 199 | Events chan *Event | ||
| 200 | EOSE chan struct{} | ||
| 201 | closed chan struct{} | ||
| 202 | } | ||
| 203 | |||
| 204 | // Close unsubscribes from the relay. | ||
| 205 | func (s *Subscription) Close(ctx context.Context) error { | ||
| 206 | s.relay.subscriptionsMu.Lock() | ||
| 207 | delete(s.relay.subscriptions, s.ID) | ||
| 208 | s.relay.subscriptionsMu.Unlock() | ||
| 209 | |||
| 210 | env := CloseEnvelope{SubscriptionID: s.ID} | ||
| 211 | return s.relay.Send(ctx, env) | ||
| 212 | } | ||
| 213 | |||
| 214 | // Closed returns a channel that's closed when the subscription is terminated. | ||
| 215 | func (s *Subscription) Closed() <-chan struct{} { | ||
| 216 | return s.closed | ||
| 217 | } | ||
