From 581ceecbf046f99b39885c74e2780a5320e5b15e Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 17:35:32 -0800 Subject: feat: add Nostr protocol implementation (internal/nostr, internal/websocket) --- internal/nostr/relay.go | 305 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 305 insertions(+) create mode 100644 internal/nostr/relay.go (limited to 'internal/nostr/relay.go') diff --git a/internal/nostr/relay.go b/internal/nostr/relay.go new file mode 100644 index 0000000..2b156e0 --- /dev/null +++ b/internal/nostr/relay.go @@ -0,0 +1,305 @@ +package nostr + +import ( + "context" + "crypto/rand" + "fmt" + "sync" + + "northwest.io/nostr-grpc/internal/websocket" +) + +// Relay represents a connection to a Nostr relay. +type Relay struct { + URL string + conn *websocket.Conn + mu sync.Mutex + + subscriptions map[string]*Subscription + subscriptionsMu sync.RWMutex + + okChannels map[string]chan *OKEnvelope + okChannelsMu sync.Mutex +} + +// Connect establishes a WebSocket connection to the relay. +func Connect(ctx context.Context, url string) (*Relay, error) { + conn, err := websocket.Dial(ctx, url) + if err != nil { + return nil, fmt.Errorf("failed to connect to relay: %w", err) + } + + r := &Relay{ + URL: url, + conn: conn, + subscriptions: make(map[string]*Subscription), + okChannels: make(map[string]chan *OKEnvelope), + } + + go r.Listen(ctx) + + return r, nil +} + +// Close closes the WebSocket connection. +func (r *Relay) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.conn == nil { + return nil + } + + err := r.conn.Close(websocket.StatusNormalClosure, "") + r.conn = nil + return err +} + +// Send sends an envelope to the relay. +func (r *Relay) Send(ctx context.Context, env Envelope) error { + data, err := env.MarshalJSON() + if err != nil { + return fmt.Errorf("failed to marshal envelope: %w", err) + } + + r.mu.Lock() + defer r.mu.Unlock() + + if r.conn == nil { + return fmt.Errorf("connection closed") + } + + return r.conn.Write(ctx, websocket.MessageText, data) +} + +// Receive reads the next envelope from the relay. +func (r *Relay) Receive(ctx context.Context) (Envelope, error) { + r.mu.Lock() + conn := r.conn + r.mu.Unlock() + + if conn == nil { + return nil, fmt.Errorf("connection closed") + } + + _, data, err := conn.Read(ctx) + if err != nil { + return nil, fmt.Errorf("failed to read message: %w", err) + } + + return ParseEnvelope(data) +} + +// Publish sends an event to the relay and waits for the OK response. +func (r *Relay) Publish(ctx context.Context, event *Event) error { + ch := make(chan *OKEnvelope, 1) + + r.okChannelsMu.Lock() + r.okChannels[event.ID] = ch + r.okChannelsMu.Unlock() + + defer func() { + r.okChannelsMu.Lock() + delete(r.okChannels, event.ID) + r.okChannelsMu.Unlock() + }() + + env := EventEnvelope{Event: event} + if err := r.Send(ctx, env); err != nil { + return fmt.Errorf("failed to send event: %w", err) + } + + select { + case ok := <-ch: + if !ok.OK { + return fmt.Errorf("event rejected: %s", ok.Message) + } + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func genID() string { + buf := make([]byte, 5) + rand.Read(buf) + return fmt.Sprintf("%x", buf) +} + +// subscribe is the internal implementation for Subscribe and Fetch. +func (r *Relay) subscribe(ctx context.Context, closeOnEOSE bool, filters ...Filter) *Subscription { + id := genID() + + sub := &Subscription{ + ID: id, + relay: r, + Filters: filters, + Events: make(chan *Event, 100), + closeOnEOSE: closeOnEOSE, + } + + r.subscriptionsMu.Lock() + r.subscriptions[id] = sub + r.subscriptionsMu.Unlock() + + go func() { + <-ctx.Done() + sub.stop(ctx.Err()) + r.subscriptionsMu.Lock() + delete(r.subscriptions, id) + r.subscriptionsMu.Unlock() + }() + + env := ReqEnvelope{ + SubscriptionID: id, + Filters: filters, + } + if err := r.Send(ctx, env); err != nil { + r.subscriptionsMu.Lock() + delete(r.subscriptions, id) + r.subscriptionsMu.Unlock() + sub.stop(fmt.Errorf("failed to send subscription request: %w", err)) + } + + return sub +} + +// Subscribe creates a subscription with the given filters. +// Events are received on the Events channel until the context is cancelled. +// After EOSE (end of stored events), the subscription continues to receive +// real-time events per the Nostr protocol. +func (r *Relay) Subscribe(ctx context.Context, filters ...Filter) *Subscription { + return r.subscribe(ctx, false, filters...) +} + +// Fetch creates a subscription that closes automatically when EOSE is received. +// Use this for one-shot queries where you only want stored events. +func (r *Relay) Fetch(ctx context.Context, filters ...Filter) *Subscription { + return r.subscribe(ctx, true, filters...) +} + +// dispatchEnvelope routes incoming messages to the appropriate subscription. +func (r *Relay) dispatchEnvelope(env Envelope) { + switch e := env.(type) { + case *EventEnvelope: + r.subscriptionsMu.RLock() + sub, ok := r.subscriptions[e.SubscriptionID] + r.subscriptionsMu.RUnlock() + if ok { + sub.send(e.Event) + } + case *EOSEEnvelope: + r.subscriptionsMu.RLock() + sub, ok := r.subscriptions[e.SubscriptionID] + r.subscriptionsMu.RUnlock() + if ok && sub.closeOnEOSE { + r.subscriptionsMu.Lock() + delete(r.subscriptions, e.SubscriptionID) + r.subscriptionsMu.Unlock() + sub.stop(nil) + } + case *ClosedEnvelope: + r.subscriptionsMu.Lock() + sub, ok := r.subscriptions[e.SubscriptionID] + if ok { + delete(r.subscriptions, e.SubscriptionID) + } + r.subscriptionsMu.Unlock() + if ok { + sub.stop(fmt.Errorf("subscription closed by relay: %s", e.Message)) + } + case *OKEnvelope: + r.okChannelsMu.Lock() + ch, ok := r.okChannels[e.EventID] + r.okChannelsMu.Unlock() + if ok { + select { + case ch <- e: + default: + } + } + } +} + +// Listen reads messages from the relay and dispatches them to subscriptions. +func (r *Relay) Listen(ctx context.Context) error { + defer func() { + r.subscriptionsMu.Lock() + subs := make([]*Subscription, 0, len(r.subscriptions)) + for id, sub := range r.subscriptions { + subs = append(subs, sub) + delete(r.subscriptions, id) + } + r.subscriptionsMu.Unlock() + + for _, sub := range subs { + sub.stop(fmt.Errorf("connection closed")) + } + }() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + env, err := r.Receive(ctx) + if err != nil { + return err + } + + r.dispatchEnvelope(env) + } +} + +// Subscription represents an active subscription to a relay. +type Subscription struct { + ID string + relay *Relay + Filters []Filter + Events chan *Event + Err error + + closeOnEOSE bool + mu sync.Mutex + done bool +} + +// send delivers an event to the subscription's Events channel. +func (s *Subscription) send(ev *Event) { + s.mu.Lock() + defer s.mu.Unlock() + if s.done { + return + } + select { + case s.Events <- ev: + default: + } +} + +// stop closes the subscription's Events channel and sets the error. +// It is idempotent — only the first call has any effect. +func (s *Subscription) stop(err error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.done { + return + } + s.done = true + s.Err = err + close(s.Events) +} + +// Close unsubscribes from the relay. +func (s *Subscription) Close(ctx context.Context) error { + s.stop(nil) + + s.relay.subscriptionsMu.Lock() + delete(s.relay.subscriptions, s.ID) + s.relay.subscriptionsMu.Unlock() + + env := CloseEnvelope{SubscriptionID: s.ID} + return s.relay.Send(ctx, env) +} -- cgit v1.2.3