aboutsummaryrefslogtreecommitdiffstats
path: root/relay/subscription
diff options
context:
space:
mode:
Diffstat (limited to 'relay/subscription')
-rw-r--r--relay/subscription/manager.go317
1 files changed, 317 insertions, 0 deletions
diff --git a/relay/subscription/manager.go b/relay/subscription/manager.go
new file mode 100644
index 0000000..d8ba653
--- /dev/null
+++ b/relay/subscription/manager.go
@@ -0,0 +1,317 @@
1// Package subscription manages in-memory subscriptions and live event fanout.
2package subscription
3
4import (
5 "bytes"
6 "sync"
7 "time"
8
9 "axon"
10)
11
12// Subscription holds a single client subscription: an ID, a set of filters,
13// and a buffered channel for live event delivery.
14type Subscription struct {
15 ID string
16 Filters []axon.Filter
17 // Events carries raw msgpack envelope bytes for matched events.
18 Events chan []byte
19 done chan struct{}
20 once sync.Once
21}
22
23// newSubscription creates a subscription with a buffered event channel.
24func newSubscription(id string, filters []axon.Filter) *Subscription {
25 return &Subscription{
26 ID: id,
27 Filters: filters,
28 Events: make(chan []byte, 100),
29 done: make(chan struct{}),
30 }
31}
32
33// Close shuts down the subscription and drains the event channel.
34func (s *Subscription) Close() {
35 s.once.Do(func() {
36 close(s.done)
37 close(s.Events)
38 })
39}
40
41// IsClosed reports whether the subscription has been closed.
42func (s *Subscription) IsClosed() bool {
43 select {
44 case <-s.done:
45 return true
46 default:
47 return false
48 }
49}
50
51// Done returns a channel that is closed when the subscription is closed.
52func (s *Subscription) Done() <-chan struct{} {
53 return s.done
54}
55
56// Manager maintains all active subscriptions for a single connection and fans
57// out incoming events to matching subscribers.
58type Manager struct {
59 mu sync.RWMutex
60 subs map[string]*Subscription
61}
62
63// NewManager returns an empty Manager.
64func NewManager() *Manager {
65 return &Manager{subs: make(map[string]*Subscription)}
66}
67
68// Add registers a new subscription, replacing any existing subscription with
69// the same ID.
70func (m *Manager) Add(id string, filters []axon.Filter) *Subscription {
71 m.mu.Lock()
72 defer m.mu.Unlock()
73
74 // Remove old subscription with same ID if present.
75 if old, ok := m.subs[id]; ok {
76 old.Close()
77 }
78
79 sub := newSubscription(id, filters)
80 m.subs[id] = sub
81 return sub
82}
83
84// Remove cancels and deletes a subscription by ID.
85func (m *Manager) Remove(id string) {
86 m.mu.Lock()
87 defer m.mu.Unlock()
88
89 if sub, ok := m.subs[id]; ok {
90 sub.Close()
91 delete(m.subs, id)
92 }
93}
94
95// CloseAll cancels every subscription held by this manager.
96func (m *Manager) CloseAll() {
97 m.mu.Lock()
98 defer m.mu.Unlock()
99
100 for id, sub := range m.subs {
101 sub.Close()
102 delete(m.subs, id)
103 }
104}
105
106// Fanout delivers envelopeBytes to all subscriptions whose filters match event.
107// The send is non-blocking: if the channel is full the event is dropped for
108// that subscriber.
109func (m *Manager) Fanout(event *axon.Event, envelopeBytes []byte) {
110 m.mu.RLock()
111 defer m.mu.RUnlock()
112
113 for _, sub := range m.subs {
114 if sub.IsClosed() {
115 continue
116 }
117 if MatchesAnyFilter(event, sub.Filters) {
118 select {
119 case sub.Events <- envelopeBytes:
120 case <-sub.done:
121 default:
122 // channel full — drop
123 }
124 }
125 }
126}
127
128// MatchesAnyFilter returns true if event matches at least one filter in filters.
129func MatchesAnyFilter(event *axon.Event, filters []axon.Filter) bool {
130 for i := range filters {
131 if MatchesFilter(event, &filters[i]) {
132 return true
133 }
134 }
135 return false
136}
137
138// MatchesFilter returns true if event satisfies all constraints in f.
139func MatchesFilter(event *axon.Event, f *axon.Filter) bool {
140 if len(f.IDs) > 0 {
141 if !matchesBytesPrefixes(event.ID, f.IDs) {
142 return false
143 }
144 }
145
146 if len(f.Authors) > 0 {
147 if !matchesBytesPrefixes(event.PubKey, f.Authors) {
148 return false
149 }
150 }
151
152 if len(f.Kinds) > 0 {
153 found := false
154 for _, k := range f.Kinds {
155 if event.Kind == k {
156 found = true
157 break
158 }
159 }
160 if !found {
161 return false
162 }
163 }
164
165 if f.Since != 0 && event.CreatedAt < f.Since {
166 return false
167 }
168
169 if f.Until != 0 && event.CreatedAt > f.Until {
170 return false
171 }
172
173 for _, tf := range f.Tags {
174 if !eventHasTagMatch(event, tf.Name, tf.Values) {
175 return false
176 }
177 }
178
179 return true
180}
181
182// matchesBytesPrefixes returns true if value has any of the given byte slices
183// as a prefix. A prefix of exactly 32 bytes must match exactly.
184func matchesBytesPrefixes(value []byte, prefixes [][]byte) bool {
185 for _, prefix := range prefixes {
186 if len(prefix) == 0 {
187 return true
188 }
189 if len(prefix) >= len(value) {
190 if bytes.Equal(value, prefix) {
191 return true
192 }
193 } else {
194 if bytes.HasPrefix(value, prefix) {
195 return true
196 }
197 }
198 }
199 return false
200}
201
202// eventHasTagMatch returns true if event has a tag named name whose first
203// value matches any of values.
204func eventHasTagMatch(event *axon.Event, name string, values []string) bool {
205 for _, tag := range event.Tags {
206 if tag.Name != name {
207 continue
208 }
209 if len(values) == 0 {
210 return true
211 }
212 if len(tag.Values) == 0 {
213 continue
214 }
215 for _, v := range values {
216 if tag.Values[0] == v {
217 return true
218 }
219 }
220 }
221 return false
222}
223
224// GlobalManager is a relay-wide manager that holds subscriptions from all
225// connections and supports cross-connection fanout.
226type GlobalManager struct {
227 mu sync.RWMutex
228 subs map[string]*Subscription // key: "connID:subID"
229}
230
231// NewGlobalManager returns an empty GlobalManager.
232func NewGlobalManager() *GlobalManager {
233 return &GlobalManager{subs: make(map[string]*Subscription)}
234}
235
236// Register adds a subscription under a globally unique key.
237func (g *GlobalManager) Register(connID, subID string, sub *Subscription) {
238 key := connID + ":" + subID
239 g.mu.Lock()
240 defer g.mu.Unlock()
241 if old, ok := g.subs[key]; ok {
242 old.Close()
243 }
244 g.subs[key] = sub
245}
246
247// Unregister removes a subscription.
248func (g *GlobalManager) Unregister(connID, subID string) {
249 key := connID + ":" + subID
250 g.mu.Lock()
251 defer g.mu.Unlock()
252 if sub, ok := g.subs[key]; ok {
253 sub.Close()
254 delete(g.subs, key)
255 }
256}
257
258// UnregisterConn removes all subscriptions for a connection.
259func (g *GlobalManager) UnregisterConn(connID string) {
260 prefix := connID + ":"
261 g.mu.Lock()
262 defer g.mu.Unlock()
263 for key, sub := range g.subs {
264 if len(key) > len(prefix) && key[:len(prefix)] == prefix {
265 sub.Close()
266 delete(g.subs, key)
267 }
268 }
269}
270
271// Fanout delivers the event to all matching subscriptions across all connections.
272func (g *GlobalManager) Fanout(event *axon.Event, envelopeBytes []byte) {
273 g.mu.RLock()
274 defer g.mu.RUnlock()
275
276 for _, sub := range g.subs {
277 if sub.IsClosed() {
278 continue
279 }
280 if MatchesAnyFilter(event, sub.Filters) {
281 select {
282 case sub.Events <- envelopeBytes:
283 case <-sub.done:
284 default:
285 }
286 }
287 }
288}
289
290// PurgeExpired removes closed subscriptions from the global map.
291// Call periodically to prevent unbounded growth.
292func (g *GlobalManager) PurgeExpired() {
293 g.mu.Lock()
294 defer g.mu.Unlock()
295 for key, sub := range g.subs {
296 if sub.IsClosed() {
297 delete(g.subs, key)
298 }
299 }
300}
301
302// StartPurger launches a background goroutine that periodically removes closed
303// subscriptions.
304func (g *GlobalManager) StartPurger(interval time.Duration, stop <-chan struct{}) {
305 go func() {
306 ticker := time.NewTicker(interval)
307 defer ticker.Stop()
308 for {
309 select {
310 case <-ticker.C:
311 g.PurgeExpired()
312 case <-stop:
313 return
314 }
315 }
316 }()
317}