From 40df56985402a31695a9a3bb13319bd2a3276305 Mon Sep 17 00:00:00 2001 From: bndw Date: Sat, 14 Feb 2026 12:49:08 -0800 Subject: feat: track WebSocket subscriptions in metrics Add metrics tracking for WebSocket (NIP-01) subscriptions in addition to existing gRPC subscription tracking. Changes: - Add Count() method to subscription.Manager - Add MetricsRecorder interface to WebSocket handler - Update subscription metrics when REQ/CLOSE messages processed - Wire up metrics to WebSocket handler in main.go Before: Only gRPC stream subscriptions were counted After: Both gRPC and WebSocket subscriptions tracked accurately This fixes the dashboard showing 0 subscriptions when clients connect via WebSocket (e.g., nak req --stream). --- internal/handler/websocket/handler.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) (limited to 'internal/handler') diff --git a/internal/handler/websocket/handler.go b/internal/handler/websocket/handler.go index 5d40abb..b7ea71d 100644 --- a/internal/handler/websocket/handler.go +++ b/internal/handler/websocket/handler.go @@ -20,9 +20,16 @@ type EventStore interface { ProcessDeletion(context.Context, *pb.Event) error } +type MetricsRecorder interface { + IncrementSubscriptions() + DecrementSubscriptions() + SetActiveSubscriptions(count int) +} + type Handler struct { store EventStore subs *subscription.Manager + metrics MetricsRecorder indexData IndexData } @@ -33,6 +40,10 @@ func NewHandler(store EventStore, subs *subscription.Manager) *Handler { } } +func (h *Handler) SetMetrics(m MetricsRecorder) { + h.metrics = m +} + // SetIndexData sets the addresses for the index page func (h *Handler) SetIndexData(grpcAddr, httpAddr, wsAddr string) { h.indexData = IndexData{ @@ -69,9 +80,15 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() clientSubs := make(map[string]*subscription.Subscription) defer func() { + count := len(clientSubs) for subID := range clientSubs { h.subs.Remove(subID) } + if h.metrics != nil && count > 0 { + for i := 0; i < count; i++ { + h.metrics.DecrementSubscriptions() + } + } }() for { @@ -192,6 +209,9 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso if existing, ok := clientSubs[subID]; ok { h.subs.Remove(existing.ID) delete(clientSubs, subID) + if h.metrics != nil { + h.metrics.DecrementSubscriptions() + } } storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0}) @@ -216,6 +236,10 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso h.subs.Add(sub) clientSubs[subID] = sub + if h.metrics != nil { + h.metrics.IncrementSubscriptions() + } + go h.streamEvents(ctx, conn, sub) return nil @@ -234,6 +258,9 @@ func (h *Handler) handleClose(raw []json.RawMessage, clientSubs map[string]*subs if sub, ok := clientSubs[subID]; ok { h.subs.Remove(sub.ID) delete(clientSubs, subID) + if h.metrics != nil { + h.metrics.DecrementSubscriptions() + } } return nil -- cgit v1.2.3