package nostr import ( "context" "crypto/rand" "fmt" "sync" "code.northwest.io/nostr/internal/websocket" ) // Relay represents a connection to a Nostr relay. type Relay struct { URL string conn *websocket.Conn mu sync.Mutex subscriptions map[string]*Subscription subscriptionsMu sync.RWMutex okChannels map[string]chan *OKEnvelope okChannelsMu sync.Mutex } // Connect establishes a WebSocket connection to the relay. func Connect(ctx context.Context, url string) (*Relay, error) { conn, err := websocket.Dial(ctx, url) if err != nil { return nil, fmt.Errorf("failed to connect to relay: %w", err) } r := &Relay{ URL: url, conn: conn, subscriptions: make(map[string]*Subscription), okChannels: make(map[string]chan *OKEnvelope), } go r.Listen(ctx) return r, nil } // Close closes the WebSocket connection. func (r *Relay) Close() error { r.mu.Lock() defer r.mu.Unlock() if r.conn == nil { return nil } err := r.conn.Close(websocket.StatusNormalClosure, "") r.conn = nil return err } // Send sends an envelope to the relay. func (r *Relay) Send(ctx context.Context, env Envelope) error { data, err := env.MarshalJSON() if err != nil { return fmt.Errorf("failed to marshal envelope: %w", err) } r.mu.Lock() defer r.mu.Unlock() if r.conn == nil { return fmt.Errorf("connection closed") } return r.conn.Write(ctx, websocket.MessageText, data) } // Receive reads the next envelope from the relay. func (r *Relay) Receive(ctx context.Context) (Envelope, error) { r.mu.Lock() conn := r.conn r.mu.Unlock() if conn == nil { return nil, fmt.Errorf("connection closed") } _, data, err := conn.Read(ctx) if err != nil { return nil, fmt.Errorf("failed to read message: %w", err) } return ParseEnvelope(data) } // Publish sends an event to the relay and waits for the OK response. func (r *Relay) Publish(ctx context.Context, event *Event) error { ch := make(chan *OKEnvelope, 1) r.okChannelsMu.Lock() r.okChannels[event.ID] = ch r.okChannelsMu.Unlock() defer func() { r.okChannelsMu.Lock() delete(r.okChannels, event.ID) r.okChannelsMu.Unlock() }() env := EventEnvelope{Event: event} if err := r.Send(ctx, env); err != nil { return fmt.Errorf("failed to send event: %w", err) } select { case ok := <-ch: if !ok.OK { return fmt.Errorf("event rejected: %s", ok.Message) } return nil case <-ctx.Done(): return ctx.Err() } } func genID() string { buf := make([]byte, 5) rand.Read(buf) return fmt.Sprintf("%x", buf) } // subscribe is the internal implementation for Subscribe and Fetch. func (r *Relay) subscribe(ctx context.Context, closeOnEOSE bool, filters ...Filter) *Subscription { id := genID() sub := &Subscription{ ID: id, relay: r, Filters: filters, Events: make(chan *Event, 100), closeOnEOSE: closeOnEOSE, } r.subscriptionsMu.Lock() r.subscriptions[id] = sub r.subscriptionsMu.Unlock() go func() { <-ctx.Done() sub.stop(ctx.Err()) r.subscriptionsMu.Lock() delete(r.subscriptions, id) r.subscriptionsMu.Unlock() }() env := ReqEnvelope{ SubscriptionID: id, Filters: filters, } if err := r.Send(ctx, env); err != nil { r.subscriptionsMu.Lock() delete(r.subscriptions, id) r.subscriptionsMu.Unlock() sub.stop(fmt.Errorf("failed to send subscription request: %w", err)) } return sub } // Subscribe creates a subscription with the given filters. // Events are received on the Events channel until the context is cancelled. // After EOSE (end of stored events), the subscription continues to receive // real-time events per the Nostr protocol. func (r *Relay) Subscribe(ctx context.Context, filters ...Filter) *Subscription { return r.subscribe(ctx, false, filters...) } // Fetch creates a subscription that closes automatically when EOSE is received. // Use this for one-shot queries where you only want stored events. func (r *Relay) Fetch(ctx context.Context, filters ...Filter) *Subscription { return r.subscribe(ctx, true, filters...) } // dispatchEnvelope routes incoming messages to the appropriate subscription. func (r *Relay) dispatchEnvelope(env Envelope) { switch e := env.(type) { case *EventEnvelope: r.subscriptionsMu.RLock() sub, ok := r.subscriptions[e.SubscriptionID] r.subscriptionsMu.RUnlock() if ok { sub.send(e.Event) } case *EOSEEnvelope: r.subscriptionsMu.RLock() sub, ok := r.subscriptions[e.SubscriptionID] r.subscriptionsMu.RUnlock() if ok && sub.closeOnEOSE { r.subscriptionsMu.Lock() delete(r.subscriptions, e.SubscriptionID) r.subscriptionsMu.Unlock() sub.stop(nil) } case *ClosedEnvelope: r.subscriptionsMu.Lock() sub, ok := r.subscriptions[e.SubscriptionID] if ok { delete(r.subscriptions, e.SubscriptionID) } r.subscriptionsMu.Unlock() if ok { sub.stop(fmt.Errorf("subscription closed by relay: %s", e.Message)) } case *OKEnvelope: r.okChannelsMu.Lock() ch, ok := r.okChannels[e.EventID] r.okChannelsMu.Unlock() if ok { select { case ch <- e: default: } } } } // Listen reads messages from the relay and dispatches them to subscriptions. func (r *Relay) Listen(ctx context.Context) error { defer func() { r.subscriptionsMu.Lock() subs := make([]*Subscription, 0, len(r.subscriptions)) for id, sub := range r.subscriptions { subs = append(subs, sub) delete(r.subscriptions, id) } r.subscriptionsMu.Unlock() for _, sub := range subs { sub.stop(fmt.Errorf("connection closed")) } }() for { select { case <-ctx.Done(): return ctx.Err() default: } env, err := r.Receive(ctx) if err != nil { return err } r.dispatchEnvelope(env) } } // Subscription represents an active subscription to a relay. type Subscription struct { ID string relay *Relay Filters []Filter Events chan *Event Err error closeOnEOSE bool mu sync.Mutex done bool } // send delivers an event to the subscription's Events channel. func (s *Subscription) send(ev *Event) { s.mu.Lock() defer s.mu.Unlock() if s.done { return } select { case s.Events <- ev: default: } } // stop closes the subscription's Events channel and sets the error. // It is idempotent — only the first call has any effect. func (s *Subscription) stop(err error) { s.mu.Lock() defer s.mu.Unlock() if s.done { return } s.done = true s.Err = err close(s.Events) } // Close unsubscribes from the relay. func (s *Subscription) Close(ctx context.Context) error { s.stop(nil) s.relay.subscriptionsMu.Lock() delete(s.relay.subscriptions, s.ID) s.relay.subscriptionsMu.Unlock() env := CloseEnvelope{SubscriptionID: s.ID} return s.relay.Send(ctx, env) }