diff options
| author | bndw <ben@bdw.to> | 2026-02-14 22:11:08 -0800 |
|---|---|---|
| committer | bndw <ben@bdw.to> | 2026-02-14 22:11:08 -0800 |
| commit | bff984727e240dfc29533381d60a2127d833c10a (patch) | |
| tree | 369c75e67682a126e58fa27b4e4cdcedba2bf188 /internal/handler | |
| parent | a98e6b9f844b980d27097be68886ec9faa6d90e8 (diff) | |
feat: track WebSocket requests in metrics
Add request tracking for EVENT and REQ messages to match gRPC behavior.
Dashboard now shows total/success/error counts for all requests.
- Add RecordRequest to MetricsRecorder interface
- Track timing and status in handleEvent and handleReq
- Record metrics with status: ok, error, unauthenticated, rate_limited
- Measure request duration for performance monitoring
WebSocket is the primary interface, so tracking these requests is critical
for understanding relay usage and performance.
Diffstat (limited to 'internal/handler')
| -rw-r--r-- | internal/handler/websocket/handler.go | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/internal/handler/websocket/handler.go b/internal/handler/websocket/handler.go index cb089b1..715e777 100644 --- a/internal/handler/websocket/handler.go +++ b/internal/handler/websocket/handler.go | |||
| @@ -6,6 +6,7 @@ import ( | |||
| 6 | "fmt" | 6 | "fmt" |
| 7 | "log" | 7 | "log" |
| 8 | "net/http" | 8 | "net/http" |
| 9 | "time" | ||
| 9 | 10 | ||
| 10 | pb "northwest.io/muxstr/api/nostr/v1" | 11 | pb "northwest.io/muxstr/api/nostr/v1" |
| 11 | "northwest.io/muxstr/internal/storage" | 12 | "northwest.io/muxstr/internal/storage" |
| @@ -31,6 +32,7 @@ type MetricsRecorder interface { | |||
| 31 | IncrementSubscriptions() | 32 | IncrementSubscriptions() |
| 32 | DecrementSubscriptions() | 33 | DecrementSubscriptions() |
| 33 | SetActiveSubscriptions(count int) | 34 | SetActiveSubscriptions(count int) |
| 35 | RecordRequest(method, status string, duration float64) | ||
| 34 | } | 36 | } |
| 35 | 37 | ||
| 36 | type RateLimiter interface { | 38 | type RateLimiter interface { |
| @@ -237,15 +239,30 @@ func (h *Handler) requireAuth(ctx context.Context, conn *websocket.Conn, isWrite | |||
| 237 | } | 239 | } |
| 238 | 240 | ||
| 239 | func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, state *connState) error { | 241 | func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, state *connState) error { |
| 242 | start := time.Now() | ||
| 243 | var status string | ||
| 244 | defer func() { | ||
| 245 | if h.metrics != nil { | ||
| 246 | duration := time.Since(start).Seconds() | ||
| 247 | if status == "" { | ||
| 248 | status = "ok" // Default to ok if not set | ||
| 249 | } | ||
| 250 | h.metrics.RecordRequest("EVENT", status, duration) | ||
| 251 | } | ||
| 252 | }() | ||
| 253 | |||
| 240 | if len(raw) != 2 { | 254 | if len(raw) != 2 { |
| 255 | status = "error" | ||
| 241 | return fmt.Errorf("EVENT expects 2 elements") | 256 | return fmt.Errorf("EVENT expects 2 elements") |
| 242 | } | 257 | } |
| 243 | 258 | ||
| 244 | if err := h.requireAuth(ctx, conn, true, state); err != nil { | 259 | if err := h.requireAuth(ctx, conn, true, state); err != nil { |
| 260 | status = "error" | ||
| 245 | return err | 261 | return err |
| 246 | } | 262 | } |
| 247 | 263 | ||
| 248 | if state.authenticatedPubkey == "" && h.authConfig != nil && h.authConfig.WriteEnabled { | 264 | if state.authenticatedPubkey == "" && h.authConfig != nil && h.authConfig.WriteEnabled { |
| 265 | status = "unauthenticated" | ||
| 249 | return nil | 266 | return nil |
| 250 | } | 267 | } |
| 251 | 268 | ||
| @@ -256,21 +273,25 @@ func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []j | |||
| 256 | identifier = state.clientIP | 273 | identifier = state.clientIP |
| 257 | } | 274 | } |
| 258 | if !h.limiter.Allow(identifier, "EVENT") { | 275 | if !h.limiter.Allow(identifier, "EVENT") { |
| 276 | status = "rate_limited" | ||
| 259 | return fmt.Errorf("rate limit exceeded") | 277 | return fmt.Errorf("rate limit exceeded") |
| 260 | } | 278 | } |
| 261 | } | 279 | } |
| 262 | 280 | ||
| 263 | var event nostr.Event | 281 | var event nostr.Event |
| 264 | if err := json.Unmarshal(raw[1], &event); err != nil { | 282 | if err := json.Unmarshal(raw[1], &event); err != nil { |
| 283 | status = "error" | ||
| 265 | return fmt.Errorf("invalid event: %w", err) | 284 | return fmt.Errorf("invalid event: %w", err) |
| 266 | } | 285 | } |
| 267 | 286 | ||
| 268 | if !event.CheckID() { | 287 | if !event.CheckID() { |
| 288 | status = "error" | ||
| 269 | h.sendOK(ctx, conn, event.ID, false, "invalid: event ID mismatch") | 289 | h.sendOK(ctx, conn, event.ID, false, "invalid: event ID mismatch") |
| 270 | return nil | 290 | return nil |
| 271 | } | 291 | } |
| 272 | 292 | ||
| 273 | if !event.Verify() { | 293 | if !event.Verify() { |
| 294 | status = "error" | ||
| 274 | h.sendOK(ctx, conn, event.ID, false, "invalid: signature verification failed") | 295 | h.sendOK(ctx, conn, event.ID, false, "invalid: signature verification failed") |
| 275 | return nil | 296 | return nil |
| 276 | } | 297 | } |
| @@ -281,9 +302,11 @@ func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []j | |||
| 281 | // Handle deletion events (kind 5) - process but don't store | 302 | // Handle deletion events (kind 5) - process but don't store |
| 282 | if pbEvent.Kind == 5 { | 303 | if pbEvent.Kind == 5 { |
| 283 | if err := h.store.ProcessDeletion(ctx, pbEvent); err != nil { | 304 | if err := h.store.ProcessDeletion(ctx, pbEvent); err != nil { |
| 305 | status = "error" | ||
| 284 | h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("deletion failed: %v", err)) | 306 | h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("deletion failed: %v", err)) |
| 285 | return nil | 307 | return nil |
| 286 | } | 308 | } |
| 309 | status = "ok" | ||
| 287 | h.sendOK(ctx, conn, event.ID, true, "deleted") | 310 | h.sendOK(ctx, conn, event.ID, true, "deleted") |
| 288 | return nil | 311 | return nil |
| 289 | } | 312 | } |
| @@ -295,30 +318,48 @@ func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []j | |||
| 295 | 318 | ||
| 296 | err := h.store.StoreEvent(ctx, eventData) | 319 | err := h.store.StoreEvent(ctx, eventData) |
| 297 | if err == storage.ErrEventExists { | 320 | if err == storage.ErrEventExists { |
| 321 | status = "ok" | ||
| 298 | h.sendOK(ctx, conn, event.ID, true, "duplicate: already have this event") | 322 | h.sendOK(ctx, conn, event.ID, true, "duplicate: already have this event") |
| 299 | return nil | 323 | return nil |
| 300 | } | 324 | } |
| 301 | if err != nil { | 325 | if err != nil { |
| 326 | status = "error" | ||
| 302 | h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("error: %v", err)) | 327 | h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("error: %v", err)) |
| 303 | return nil | 328 | return nil |
| 304 | } | 329 | } |
| 305 | 330 | ||
| 306 | h.subs.MatchAndFan(pbEvent) | 331 | h.subs.MatchAndFan(pbEvent) |
| 307 | 332 | ||
| 333 | status = "ok" | ||
| 308 | h.sendOK(ctx, conn, event.ID, true, "") | 334 | h.sendOK(ctx, conn, event.ID, true, "") |
| 309 | return nil | 335 | return nil |
| 310 | } | 336 | } |
| 311 | 337 | ||
| 312 | func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, clientSubs map[string]*subscription.Subscription, state *connState) error { | 338 | func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, clientSubs map[string]*subscription.Subscription, state *connState) error { |
| 339 | start := time.Now() | ||
| 340 | var status string | ||
| 341 | defer func() { | ||
| 342 | if h.metrics != nil { | ||
| 343 | duration := time.Since(start).Seconds() | ||
| 344 | if status == "" { | ||
| 345 | status = "ok" // Default to ok if not set | ||
| 346 | } | ||
| 347 | h.metrics.RecordRequest("REQ", status, duration) | ||
| 348 | } | ||
| 349 | }() | ||
| 350 | |||
| 313 | if len(raw) < 3 { | 351 | if len(raw) < 3 { |
| 352 | status = "error" | ||
| 314 | return fmt.Errorf("REQ expects at least 3 elements") | 353 | return fmt.Errorf("REQ expects at least 3 elements") |
| 315 | } | 354 | } |
| 316 | 355 | ||
| 317 | if err := h.requireAuth(ctx, conn, false, state); err != nil { | 356 | if err := h.requireAuth(ctx, conn, false, state); err != nil { |
| 357 | status = "error" | ||
| 318 | return err | 358 | return err |
| 319 | } | 359 | } |
| 320 | 360 | ||
| 321 | if state.authenticatedPubkey == "" && h.authConfig != nil && h.authConfig.ReadEnabled { | 361 | if state.authenticatedPubkey == "" && h.authConfig != nil && h.authConfig.ReadEnabled { |
| 362 | status = "unauthenticated" | ||
| 322 | return nil | 363 | return nil |
| 323 | } | 364 | } |
| 324 | 365 | ||
| @@ -329,12 +370,14 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso | |||
| 329 | identifier = state.clientIP | 370 | identifier = state.clientIP |
| 330 | } | 371 | } |
| 331 | if !h.limiter.Allow(identifier, "REQ") { | 372 | if !h.limiter.Allow(identifier, "REQ") { |
| 373 | status = "rate_limited" | ||
| 332 | return fmt.Errorf("rate limit exceeded") | 374 | return fmt.Errorf("rate limit exceeded") |
| 333 | } | 375 | } |
| 334 | } | 376 | } |
| 335 | 377 | ||
| 336 | var subID string | 378 | var subID string |
| 337 | if err := json.Unmarshal(raw[1], &subID); err != nil { | 379 | if err := json.Unmarshal(raw[1], &subID); err != nil { |
| 380 | status = "error" | ||
| 338 | return fmt.Errorf("invalid subscription ID") | 381 | return fmt.Errorf("invalid subscription ID") |
| 339 | } | 382 | } |
| 340 | 383 | ||
| @@ -342,6 +385,7 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso | |||
| 342 | for i := 2; i < len(raw); i++ { | 385 | for i := 2; i < len(raw); i++ { |
| 343 | var nostrFilter nostr.Filter | 386 | var nostrFilter nostr.Filter |
| 344 | if err := json.Unmarshal(raw[i], &nostrFilter); err != nil { | 387 | if err := json.Unmarshal(raw[i], &nostrFilter); err != nil { |
| 388 | status = "error" | ||
| 345 | return fmt.Errorf("invalid filter: %w", err) | 389 | return fmt.Errorf("invalid filter: %w", err) |
| 346 | } | 390 | } |
| 347 | 391 | ||
| @@ -359,6 +403,7 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso | |||
| 359 | 403 | ||
| 360 | storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0}) | 404 | storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0}) |
| 361 | if err != nil { | 405 | if err != nil { |
| 406 | status = "error" | ||
| 362 | return fmt.Errorf("query failed: %w", err) | 407 | return fmt.Errorf("query failed: %w", err) |
| 363 | } | 408 | } |
| 364 | 409 | ||
| @@ -385,6 +430,7 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso | |||
| 385 | 430 | ||
| 386 | go h.streamEvents(ctx, conn, sub) | 431 | go h.streamEvents(ctx, conn, sub) |
| 387 | 432 | ||
| 433 | status = "ok" | ||
| 388 | return nil | 434 | return nil |
| 389 | } | 435 | } |
| 390 | 436 | ||
