From d4fd2467d691a69a0ba75348086424b9fb33a297 Mon Sep 17 00:00:00 2001 From: bndw Date: Sat, 7 Feb 2026 15:20:57 -0800 Subject: wip --- relay.go | 217 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 relay.go (limited to 'relay.go') diff --git a/relay.go b/relay.go new file mode 100644 index 0000000..45f6119 --- /dev/null +++ b/relay.go @@ -0,0 +1,217 @@ +package nostr + +import ( + "context" + "fmt" + "sync" + + "github.com/coder/websocket" +) + +// Relay represents a connection to a Nostr relay. +type Relay struct { + URL string + conn *websocket.Conn + mu sync.Mutex + + subscriptions map[string]*Subscription + subscriptionsMu sync.RWMutex +} + +// Connect establishes a WebSocket connection to the relay. +func Connect(ctx context.Context, url string) (*Relay, error) { + conn, _, err := websocket.Dial(ctx, url, nil) + if err != nil { + return nil, fmt.Errorf("failed to connect to relay: %w", err) + } + + return &Relay{ + URL: url, + conn: conn, + subscriptions: make(map[string]*Subscription), + }, nil +} + +// Close closes the WebSocket connection. +func (r *Relay) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.conn == nil { + return nil + } + + err := r.conn.Close(websocket.StatusNormalClosure, "") + r.conn = nil + return err +} + +// Send sends an envelope to the relay. +func (r *Relay) Send(ctx context.Context, env Envelope) error { + data, err := env.MarshalJSON() + if err != nil { + return fmt.Errorf("failed to marshal envelope: %w", err) + } + + r.mu.Lock() + defer r.mu.Unlock() + + if r.conn == nil { + return fmt.Errorf("connection closed") + } + + return r.conn.Write(ctx, websocket.MessageText, data) +} + +// Receive reads the next envelope from the relay. +func (r *Relay) Receive(ctx context.Context) (Envelope, error) { + r.mu.Lock() + conn := r.conn + r.mu.Unlock() + + if conn == nil { + return nil, fmt.Errorf("connection closed") + } + + _, data, err := conn.Read(ctx) + if err != nil { + return nil, fmt.Errorf("failed to read message: %w", err) + } + + return ParseEnvelope(data) +} + +// Publish sends an event to the relay and waits for the OK response. +func (r *Relay) Publish(ctx context.Context, event *Event) error { + env := EventEnvelope{Event: event} + if err := r.Send(ctx, env); err != nil { + return fmt.Errorf("failed to send event: %w", err) + } + + // Wait for OK response + for { + resp, err := r.Receive(ctx) + if err != nil { + return fmt.Errorf("failed to receive response: %w", err) + } + + if ok, isOK := resp.(*OKEnvelope); isOK { + if ok.EventID == event.ID { + if !ok.OK { + return fmt.Errorf("event rejected: %s", ok.Message) + } + return nil + } + } + + // Dispatch other messages to subscriptions + r.dispatchEnvelope(resp) + } +} + +// Subscribe creates a subscription with the given filters. +func (r *Relay) Subscribe(ctx context.Context, id string, filters ...Filter) (*Subscription, error) { + sub := &Subscription{ + ID: id, + relay: r, + Filters: filters, + Events: make(chan *Event, 100), + EOSE: make(chan struct{}, 1), + closed: make(chan struct{}), + } + + r.subscriptionsMu.Lock() + r.subscriptions[id] = sub + r.subscriptionsMu.Unlock() + + env := ReqEnvelope{ + SubscriptionID: id, + Filters: filters, + } + if err := r.Send(ctx, env); err != nil { + r.subscriptionsMu.Lock() + delete(r.subscriptions, id) + r.subscriptionsMu.Unlock() + return nil, fmt.Errorf("failed to send subscription request: %w", err) + } + + return sub, nil +} + +// dispatchEnvelope routes incoming messages to the appropriate subscription. +func (r *Relay) dispatchEnvelope(env Envelope) { + switch e := env.(type) { + case *EventEnvelope: + r.subscriptionsMu.RLock() + sub, ok := r.subscriptions[e.SubscriptionID] + r.subscriptionsMu.RUnlock() + if ok { + select { + case sub.Events <- e.Event: + default: + // Channel full, drop event + } + } + case *EOSEEnvelope: + r.subscriptionsMu.RLock() + sub, ok := r.subscriptions[e.SubscriptionID] + r.subscriptionsMu.RUnlock() + if ok { + select { + case sub.EOSE <- struct{}{}: + default: + } + } + case *ClosedEnvelope: + r.subscriptionsMu.Lock() + if sub, ok := r.subscriptions[e.SubscriptionID]; ok { + close(sub.closed) + delete(r.subscriptions, e.SubscriptionID) + } + r.subscriptionsMu.Unlock() + } +} + +// Listen reads messages from the relay and dispatches them to subscriptions. +// This should be called in a goroutine when using multiple subscriptions. +func (r *Relay) Listen(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + env, err := r.Receive(ctx) + if err != nil { + return err + } + + r.dispatchEnvelope(env) + } +} + +// Subscription represents an active subscription to a relay. +type Subscription struct { + ID string + relay *Relay + Filters []Filter + Events chan *Event + EOSE chan struct{} + closed chan struct{} +} + +// Close unsubscribes from the relay. +func (s *Subscription) Close(ctx context.Context) error { + s.relay.subscriptionsMu.Lock() + delete(s.relay.subscriptions, s.ID) + s.relay.subscriptionsMu.Unlock() + + env := CloseEnvelope{SubscriptionID: s.ID} + return s.relay.Send(ctx, env) +} + +// Closed returns a channel that's closed when the subscription is terminated. +func (s *Subscription) Closed() <-chan struct{} { + return s.closed +} -- cgit v1.2.3