summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cmd/relay/main.go3
-rw-r--r--internal/handler/websocket/handler.go27
-rw-r--r--internal/subscription/manager.go6
3 files changed, 36 insertions, 0 deletions
diff --git a/cmd/relay/main.go b/cmd/relay/main.go
index 457a719..e4fe9bc 100644
--- a/cmd/relay/main.go
+++ b/cmd/relay/main.go
@@ -93,6 +93,9 @@ func main() {
93 } 93 }
94 94
95 wsHandler := wshandler.NewHandler(store, subManager) 95 wsHandler := wshandler.NewHandler(store, subManager)
96 if m != nil {
97 wsHandler.SetMetrics(m)
98 }
96 99
97 var grpcDisplay, httpDisplay, wsDisplay string 100 var grpcDisplay, httpDisplay, wsDisplay string
98 if cfg.Server.PublicURL != "" { 101 if cfg.Server.PublicURL != "" {
diff --git a/internal/handler/websocket/handler.go b/internal/handler/websocket/handler.go
index 5d40abb..b7ea71d 100644
--- a/internal/handler/websocket/handler.go
+++ b/internal/handler/websocket/handler.go
@@ -20,9 +20,16 @@ type EventStore interface {
20 ProcessDeletion(context.Context, *pb.Event) error 20 ProcessDeletion(context.Context, *pb.Event) error
21} 21}
22 22
23type MetricsRecorder interface {
24 IncrementSubscriptions()
25 DecrementSubscriptions()
26 SetActiveSubscriptions(count int)
27}
28
23type Handler struct { 29type Handler struct {
24 store EventStore 30 store EventStore
25 subs *subscription.Manager 31 subs *subscription.Manager
32 metrics MetricsRecorder
26 indexData IndexData 33 indexData IndexData
27} 34}
28 35
@@ -33,6 +40,10 @@ func NewHandler(store EventStore, subs *subscription.Manager) *Handler {
33 } 40 }
34} 41}
35 42
43func (h *Handler) SetMetrics(m MetricsRecorder) {
44 h.metrics = m
45}
46
36// SetIndexData sets the addresses for the index page 47// SetIndexData sets the addresses for the index page
37func (h *Handler) SetIndexData(grpcAddr, httpAddr, wsAddr string) { 48func (h *Handler) SetIndexData(grpcAddr, httpAddr, wsAddr string) {
38 h.indexData = IndexData{ 49 h.indexData = IndexData{
@@ -69,9 +80,15 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
69 ctx := r.Context() 80 ctx := r.Context()
70 clientSubs := make(map[string]*subscription.Subscription) 81 clientSubs := make(map[string]*subscription.Subscription)
71 defer func() { 82 defer func() {
83 count := len(clientSubs)
72 for subID := range clientSubs { 84 for subID := range clientSubs {
73 h.subs.Remove(subID) 85 h.subs.Remove(subID)
74 } 86 }
87 if h.metrics != nil && count > 0 {
88 for i := 0; i < count; i++ {
89 h.metrics.DecrementSubscriptions()
90 }
91 }
75 }() 92 }()
76 93
77 for { 94 for {
@@ -192,6 +209,9 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso
192 if existing, ok := clientSubs[subID]; ok { 209 if existing, ok := clientSubs[subID]; ok {
193 h.subs.Remove(existing.ID) 210 h.subs.Remove(existing.ID)
194 delete(clientSubs, subID) 211 delete(clientSubs, subID)
212 if h.metrics != nil {
213 h.metrics.DecrementSubscriptions()
214 }
195 } 215 }
196 216
197 storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0}) 217 storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0})
@@ -216,6 +236,10 @@ func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []jso
216 h.subs.Add(sub) 236 h.subs.Add(sub)
217 clientSubs[subID] = sub 237 clientSubs[subID] = sub
218 238
239 if h.metrics != nil {
240 h.metrics.IncrementSubscriptions()
241 }
242
219 go h.streamEvents(ctx, conn, sub) 243 go h.streamEvents(ctx, conn, sub)
220 244
221 return nil 245 return nil
@@ -234,6 +258,9 @@ func (h *Handler) handleClose(raw []json.RawMessage, clientSubs map[string]*subs
234 if sub, ok := clientSubs[subID]; ok { 258 if sub, ok := clientSubs[subID]; ok {
235 h.subs.Remove(sub.ID) 259 h.subs.Remove(sub.ID)
236 delete(clientSubs, subID) 260 delete(clientSubs, subID)
261 if h.metrics != nil {
262 h.metrics.DecrementSubscriptions()
263 }
237 } 264 }
238 265
239 return nil 266 return nil
diff --git a/internal/subscription/manager.go b/internal/subscription/manager.go
index 6347018..3f6ac7f 100644
--- a/internal/subscription/manager.go
+++ b/internal/subscription/manager.go
@@ -72,6 +72,12 @@ func (m *Manager) Get(id string) (*Subscription, bool) {
72 return sub, ok 72 return sub, ok
73} 73}
74 74
75func (m *Manager) Count() int {
76 m.mu.RLock()
77 defer m.mu.RUnlock()
78 return len(m.subscriptions)
79}
80
75func (m *Manager) MatchAndFan(event *pb.Event) { 81func (m *Manager) MatchAndFan(event *pb.Event) {
76 m.mu.RLock() 82 m.mu.RLock()
77 defer m.mu.RUnlock() 83 defer m.mu.RUnlock()