package nostr import ( "context" "fmt" "sync" "github.com/coder/websocket" ) // Relay represents a connection to a Nostr relay. type Relay struct { URL string conn *websocket.Conn mu sync.Mutex subscriptions map[string]*Subscription subscriptionsMu sync.RWMutex } // Connect establishes a WebSocket connection to the relay. func Connect(ctx context.Context, url string) (*Relay, error) { conn, _, err := websocket.Dial(ctx, url, nil) if err != nil { return nil, fmt.Errorf("failed to connect to relay: %w", err) } return &Relay{ URL: url, conn: conn, subscriptions: make(map[string]*Subscription), }, nil } // Close closes the WebSocket connection. func (r *Relay) Close() error { r.mu.Lock() defer r.mu.Unlock() if r.conn == nil { return nil } err := r.conn.Close(websocket.StatusNormalClosure, "") r.conn = nil return err } // Send sends an envelope to the relay. func (r *Relay) Send(ctx context.Context, env Envelope) error { data, err := env.MarshalJSON() if err != nil { return fmt.Errorf("failed to marshal envelope: %w", err) } r.mu.Lock() defer r.mu.Unlock() if r.conn == nil { return fmt.Errorf("connection closed") } return r.conn.Write(ctx, websocket.MessageText, data) } // Receive reads the next envelope from the relay. func (r *Relay) Receive(ctx context.Context) (Envelope, error) { r.mu.Lock() conn := r.conn r.mu.Unlock() if conn == nil { return nil, fmt.Errorf("connection closed") } _, data, err := conn.Read(ctx) if err != nil { return nil, fmt.Errorf("failed to read message: %w", err) } return ParseEnvelope(data) } // Publish sends an event to the relay and waits for the OK response. func (r *Relay) Publish(ctx context.Context, event *Event) error { env := EventEnvelope{Event: event} if err := r.Send(ctx, env); err != nil { return fmt.Errorf("failed to send event: %w", err) } // Wait for OK response for { resp, err := r.Receive(ctx) if err != nil { return fmt.Errorf("failed to receive response: %w", err) } if ok, isOK := resp.(*OKEnvelope); isOK { if ok.EventID == event.ID { if !ok.OK { return fmt.Errorf("event rejected: %s", ok.Message) } return nil } } // Dispatch other messages to subscriptions r.dispatchEnvelope(resp) } } // Subscribe creates a subscription with the given filters. func (r *Relay) Subscribe(ctx context.Context, id string, filters ...Filter) (*Subscription, error) { sub := &Subscription{ ID: id, relay: r, Filters: filters, Events: make(chan *Event, 100), EOSE: make(chan struct{}, 1), closed: make(chan struct{}), } r.subscriptionsMu.Lock() r.subscriptions[id] = sub r.subscriptionsMu.Unlock() env := ReqEnvelope{ SubscriptionID: id, Filters: filters, } if err := r.Send(ctx, env); err != nil { r.subscriptionsMu.Lock() delete(r.subscriptions, id) r.subscriptionsMu.Unlock() return nil, fmt.Errorf("failed to send subscription request: %w", err) } return sub, nil } // dispatchEnvelope routes incoming messages to the appropriate subscription. func (r *Relay) dispatchEnvelope(env Envelope) { switch e := env.(type) { case *EventEnvelope: r.subscriptionsMu.RLock() sub, ok := r.subscriptions[e.SubscriptionID] r.subscriptionsMu.RUnlock() if ok { select { case sub.Events <- e.Event: default: // Channel full, drop event } } case *EOSEEnvelope: r.subscriptionsMu.RLock() sub, ok := r.subscriptions[e.SubscriptionID] r.subscriptionsMu.RUnlock() if ok { select { case sub.EOSE <- struct{}{}: default: } } case *ClosedEnvelope: r.subscriptionsMu.Lock() if sub, ok := r.subscriptions[e.SubscriptionID]; ok { close(sub.closed) delete(r.subscriptions, e.SubscriptionID) } r.subscriptionsMu.Unlock() } } // Listen reads messages from the relay and dispatches them to subscriptions. // This should be called in a goroutine when using multiple subscriptions. func (r *Relay) Listen(ctx context.Context) error { for { select { case <-ctx.Done(): return ctx.Err() default: } env, err := r.Receive(ctx) if err != nil { return err } r.dispatchEnvelope(env) } } // Subscription represents an active subscription to a relay. type Subscription struct { ID string relay *Relay Filters []Filter Events chan *Event EOSE chan struct{} closed chan struct{} } // Close unsubscribes from the relay. func (s *Subscription) Close(ctx context.Context) error { s.relay.subscriptionsMu.Lock() delete(s.relay.subscriptions, s.ID) s.relay.subscriptionsMu.Unlock() env := CloseEnvelope{SubscriptionID: s.ID} return s.relay.Send(ctx, env) } // Closed returns a channel that's closed when the subscription is terminated. func (s *Subscription) Closed() <-chan struct{} { return s.closed }