From 3481c3273f8764bd0a0ab51183dc57f592fb616c Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 18:17:37 -0800 Subject: feat: add WebSocket server with full NIP-01 support WebSocket handler: - NIP-01 protocol (EVENT, REQ, CLOSE, OK, EOSE, NOTICE) - JSON envelope parsing - Shares subscription manager with gRPC (unified event fan-out) - Standard Nostr client compatibility Relay now serves dual protocols: - gRPC on :50051 (binary, high performance) - WebSocket on :8080 (JSON, Nostr standard) Both protocols share: - Same storage layer - Same subscription manager - Same validation logic Compatible with all Nostr clients! --- internal/handler/websocket/handler.go | 246 ++++++++++++++++++++++++++++++++++ 1 file changed, 246 insertions(+) create mode 100644 internal/handler/websocket/handler.go (limited to 'internal/handler/websocket/handler.go') diff --git a/internal/handler/websocket/handler.go b/internal/handler/websocket/handler.go new file mode 100644 index 0000000..cef83dd --- /dev/null +++ b/internal/handler/websocket/handler.go @@ -0,0 +1,246 @@ +package websocket + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + + pb "northwest.io/nostr-grpc/api/nostr/v1" + "northwest.io/nostr-grpc/internal/nostr" + "northwest.io/nostr-grpc/internal/storage" + "northwest.io/nostr-grpc/internal/subscription" + "northwest.io/nostr-grpc/internal/websocket" +) + +type EventStore interface { + StoreEvent(context.Context, *storage.EventData) error + QueryEvents(context.Context, []*pb.Filter, *storage.QueryOptions) ([]*pb.Event, error) +} + +type Handler struct { + store EventStore + subs *subscription.Manager +} + +func NewHandler(store EventStore, subs *subscription.Manager) *Handler { + return &Handler{ + store: store, + subs: subs, + } +} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r) + if err != nil { + log.Printf("WebSocket accept failed: %v", err) + return + } + defer conn.Close(websocket.StatusNormalClosure, "") + + ctx := r.Context() + clientSubs := make(map[string]*subscription.Subscription) + defer func() { + for subID := range clientSubs { + h.subs.Remove(subID) + } + }() + + for { + _, data, err := conn.Read(ctx) + if err != nil { + return + } + + if err := h.handleMessage(ctx, conn, data, clientSubs); err != nil { + log.Printf("Message handling error: %v", err) + h.sendNotice(ctx, conn, err.Error()) + } + } +} + +func (h *Handler) handleMessage(ctx context.Context, conn *websocket.Conn, data []byte, clientSubs map[string]*subscription.Subscription) error { + var raw []json.RawMessage + if err := json.Unmarshal(data, &raw); err != nil { + return fmt.Errorf("invalid JSON") + } + + if len(raw) == 0 { + return fmt.Errorf("empty message") + } + + var msgType string + if err := json.Unmarshal(raw[0], &msgType); err != nil { + return fmt.Errorf("invalid message type") + } + + switch msgType { + case "EVENT": + return h.handleEvent(ctx, conn, raw) + case "REQ": + return h.handleReq(ctx, conn, raw, clientSubs) + case "CLOSE": + return h.handleClose(raw, clientSubs) + default: + return fmt.Errorf("unknown message type: %s", msgType) + } +} + +func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage) error { + if len(raw) != 2 { + return fmt.Errorf("EVENT expects 2 elements") + } + + var event nostr.Event + if err := json.Unmarshal(raw[1], &event); err != nil { + return fmt.Errorf("invalid event: %w", err) + } + + if !event.CheckID() { + h.sendOK(ctx, conn, event.ID, false, "invalid: event ID mismatch") + return nil + } + + if !event.Verify() { + h.sendOK(ctx, conn, event.ID, false, "invalid: signature verification failed") + return nil + } + + pbEvent := NostrToPB(&event) + canonicalJSON := event.Serialize() + + eventData := &storage.EventData{ + Event: pbEvent, + CanonicalJSON: canonicalJSON, + } + + err := h.store.StoreEvent(ctx, eventData) + if err == storage.ErrEventExists { + h.sendOK(ctx, conn, event.ID, true, "duplicate: already have this event") + return nil + } + if err != nil { + h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("error: %v", err)) + return nil + } + + h.subs.MatchAndFan(pbEvent) + + h.sendOK(ctx, conn, event.ID, true, "") + return nil +} + +func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, clientSubs map[string]*subscription.Subscription) error { + if len(raw) < 3 { + return fmt.Errorf("REQ expects at least 3 elements") + } + + var subID string + if err := json.Unmarshal(raw[1], &subID); err != nil { + return fmt.Errorf("invalid subscription ID") + } + + var filters []*pb.Filter + for i := 2; i < len(raw); i++ { + var nostrFilter nostr.Filter + if err := json.Unmarshal(raw[i], &nostrFilter); err != nil { + return fmt.Errorf("invalid filter: %w", err) + } + + pbFilter := NostrFilterToPB(&nostrFilter) + filters = append(filters, pbFilter) + } + + if existing, ok := clientSubs[subID]; ok { + h.subs.Remove(existing.ID) + delete(clientSubs, subID) + } + + storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0}) + if err != nil { + return fmt.Errorf("query failed: %w", err) + } + + for _, pbEvent := range storedEvents { + event := PBToNostr(pbEvent) + h.sendEvent(ctx, conn, subID, event) + } + + h.sendEOSE(ctx, conn, subID) + + sub := &subscription.Subscription{ + ID: subID, + Filters: filters, + Events: make(chan *pb.Event, 100), + } + sub.InitDone() + + h.subs.Add(sub) + clientSubs[subID] = sub + + go h.streamEvents(ctx, conn, sub) + + return nil +} + +func (h *Handler) handleClose(raw []json.RawMessage, clientSubs map[string]*subscription.Subscription) error { + if len(raw) != 2 { + return fmt.Errorf("CLOSE expects 2 elements") + } + + var subID string + if err := json.Unmarshal(raw[1], &subID); err != nil { + return fmt.Errorf("invalid subscription ID") + } + + if sub, ok := clientSubs[subID]; ok { + h.subs.Remove(sub.ID) + delete(clientSubs, subID) + } + + return nil +} + +func (h *Handler) streamEvents(ctx context.Context, conn *websocket.Conn, sub *subscription.Subscription) { + for { + select { + case pbEvent, ok := <-sub.Events: + if !ok { + return + } + event := PBToNostr(pbEvent) + h.sendEvent(ctx, conn, sub.ID, event) + + case <-ctx.Done(): + return + + case <-sub.Done(): + return + } + } +} + +func (h *Handler) sendEvent(ctx context.Context, conn *websocket.Conn, subID string, event *nostr.Event) error { + msg := []interface{}{"EVENT", subID, event} + data, _ := json.Marshal(msg) + return conn.Write(ctx, websocket.MessageText, data) +} + +func (h *Handler) sendOK(ctx context.Context, conn *websocket.Conn, eventID string, accepted bool, message string) error { + msg := []interface{}{"OK", eventID, accepted, message} + data, _ := json.Marshal(msg) + return conn.Write(ctx, websocket.MessageText, data) +} + +func (h *Handler) sendEOSE(ctx context.Context, conn *websocket.Conn, subID string) error { + msg := []interface{}{"EOSE", subID} + data, _ := json.Marshal(msg) + return conn.Write(ctx, websocket.MessageText, data) +} + +func (h *Handler) sendNotice(ctx context.Context, conn *websocket.Conn, notice string) error { + msg := []interface{}{"NOTICE", notice} + data, _ := json.Marshal(msg) + return conn.Write(ctx, websocket.MessageText, data) +} -- cgit v1.2.3