diff options
| author | bndw <ben@bdw.to> | 2026-03-09 08:01:02 -0700 |
|---|---|---|
| committer | bndw <ben@bdw.to> | 2026-03-09 08:01:02 -0700 |
| commit | 61a85baf87d89fcc09f9469a113a2ddc982b0a24 (patch) | |
| tree | d8359ce5cbcbb9402ba92c617c4ebd702adf33e9 /relay/subscription/manager.go | |
| parent | ce684848e25fed3aabdde4ffba6d2d8c40afa030 (diff) | |
feat: phase 2 relay implementation
Implement the Axon relay as relay/ (module axon/relay). Includes:
- WebSocket framing (RFC 6455, no external deps) in relay/websocket/
- Per-connection auth: challenge/response with ed25519 + allowlist check
- Ingest pipeline: sig verify, dedup, ephemeral fanout, SQLite persistence
- Subscription manager with prefix-matching filter fanout in relay/subscription/
- SQLite storage with WAL/cache config and UNION query builder in relay/storage/
- Graceful shutdown on SIGINT/SIGTERM
- Filter/TagFilter types added to axon core package (required by relay)
Diffstat (limited to 'relay/subscription/manager.go')
| -rw-r--r-- | relay/subscription/manager.go | 317 |
1 files changed, 317 insertions, 0 deletions
diff --git a/relay/subscription/manager.go b/relay/subscription/manager.go new file mode 100644 index 0000000..d8ba653 --- /dev/null +++ b/relay/subscription/manager.go | |||
| @@ -0,0 +1,317 @@ | |||
| 1 | // Package subscription manages in-memory subscriptions and live event fanout. | ||
| 2 | package subscription | ||
| 3 | |||
| 4 | import ( | ||
| 5 | "bytes" | ||
| 6 | "sync" | ||
| 7 | "time" | ||
| 8 | |||
| 9 | "axon" | ||
| 10 | ) | ||
| 11 | |||
| 12 | // Subscription holds a single client subscription: an ID, a set of filters, | ||
| 13 | // and a buffered channel for live event delivery. | ||
| 14 | type Subscription struct { | ||
| 15 | ID string | ||
| 16 | Filters []axon.Filter | ||
| 17 | // Events carries raw msgpack envelope bytes for matched events. | ||
| 18 | Events chan []byte | ||
| 19 | done chan struct{} | ||
| 20 | once sync.Once | ||
| 21 | } | ||
| 22 | |||
| 23 | // newSubscription creates a subscription with a buffered event channel. | ||
| 24 | func newSubscription(id string, filters []axon.Filter) *Subscription { | ||
| 25 | return &Subscription{ | ||
| 26 | ID: id, | ||
| 27 | Filters: filters, | ||
| 28 | Events: make(chan []byte, 100), | ||
| 29 | done: make(chan struct{}), | ||
| 30 | } | ||
| 31 | } | ||
| 32 | |||
| 33 | // Close shuts down the subscription and drains the event channel. | ||
| 34 | func (s *Subscription) Close() { | ||
| 35 | s.once.Do(func() { | ||
| 36 | close(s.done) | ||
| 37 | close(s.Events) | ||
| 38 | }) | ||
| 39 | } | ||
| 40 | |||
| 41 | // IsClosed reports whether the subscription has been closed. | ||
| 42 | func (s *Subscription) IsClosed() bool { | ||
| 43 | select { | ||
| 44 | case <-s.done: | ||
| 45 | return true | ||
| 46 | default: | ||
| 47 | return false | ||
| 48 | } | ||
| 49 | } | ||
| 50 | |||
| 51 | // Done returns a channel that is closed when the subscription is closed. | ||
| 52 | func (s *Subscription) Done() <-chan struct{} { | ||
| 53 | return s.done | ||
| 54 | } | ||
| 55 | |||
| 56 | // Manager maintains all active subscriptions for a single connection and fans | ||
| 57 | // out incoming events to matching subscribers. | ||
| 58 | type Manager struct { | ||
| 59 | mu sync.RWMutex | ||
| 60 | subs map[string]*Subscription | ||
| 61 | } | ||
| 62 | |||
| 63 | // NewManager returns an empty Manager. | ||
| 64 | func NewManager() *Manager { | ||
| 65 | return &Manager{subs: make(map[string]*Subscription)} | ||
| 66 | } | ||
| 67 | |||
| 68 | // Add registers a new subscription, replacing any existing subscription with | ||
| 69 | // the same ID. | ||
| 70 | func (m *Manager) Add(id string, filters []axon.Filter) *Subscription { | ||
| 71 | m.mu.Lock() | ||
| 72 | defer m.mu.Unlock() | ||
| 73 | |||
| 74 | // Remove old subscription with same ID if present. | ||
| 75 | if old, ok := m.subs[id]; ok { | ||
| 76 | old.Close() | ||
| 77 | } | ||
| 78 | |||
| 79 | sub := newSubscription(id, filters) | ||
| 80 | m.subs[id] = sub | ||
| 81 | return sub | ||
| 82 | } | ||
| 83 | |||
| 84 | // Remove cancels and deletes a subscription by ID. | ||
| 85 | func (m *Manager) Remove(id string) { | ||
| 86 | m.mu.Lock() | ||
| 87 | defer m.mu.Unlock() | ||
| 88 | |||
| 89 | if sub, ok := m.subs[id]; ok { | ||
| 90 | sub.Close() | ||
| 91 | delete(m.subs, id) | ||
| 92 | } | ||
| 93 | } | ||
| 94 | |||
| 95 | // CloseAll cancels every subscription held by this manager. | ||
| 96 | func (m *Manager) CloseAll() { | ||
| 97 | m.mu.Lock() | ||
| 98 | defer m.mu.Unlock() | ||
| 99 | |||
| 100 | for id, sub := range m.subs { | ||
| 101 | sub.Close() | ||
| 102 | delete(m.subs, id) | ||
| 103 | } | ||
| 104 | } | ||
| 105 | |||
| 106 | // Fanout delivers envelopeBytes to all subscriptions whose filters match event. | ||
| 107 | // The send is non-blocking: if the channel is full the event is dropped for | ||
| 108 | // that subscriber. | ||
| 109 | func (m *Manager) Fanout(event *axon.Event, envelopeBytes []byte) { | ||
| 110 | m.mu.RLock() | ||
| 111 | defer m.mu.RUnlock() | ||
| 112 | |||
| 113 | for _, sub := range m.subs { | ||
| 114 | if sub.IsClosed() { | ||
| 115 | continue | ||
| 116 | } | ||
| 117 | if MatchesAnyFilter(event, sub.Filters) { | ||
| 118 | select { | ||
| 119 | case sub.Events <- envelopeBytes: | ||
| 120 | case <-sub.done: | ||
| 121 | default: | ||
| 122 | // channel full — drop | ||
| 123 | } | ||
| 124 | } | ||
| 125 | } | ||
| 126 | } | ||
| 127 | |||
| 128 | // MatchesAnyFilter returns true if event matches at least one filter in filters. | ||
| 129 | func MatchesAnyFilter(event *axon.Event, filters []axon.Filter) bool { | ||
| 130 | for i := range filters { | ||
| 131 | if MatchesFilter(event, &filters[i]) { | ||
| 132 | return true | ||
| 133 | } | ||
| 134 | } | ||
| 135 | return false | ||
| 136 | } | ||
| 137 | |||
| 138 | // MatchesFilter returns true if event satisfies all constraints in f. | ||
| 139 | func MatchesFilter(event *axon.Event, f *axon.Filter) bool { | ||
| 140 | if len(f.IDs) > 0 { | ||
| 141 | if !matchesBytesPrefixes(event.ID, f.IDs) { | ||
| 142 | return false | ||
| 143 | } | ||
| 144 | } | ||
| 145 | |||
| 146 | if len(f.Authors) > 0 { | ||
| 147 | if !matchesBytesPrefixes(event.PubKey, f.Authors) { | ||
| 148 | return false | ||
| 149 | } | ||
| 150 | } | ||
| 151 | |||
| 152 | if len(f.Kinds) > 0 { | ||
| 153 | found := false | ||
| 154 | for _, k := range f.Kinds { | ||
| 155 | if event.Kind == k { | ||
| 156 | found = true | ||
| 157 | break | ||
| 158 | } | ||
| 159 | } | ||
| 160 | if !found { | ||
| 161 | return false | ||
| 162 | } | ||
| 163 | } | ||
| 164 | |||
| 165 | if f.Since != 0 && event.CreatedAt < f.Since { | ||
| 166 | return false | ||
| 167 | } | ||
| 168 | |||
| 169 | if f.Until != 0 && event.CreatedAt > f.Until { | ||
| 170 | return false | ||
| 171 | } | ||
| 172 | |||
| 173 | for _, tf := range f.Tags { | ||
| 174 | if !eventHasTagMatch(event, tf.Name, tf.Values) { | ||
| 175 | return false | ||
| 176 | } | ||
| 177 | } | ||
| 178 | |||
| 179 | return true | ||
| 180 | } | ||
| 181 | |||
| 182 | // matchesBytesPrefixes returns true if value has any of the given byte slices | ||
| 183 | // as a prefix. A prefix of exactly 32 bytes must match exactly. | ||
| 184 | func matchesBytesPrefixes(value []byte, prefixes [][]byte) bool { | ||
| 185 | for _, prefix := range prefixes { | ||
| 186 | if len(prefix) == 0 { | ||
| 187 | return true | ||
| 188 | } | ||
| 189 | if len(prefix) >= len(value) { | ||
| 190 | if bytes.Equal(value, prefix) { | ||
| 191 | return true | ||
| 192 | } | ||
| 193 | } else { | ||
| 194 | if bytes.HasPrefix(value, prefix) { | ||
| 195 | return true | ||
| 196 | } | ||
| 197 | } | ||
| 198 | } | ||
| 199 | return false | ||
| 200 | } | ||
| 201 | |||
| 202 | // eventHasTagMatch returns true if event has a tag named name whose first | ||
| 203 | // value matches any of values. | ||
| 204 | func eventHasTagMatch(event *axon.Event, name string, values []string) bool { | ||
| 205 | for _, tag := range event.Tags { | ||
| 206 | if tag.Name != name { | ||
| 207 | continue | ||
| 208 | } | ||
| 209 | if len(values) == 0 { | ||
| 210 | return true | ||
| 211 | } | ||
| 212 | if len(tag.Values) == 0 { | ||
| 213 | continue | ||
| 214 | } | ||
| 215 | for _, v := range values { | ||
| 216 | if tag.Values[0] == v { | ||
| 217 | return true | ||
| 218 | } | ||
| 219 | } | ||
| 220 | } | ||
| 221 | return false | ||
| 222 | } | ||
| 223 | |||
| 224 | // GlobalManager is a relay-wide manager that holds subscriptions from all | ||
| 225 | // connections and supports cross-connection fanout. | ||
| 226 | type GlobalManager struct { | ||
| 227 | mu sync.RWMutex | ||
| 228 | subs map[string]*Subscription // key: "connID:subID" | ||
| 229 | } | ||
| 230 | |||
| 231 | // NewGlobalManager returns an empty GlobalManager. | ||
| 232 | func NewGlobalManager() *GlobalManager { | ||
| 233 | return &GlobalManager{subs: make(map[string]*Subscription)} | ||
| 234 | } | ||
| 235 | |||
| 236 | // Register adds a subscription under a globally unique key. | ||
| 237 | func (g *GlobalManager) Register(connID, subID string, sub *Subscription) { | ||
| 238 | key := connID + ":" + subID | ||
| 239 | g.mu.Lock() | ||
| 240 | defer g.mu.Unlock() | ||
| 241 | if old, ok := g.subs[key]; ok { | ||
| 242 | old.Close() | ||
| 243 | } | ||
| 244 | g.subs[key] = sub | ||
| 245 | } | ||
| 246 | |||
| 247 | // Unregister removes a subscription. | ||
| 248 | func (g *GlobalManager) Unregister(connID, subID string) { | ||
| 249 | key := connID + ":" + subID | ||
| 250 | g.mu.Lock() | ||
| 251 | defer g.mu.Unlock() | ||
| 252 | if sub, ok := g.subs[key]; ok { | ||
| 253 | sub.Close() | ||
| 254 | delete(g.subs, key) | ||
| 255 | } | ||
| 256 | } | ||
| 257 | |||
| 258 | // UnregisterConn removes all subscriptions for a connection. | ||
| 259 | func (g *GlobalManager) UnregisterConn(connID string) { | ||
| 260 | prefix := connID + ":" | ||
| 261 | g.mu.Lock() | ||
| 262 | defer g.mu.Unlock() | ||
| 263 | for key, sub := range g.subs { | ||
| 264 | if len(key) > len(prefix) && key[:len(prefix)] == prefix { | ||
| 265 | sub.Close() | ||
| 266 | delete(g.subs, key) | ||
| 267 | } | ||
| 268 | } | ||
| 269 | } | ||
| 270 | |||
| 271 | // Fanout delivers the event to all matching subscriptions across all connections. | ||
| 272 | func (g *GlobalManager) Fanout(event *axon.Event, envelopeBytes []byte) { | ||
| 273 | g.mu.RLock() | ||
| 274 | defer g.mu.RUnlock() | ||
| 275 | |||
| 276 | for _, sub := range g.subs { | ||
| 277 | if sub.IsClosed() { | ||
| 278 | continue | ||
| 279 | } | ||
| 280 | if MatchesAnyFilter(event, sub.Filters) { | ||
| 281 | select { | ||
| 282 | case sub.Events <- envelopeBytes: | ||
| 283 | case <-sub.done: | ||
| 284 | default: | ||
| 285 | } | ||
| 286 | } | ||
| 287 | } | ||
| 288 | } | ||
| 289 | |||
| 290 | // PurgeExpired removes closed subscriptions from the global map. | ||
| 291 | // Call periodically to prevent unbounded growth. | ||
| 292 | func (g *GlobalManager) PurgeExpired() { | ||
| 293 | g.mu.Lock() | ||
| 294 | defer g.mu.Unlock() | ||
| 295 | for key, sub := range g.subs { | ||
| 296 | if sub.IsClosed() { | ||
| 297 | delete(g.subs, key) | ||
| 298 | } | ||
| 299 | } | ||
| 300 | } | ||
| 301 | |||
| 302 | // StartPurger launches a background goroutine that periodically removes closed | ||
| 303 | // subscriptions. | ||
| 304 | func (g *GlobalManager) StartPurger(interval time.Duration, stop <-chan struct{}) { | ||
| 305 | go func() { | ||
| 306 | ticker := time.NewTicker(interval) | ||
| 307 | defer ticker.Stop() | ||
| 308 | for { | ||
| 309 | select { | ||
| 310 | case <-ticker.C: | ||
| 311 | g.PurgeExpired() | ||
| 312 | case <-stop: | ||
| 313 | return | ||
| 314 | } | ||
| 315 | } | ||
| 316 | }() | ||
| 317 | } | ||
