summaryrefslogtreecommitdiffstats
path: root/internal
diff options
context:
space:
mode:
Diffstat (limited to 'internal')
-rw-r--r--internal/handler/websocket/handler.go46
1 files changed, 46 insertions, 0 deletions
diff --git a/internal/handler/websocket/handler.go b/internal/handler/websocket/handler.go
index cb089b1..715e777 100644
--- a/internal/handler/websocket/handler.go
+++ b/internal/handler/websocket/handler.go
@@ -6,6 +6,7 @@ import (
6 "fmt" 6 "fmt"
7 "log" 7 "log"
8 "net/http" 8 "net/http"
9 "time"
9 10
10 pb "northwest.io/muxstr/api/nostr/v1" 11 pb "northwest.io/muxstr/api/nostr/v1"
11 "northwest.io/muxstr/internal/storage" 12 "northwest.io/muxstr/internal/storage"
@@ -31,6 +32,7 @@ type MetricsRecorder interface {
31 IncrementSubscriptions() 32 IncrementSubscriptions()
32 DecrementSubscriptions() 33 DecrementSubscriptions()
33 SetActiveSubscriptions(count int) 34 SetActiveSubscriptions(count int)
35 RecordRequest(method, status string, duration float64)
34} 36}
35 37
36type RateLimiter interface { 38type RateLimiter interface {
@@ -237,15 +239,30 @@ func (h *Handler) requireAuth(ctx context.Context, conn *websocket.Conn, isWrite
237} 239}
238 240
239func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, state *connState) error { 241func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, state *connState) error {
242 start := time.Now()
243 var status string
244 defer func() {
245 if h.metrics != nil {
246 duration := time.Since(start).Seconds()
247 if status == "" {
248 status = "ok" // Default to ok if not set
249 }
250 h.metrics.RecordRequest("EVENT", status, duration)
251 }
252 }()
253
240 if len(raw) != 2 { 254 if len(raw) != 2 {
255 status = "error"
241 return fmt.Errorf("EVENT expects 2 elements") 256 return fmt.Errorf("EVENT expects 2 elements")
242 } 257 }
243 258
244 if err := h.requireAuth(ctx, conn, true, state); err != nil { 259 if err := h.requireAuth(ctx, conn, true, state); err != nil {
260 status = "error"
245 return err 261 return err
246 } 262 }
247 263
248 if state.authenticatedPubkey == "" && h.authConfig != nil && h.authConfig.WriteEnabled { 264 if state.authenticatedPubkey == "" && h.authConfig != nil && h.authConfig.WriteEnabled {
265 status = "unauthenticated"
249 return nil 266 return nil
250 } 267 }
251 268
@@ -256,21 +273,25 @@ func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []j
256 identifier = state.clientIP 273 identifier = state.clientIP
257 } 274 }
258 if !h.limiter.Allow(identifier, "EVENT") { 275 if !h.limiter.Allow(identifier, "EVENT") {
276 status = "rate_limited"
259 return fmt.Errorf("rate limit exceeded") 277 return fmt.Errorf("rate limit exceeded")
260 } 278 }
261 } 279 }
262 280
263 var event nostr.Event 281 var event nostr.Event
264 if err := json.Unmarshal(raw[1], &event); err != nil { 282 if err := json.Unmarshal(raw[1], &event); err != nil {
283 status = "error"
265 return fmt.Errorf("invalid event: %w", err) 284 return fmt.Errorf("invalid event: %w", err)
266 } 285 }
267 286
268 if !event.CheckID() { 287 if !event.CheckID() {
288 status = "error"
269 h.sendOK(ctx, conn, event.ID, false, "invalid: event ID mismatch") 289 h.sendOK(ctx, conn, event.ID, false, "invalid: event ID mismatch")
270 return nil 290 return nil
271 } 291 }
272 292
273 if !event.Verify() { 293 if !event.Verify() {
294 status = "error"
274 h.sendOK(ctx, conn, event.ID, false, "invalid: signature verification failed") 295 h.sendOK(ctx, conn, event.ID, false, "invalid: signature verification failed")
275 return nil 296 return nil
276 } 297 }
@@ -281,9 +302,11 @@ func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []j
281 // Handle deletion events (kind 5) - process but don't store 302 // Handle deletion events (kind 5) - process but don't store
282 if pbEvent.Kind == 5 { 303 if pbEvent.Kind == 5 {
283 if err := h.store.ProcessDeletion(ctx, pbEvent); err != nil { 304 if err := h.store.ProcessDeletion(ctx, pbEvent); err != nil {
305 status = "error"
284 h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("deletion failed: %v", err)) 306 h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("deletion failed: %v", err))
285 return nil 307 return nil
286 } 308 }
309 status = "ok"
287 h.sendOK(ctx, conn, event.ID, true, "deleted") 310 h.sendOK(ctx, conn, event.ID, true, "deleted")
288 return nil 311 return nil
289 } 312 }
@@ -295,30 +318,48 @@ func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []j
295 318
296 err := h.store.StoreEvent(ctx, eventData) 319 err := h.store.StoreEvent(ctx, eventData)
297 if err == storage.ErrEventExists { 320 if err == storage.ErrEventExists {
321 status = "ok"
298 h.sendOK(ctx, conn, event.ID, true, "duplicate: already have this event") 322 h.sendOK(ctx, conn, event.ID, true, "duplicate: already have this event")
299 return nil 323 return nil
300 } 324 }
301 if err != nil { 325 if err != nil {
326 status = "error"
302 h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("error: %v", err)) 327 h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("error: %v", err))
303 return nil 328 return nil
304 } 329 }
305 330
306 h.subs.MatchAndFan(pbEvent) 331 h.subs.MatchAndFan(pbEvent)
307 332
333 status = "ok"
308 h.sendOK(ctx, conn, event.ID, true, "") 334 h.sendOK(ctx, conn, event.ID, true, "")
309 return nil 335 return nil
310} 336}
311 337
312func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, clientSubs map[string]*subscription.Subscription, state *connState) error { 338func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, clientSubs map[string]*subscription.Subscription, state *connState) error {
339 start := time.Now()
340 var status string
341 defer func() {
342 if h.metrics != nil {
343 duration := time.Since(start).Seconds()
344 if status == "" {
345 status = "ok" // Default to ok if not set
346 }
347 h.metrics.RecordRequest("REQ", status, duration)
348 }
349 }()
350
313 if len(raw) < 3 { 351 if len(raw) < 3 {
352 status = "error"
314 return fmt.Errorf("REQ expects at least 3 elements") 353 return fmt.Errorf("REQ expects at least 3 elements")
315 } 354 }
316 355
317 if err := h.requireAuth(ctx, conn, false, state); err != nil { 356 if err := h.requireAuth(ctx, conn, false, state); err != nil {
357 status = "error"
318 return err 358 return err
319 } 359 }
320 360
321 if state.authenticatedPubkey == "" && h.authConfig != nil && h.authConfig.ReadEnabled { 361 if state.authenticatedPubkey == "" && h.authConfig != nil && h.authConfig.ReadEnabled {
362 status = "unauthenticated"
322 return nil 363 return nil
323 } 364 }
324 365
@@ -329,12 +370,14 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso
329 identifier = state.clientIP 370 identifier = state.clientIP
330 } 371 }
331 if !h.limiter.Allow(identifier, "REQ") { 372 if !h.limiter.Allow(identifier, "REQ") {
373 status = "rate_limited"
332 return fmt.Errorf("rate limit exceeded") 374 return fmt.Errorf("rate limit exceeded")
333 } 375 }
334 } 376 }
335 377
336 var subID string 378 var subID string
337 if err := json.Unmarshal(raw[1], &subID); err != nil { 379 if err := json.Unmarshal(raw[1], &subID); err != nil {
380 status = "error"
338 return fmt.Errorf("invalid subscription ID") 381 return fmt.Errorf("invalid subscription ID")
339 } 382 }
340 383
@@ -342,6 +385,7 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso
342 for i := 2; i < len(raw); i++ { 385 for i := 2; i < len(raw); i++ {
343 var nostrFilter nostr.Filter 386 var nostrFilter nostr.Filter
344 if err := json.Unmarshal(raw[i], &nostrFilter); err != nil { 387 if err := json.Unmarshal(raw[i], &nostrFilter); err != nil {
388 status = "error"
345 return fmt.Errorf("invalid filter: %w", err) 389 return fmt.Errorf("invalid filter: %w", err)
346 } 390 }
347 391
@@ -359,6 +403,7 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso
359 403
360 storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0}) 404 storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0})
361 if err != nil { 405 if err != nil {
406 status = "error"
362 return fmt.Errorf("query failed: %w", err) 407 return fmt.Errorf("query failed: %w", err)
363 } 408 }
364 409
@@ -385,6 +430,7 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso
385 430
386 go h.streamEvents(ctx, conn, sub) 431 go h.streamEvents(ctx, conn, sub)
387 432
433 status = "ok"
388 return nil 434 return nil
389} 435}
390 436