From 4fc493e6d8cc20137f920f8647e39fc5051bb245 Mon Sep 17 00:00:00 2001 From: bndw Date: Sat, 14 Feb 2026 12:03:21 -0800 Subject: refactor: remove frivolous comments from auth validation/credentials Also removed internal/nostr package - now using northwest.io/nostr library. --- internal/nostr/relay.go | 305 ------------------------------------------------ 1 file changed, 305 deletions(-) delete mode 100644 internal/nostr/relay.go (limited to 'internal/nostr/relay.go') diff --git a/internal/nostr/relay.go b/internal/nostr/relay.go deleted file mode 100644 index 189fb93..0000000 --- a/internal/nostr/relay.go +++ /dev/null @@ -1,305 +0,0 @@ -package nostr - -import ( - "context" - "crypto/rand" - "fmt" - "sync" - - "northwest.io/muxstr/internal/websocket" -) - -// Relay represents a connection to a Nostr relay. -type Relay struct { - URL string - conn *websocket.Conn - mu sync.Mutex - - subscriptions map[string]*Subscription - subscriptionsMu sync.RWMutex - - okChannels map[string]chan *OKEnvelope - okChannelsMu sync.Mutex -} - -// Connect establishes a WebSocket connection to the relay. -func Connect(ctx context.Context, url string) (*Relay, error) { - conn, err := websocket.Dial(ctx, url) - if err != nil { - return nil, fmt.Errorf("failed to connect to relay: %w", err) - } - - r := &Relay{ - URL: url, - conn: conn, - subscriptions: make(map[string]*Subscription), - okChannels: make(map[string]chan *OKEnvelope), - } - - go r.Listen(ctx) - - return r, nil -} - -// Close closes the WebSocket connection. -func (r *Relay) Close() error { - r.mu.Lock() - defer r.mu.Unlock() - - if r.conn == nil { - return nil - } - - err := r.conn.Close(websocket.StatusNormalClosure, "") - r.conn = nil - return err -} - -// Send sends an envelope to the relay. -func (r *Relay) Send(ctx context.Context, env Envelope) error { - data, err := env.MarshalJSON() - if err != nil { - return fmt.Errorf("failed to marshal envelope: %w", err) - } - - r.mu.Lock() - defer r.mu.Unlock() - - if r.conn == nil { - return fmt.Errorf("connection closed") - } - - return r.conn.Write(ctx, websocket.MessageText, data) -} - -// Receive reads the next envelope from the relay. -func (r *Relay) Receive(ctx context.Context) (Envelope, error) { - r.mu.Lock() - conn := r.conn - r.mu.Unlock() - - if conn == nil { - return nil, fmt.Errorf("connection closed") - } - - _, data, err := conn.Read(ctx) - if err != nil { - return nil, fmt.Errorf("failed to read message: %w", err) - } - - return ParseEnvelope(data) -} - -// Publish sends an event to the relay and waits for the OK response. -func (r *Relay) Publish(ctx context.Context, event *Event) error { - ch := make(chan *OKEnvelope, 1) - - r.okChannelsMu.Lock() - r.okChannels[event.ID] = ch - r.okChannelsMu.Unlock() - - defer func() { - r.okChannelsMu.Lock() - delete(r.okChannels, event.ID) - r.okChannelsMu.Unlock() - }() - - env := EventEnvelope{Event: event} - if err := r.Send(ctx, env); err != nil { - return fmt.Errorf("failed to send event: %w", err) - } - - select { - case ok := <-ch: - if !ok.OK { - return fmt.Errorf("event rejected: %s", ok.Message) - } - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -func genID() string { - buf := make([]byte, 5) - rand.Read(buf) - return fmt.Sprintf("%x", buf) -} - -// subscribe is the internal implementation for Subscribe and Fetch. -func (r *Relay) subscribe(ctx context.Context, closeOnEOSE bool, filters ...Filter) *Subscription { - id := genID() - - sub := &Subscription{ - ID: id, - relay: r, - Filters: filters, - Events: make(chan *Event, 100), - closeOnEOSE: closeOnEOSE, - } - - r.subscriptionsMu.Lock() - r.subscriptions[id] = sub - r.subscriptionsMu.Unlock() - - go func() { - <-ctx.Done() - sub.stop(ctx.Err()) - r.subscriptionsMu.Lock() - delete(r.subscriptions, id) - r.subscriptionsMu.Unlock() - }() - - env := ReqEnvelope{ - SubscriptionID: id, - Filters: filters, - } - if err := r.Send(ctx, env); err != nil { - r.subscriptionsMu.Lock() - delete(r.subscriptions, id) - r.subscriptionsMu.Unlock() - sub.stop(fmt.Errorf("failed to send subscription request: %w", err)) - } - - return sub -} - -// Subscribe creates a subscription with the given filters. -// Events are received on the Events channel until the context is cancelled. -// After EOSE (end of stored events), the subscription continues to receive -// real-time events per the Nostr protocol. -func (r *Relay) Subscribe(ctx context.Context, filters ...Filter) *Subscription { - return r.subscribe(ctx, false, filters...) -} - -// Fetch creates a subscription that closes automatically when EOSE is received. -// Use this for one-shot queries where you only want stored events. -func (r *Relay) Fetch(ctx context.Context, filters ...Filter) *Subscription { - return r.subscribe(ctx, true, filters...) -} - -// dispatchEnvelope routes incoming messages to the appropriate subscription. -func (r *Relay) dispatchEnvelope(env Envelope) { - switch e := env.(type) { - case *EventEnvelope: - r.subscriptionsMu.RLock() - sub, ok := r.subscriptions[e.SubscriptionID] - r.subscriptionsMu.RUnlock() - if ok { - sub.send(e.Event) - } - case *EOSEEnvelope: - r.subscriptionsMu.RLock() - sub, ok := r.subscriptions[e.SubscriptionID] - r.subscriptionsMu.RUnlock() - if ok && sub.closeOnEOSE { - r.subscriptionsMu.Lock() - delete(r.subscriptions, e.SubscriptionID) - r.subscriptionsMu.Unlock() - sub.stop(nil) - } - case *ClosedEnvelope: - r.subscriptionsMu.Lock() - sub, ok := r.subscriptions[e.SubscriptionID] - if ok { - delete(r.subscriptions, e.SubscriptionID) - } - r.subscriptionsMu.Unlock() - if ok { - sub.stop(fmt.Errorf("subscription closed by relay: %s", e.Message)) - } - case *OKEnvelope: - r.okChannelsMu.Lock() - ch, ok := r.okChannels[e.EventID] - r.okChannelsMu.Unlock() - if ok { - select { - case ch <- e: - default: - } - } - } -} - -// Listen reads messages from the relay and dispatches them to subscriptions. -func (r *Relay) Listen(ctx context.Context) error { - defer func() { - r.subscriptionsMu.Lock() - subs := make([]*Subscription, 0, len(r.subscriptions)) - for id, sub := range r.subscriptions { - subs = append(subs, sub) - delete(r.subscriptions, id) - } - r.subscriptionsMu.Unlock() - - for _, sub := range subs { - sub.stop(fmt.Errorf("connection closed")) - } - }() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - env, err := r.Receive(ctx) - if err != nil { - return err - } - - r.dispatchEnvelope(env) - } -} - -// Subscription represents an active subscription to a relay. -type Subscription struct { - ID string - relay *Relay - Filters []Filter - Events chan *Event - Err error - - closeOnEOSE bool - mu sync.Mutex - done bool -} - -// send delivers an event to the subscription's Events channel. -func (s *Subscription) send(ev *Event) { - s.mu.Lock() - defer s.mu.Unlock() - if s.done { - return - } - select { - case s.Events <- ev: - default: - } -} - -// stop closes the subscription's Events channel and sets the error. -// It is idempotent — only the first call has any effect. -func (s *Subscription) stop(err error) { - s.mu.Lock() - defer s.mu.Unlock() - if s.done { - return - } - s.done = true - s.Err = err - close(s.Events) -} - -// Close unsubscribes from the relay. -func (s *Subscription) Close(ctx context.Context) error { - s.stop(nil) - - s.relay.subscriptionsMu.Lock() - delete(s.relay.subscriptions, s.ID) - s.relay.subscriptionsMu.Unlock() - - env := CloseEnvelope{SubscriptionID: s.ID} - return s.relay.Send(ctx, env) -} -- cgit v1.2.3