diff options
| author | bndw <ben@bdw.to> | 2026-02-14 12:49:08 -0800 |
|---|---|---|
| committer | bndw <ben@bdw.to> | 2026-02-14 12:49:08 -0800 |
| commit | 40df56985402a31695a9a3bb13319bd2a3276305 (patch) | |
| tree | f95842afe4e75b93dfea49b94e5a6815c0d936ce /internal/handler | |
| parent | 9711b2b177959c0ea8e5119bead16dfd40b12c47 (diff) | |
feat: track WebSocket subscriptions in metrics
Add metrics tracking for WebSocket (NIP-01) subscriptions in addition
to existing gRPC subscription tracking.
Changes:
- Add Count() method to subscription.Manager
- Add MetricsRecorder interface to WebSocket handler
- Update subscription metrics when REQ/CLOSE messages processed
- Wire up metrics to WebSocket handler in main.go
Before: Only gRPC stream subscriptions were counted
After: Both gRPC and WebSocket subscriptions tracked accurately
This fixes the dashboard showing 0 subscriptions when clients connect
via WebSocket (e.g., nak req --stream).
Diffstat (limited to 'internal/handler')
| -rw-r--r-- | internal/handler/websocket/handler.go | 27 |
1 files changed, 27 insertions, 0 deletions
diff --git a/internal/handler/websocket/handler.go b/internal/handler/websocket/handler.go index 5d40abb..b7ea71d 100644 --- a/internal/handler/websocket/handler.go +++ b/internal/handler/websocket/handler.go | |||
| @@ -20,9 +20,16 @@ type EventStore interface { | |||
| 20 | ProcessDeletion(context.Context, *pb.Event) error | 20 | ProcessDeletion(context.Context, *pb.Event) error |
| 21 | } | 21 | } |
| 22 | 22 | ||
| 23 | type MetricsRecorder interface { | ||
| 24 | IncrementSubscriptions() | ||
| 25 | DecrementSubscriptions() | ||
| 26 | SetActiveSubscriptions(count int) | ||
| 27 | } | ||
| 28 | |||
| 23 | type Handler struct { | 29 | type Handler struct { |
| 24 | store EventStore | 30 | store EventStore |
| 25 | subs *subscription.Manager | 31 | subs *subscription.Manager |
| 32 | metrics MetricsRecorder | ||
| 26 | indexData IndexData | 33 | indexData IndexData |
| 27 | } | 34 | } |
| 28 | 35 | ||
| @@ -33,6 +40,10 @@ func NewHandler(store EventStore, subs *subscription.Manager) *Handler { | |||
| 33 | } | 40 | } |
| 34 | } | 41 | } |
| 35 | 42 | ||
| 43 | func (h *Handler) SetMetrics(m MetricsRecorder) { | ||
| 44 | h.metrics = m | ||
| 45 | } | ||
| 46 | |||
| 36 | // SetIndexData sets the addresses for the index page | 47 | // SetIndexData sets the addresses for the index page |
| 37 | func (h *Handler) SetIndexData(grpcAddr, httpAddr, wsAddr string) { | 48 | func (h *Handler) SetIndexData(grpcAddr, httpAddr, wsAddr string) { |
| 38 | h.indexData = IndexData{ | 49 | h.indexData = IndexData{ |
| @@ -69,9 +80,15 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |||
| 69 | ctx := r.Context() | 80 | ctx := r.Context() |
| 70 | clientSubs := make(map[string]*subscription.Subscription) | 81 | clientSubs := make(map[string]*subscription.Subscription) |
| 71 | defer func() { | 82 | defer func() { |
| 83 | count := len(clientSubs) | ||
| 72 | for subID := range clientSubs { | 84 | for subID := range clientSubs { |
| 73 | h.subs.Remove(subID) | 85 | h.subs.Remove(subID) |
| 74 | } | 86 | } |
| 87 | if h.metrics != nil && count > 0 { | ||
| 88 | for i := 0; i < count; i++ { | ||
| 89 | h.metrics.DecrementSubscriptions() | ||
| 90 | } | ||
| 91 | } | ||
| 75 | }() | 92 | }() |
| 76 | 93 | ||
| 77 | for { | 94 | for { |
| @@ -192,6 +209,9 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso | |||
| 192 | if existing, ok := clientSubs[subID]; ok { | 209 | if existing, ok := clientSubs[subID]; ok { |
| 193 | h.subs.Remove(existing.ID) | 210 | h.subs.Remove(existing.ID) |
| 194 | delete(clientSubs, subID) | 211 | delete(clientSubs, subID) |
| 212 | if h.metrics != nil { | ||
| 213 | h.metrics.DecrementSubscriptions() | ||
| 214 | } | ||
| 195 | } | 215 | } |
| 196 | 216 | ||
| 197 | storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0}) | 217 | storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0}) |
| @@ -216,6 +236,10 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso | |||
| 216 | h.subs.Add(sub) | 236 | h.subs.Add(sub) |
| 217 | clientSubs[subID] = sub | 237 | clientSubs[subID] = sub |
| 218 | 238 | ||
| 239 | if h.metrics != nil { | ||
| 240 | h.metrics.IncrementSubscriptions() | ||
| 241 | } | ||
| 242 | |||
| 219 | go h.streamEvents(ctx, conn, sub) | 243 | go h.streamEvents(ctx, conn, sub) |
| 220 | 244 | ||
| 221 | return nil | 245 | return nil |
| @@ -234,6 +258,9 @@ func (h *Handler) handleClose(raw []json.RawMessage, clientSubs map[string]*subs | |||
| 234 | if sub, ok := clientSubs[subID]; ok { | 258 | if sub, ok := clientSubs[subID]; ok { |
| 235 | h.subs.Remove(sub.ID) | 259 | h.subs.Remove(sub.ID) |
| 236 | delete(clientSubs, subID) | 260 | delete(clientSubs, subID) |
| 261 | if h.metrics != nil { | ||
| 262 | h.metrics.DecrementSubscriptions() | ||
| 263 | } | ||
| 237 | } | 264 | } |
| 238 | 265 | ||
| 239 | return nil | 266 | return nil |
