From bff984727e240dfc29533381d60a2127d833c10a Mon Sep 17 00:00:00 2001 From: bndw Date: Sat, 14 Feb 2026 22:11:08 -0800 Subject: feat: track WebSocket requests in metrics Add request tracking for EVENT and REQ messages to match gRPC behavior. Dashboard now shows total/success/error counts for all requests. - Add RecordRequest to MetricsRecorder interface - Track timing and status in handleEvent and handleReq - Record metrics with status: ok, error, unauthenticated, rate_limited - Measure request duration for performance monitoring WebSocket is the primary interface, so tracking these requests is critical for understanding relay usage and performance. --- internal/handler/websocket/handler.go | 46 +++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) (limited to 'internal/handler') diff --git a/internal/handler/websocket/handler.go b/internal/handler/websocket/handler.go index cb089b1..715e777 100644 --- a/internal/handler/websocket/handler.go +++ b/internal/handler/websocket/handler.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "net/http" + "time" pb "northwest.io/muxstr/api/nostr/v1" "northwest.io/muxstr/internal/storage" @@ -31,6 +32,7 @@ type MetricsRecorder interface { IncrementSubscriptions() DecrementSubscriptions() SetActiveSubscriptions(count int) + RecordRequest(method, status string, duration float64) } type RateLimiter interface { @@ -237,15 +239,30 @@ func (h *Handler) requireAuth(ctx context.Context, conn *websocket.Conn, isWrite } func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, state *connState) error { + start := time.Now() + var status string + defer func() { + if h.metrics != nil { + duration := time.Since(start).Seconds() + if status == "" { + status = "ok" // Default to ok if not set + } + h.metrics.RecordRequest("EVENT", status, duration) + } + }() + if len(raw) != 2 { + status = "error" return fmt.Errorf("EVENT expects 2 elements") } if err := h.requireAuth(ctx, conn, true, state); err != nil { + status = "error" return err } if state.authenticatedPubkey == "" && h.authConfig != nil && h.authConfig.WriteEnabled { + status = "unauthenticated" return nil } @@ -256,21 +273,25 @@ func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []j identifier = state.clientIP } if !h.limiter.Allow(identifier, "EVENT") { + status = "rate_limited" return fmt.Errorf("rate limit exceeded") } } var event nostr.Event if err := json.Unmarshal(raw[1], &event); err != nil { + status = "error" return fmt.Errorf("invalid event: %w", err) } if !event.CheckID() { + status = "error" h.sendOK(ctx, conn, event.ID, false, "invalid: event ID mismatch") return nil } if !event.Verify() { + status = "error" h.sendOK(ctx, conn, event.ID, false, "invalid: signature verification failed") return nil } @@ -281,9 +302,11 @@ func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []j // Handle deletion events (kind 5) - process but don't store if pbEvent.Kind == 5 { if err := h.store.ProcessDeletion(ctx, pbEvent); err != nil { + status = "error" h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("deletion failed: %v", err)) return nil } + status = "ok" h.sendOK(ctx, conn, event.ID, true, "deleted") return nil } @@ -295,30 +318,48 @@ func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []j err := h.store.StoreEvent(ctx, eventData) if err == storage.ErrEventExists { + status = "ok" h.sendOK(ctx, conn, event.ID, true, "duplicate: already have this event") return nil } if err != nil { + status = "error" h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("error: %v", err)) return nil } h.subs.MatchAndFan(pbEvent) + status = "ok" h.sendOK(ctx, conn, event.ID, true, "") return nil } func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, clientSubs map[string]*subscription.Subscription, state *connState) error { + start := time.Now() + var status string + defer func() { + if h.metrics != nil { + duration := time.Since(start).Seconds() + if status == "" { + status = "ok" // Default to ok if not set + } + h.metrics.RecordRequest("REQ", status, duration) + } + }() + if len(raw) < 3 { + status = "error" return fmt.Errorf("REQ expects at least 3 elements") } if err := h.requireAuth(ctx, conn, false, state); err != nil { + status = "error" return err } if state.authenticatedPubkey == "" && h.authConfig != nil && h.authConfig.ReadEnabled { + status = "unauthenticated" return nil } @@ -329,12 +370,14 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso identifier = state.clientIP } if !h.limiter.Allow(identifier, "REQ") { + status = "rate_limited" return fmt.Errorf("rate limit exceeded") } } var subID string if err := json.Unmarshal(raw[1], &subID); err != nil { + status = "error" return fmt.Errorf("invalid subscription ID") } @@ -342,6 +385,7 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso for i := 2; i < len(raw); i++ { var nostrFilter nostr.Filter if err := json.Unmarshal(raw[i], &nostrFilter); err != nil { + status = "error" return fmt.Errorf("invalid filter: %w", err) } @@ -359,6 +403,7 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0}) if err != nil { + status = "error" return fmt.Errorf("query failed: %w", err) } @@ -385,6 +430,7 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso go h.streamEvents(ctx, conn, sub) + status = "ok" return nil } -- cgit v1.2.3