From 40df56985402a31695a9a3bb13319bd2a3276305 Mon Sep 17 00:00:00 2001 From: bndw Date: Sat, 14 Feb 2026 12:49:08 -0800 Subject: feat: track WebSocket subscriptions in metrics Add metrics tracking for WebSocket (NIP-01) subscriptions in addition to existing gRPC subscription tracking. Changes: - Add Count() method to subscription.Manager - Add MetricsRecorder interface to WebSocket handler - Update subscription metrics when REQ/CLOSE messages processed - Wire up metrics to WebSocket handler in main.go Before: Only gRPC stream subscriptions were counted After: Both gRPC and WebSocket subscriptions tracked accurately This fixes the dashboard showing 0 subscriptions when clients connect via WebSocket (e.g., nak req --stream). --- cmd/relay/main.go | 3 +++ internal/handler/websocket/handler.go | 27 +++++++++++++++++++++++++++ internal/subscription/manager.go | 6 ++++++ 3 files changed, 36 insertions(+) diff --git a/cmd/relay/main.go b/cmd/relay/main.go index 457a719..e4fe9bc 100644 --- a/cmd/relay/main.go +++ b/cmd/relay/main.go @@ -93,6 +93,9 @@ func main() { } wsHandler := wshandler.NewHandler(store, subManager) + if m != nil { + wsHandler.SetMetrics(m) + } var grpcDisplay, httpDisplay, wsDisplay string if cfg.Server.PublicURL != "" { diff --git a/internal/handler/websocket/handler.go b/internal/handler/websocket/handler.go index 5d40abb..b7ea71d 100644 --- a/internal/handler/websocket/handler.go +++ b/internal/handler/websocket/handler.go @@ -20,9 +20,16 @@ type EventStore interface { ProcessDeletion(context.Context, *pb.Event) error } +type MetricsRecorder interface { + IncrementSubscriptions() + DecrementSubscriptions() + SetActiveSubscriptions(count int) +} + type Handler struct { store EventStore subs *subscription.Manager + metrics MetricsRecorder indexData IndexData } @@ -33,6 +40,10 @@ func NewHandler(store EventStore, subs *subscription.Manager) *Handler { } } +func (h *Handler) SetMetrics(m MetricsRecorder) { + h.metrics = m +} + // SetIndexData sets the addresses for the index page func (h *Handler) SetIndexData(grpcAddr, httpAddr, wsAddr string) { h.indexData = IndexData{ @@ -69,9 +80,15 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() clientSubs := make(map[string]*subscription.Subscription) defer func() { + count := len(clientSubs) for subID := range clientSubs { h.subs.Remove(subID) } + if h.metrics != nil && count > 0 { + for i := 0; i < count; i++ { + h.metrics.DecrementSubscriptions() + } + } }() for { @@ -192,6 +209,9 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso if existing, ok := clientSubs[subID]; ok { h.subs.Remove(existing.ID) delete(clientSubs, subID) + if h.metrics != nil { + h.metrics.DecrementSubscriptions() + } } storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0}) @@ -216,6 +236,10 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso h.subs.Add(sub) clientSubs[subID] = sub + if h.metrics != nil { + h.metrics.IncrementSubscriptions() + } + go h.streamEvents(ctx, conn, sub) return nil @@ -234,6 +258,9 @@ func (h *Handler) handleClose(raw []json.RawMessage, clientSubs map[string]*subs if sub, ok := clientSubs[subID]; ok { h.subs.Remove(sub.ID) delete(clientSubs, subID) + if h.metrics != nil { + h.metrics.DecrementSubscriptions() + } } return nil diff --git a/internal/subscription/manager.go b/internal/subscription/manager.go index 6347018..3f6ac7f 100644 --- a/internal/subscription/manager.go +++ b/internal/subscription/manager.go @@ -72,6 +72,12 @@ func (m *Manager) Get(id string) (*Subscription, bool) { return sub, ok } +func (m *Manager) Count() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.subscriptions) +} + func (m *Manager) MatchAndFan(event *pb.Event) { m.mu.RLock() defer m.mu.RUnlock() -- cgit v1.2.3