// Package subscription manages in-memory subscriptions and live event fanout. package subscription import ( "bytes" "sync" "time" "axon" ) // Subscription holds a single client subscription: an ID, a set of filters, // and a buffered channel for live event delivery. type Subscription struct { ID string Filters []axon.Filter // Events carries raw msgpack envelope bytes for matched events. Events chan []byte done chan struct{} once sync.Once } // newSubscription creates a subscription with a buffered event channel. func newSubscription(id string, filters []axon.Filter) *Subscription { return &Subscription{ ID: id, Filters: filters, Events: make(chan []byte, 100), done: make(chan struct{}), } } // Close shuts down the subscription and drains the event channel. func (s *Subscription) Close() { s.once.Do(func() { close(s.done) close(s.Events) }) } // IsClosed reports whether the subscription has been closed. func (s *Subscription) IsClosed() bool { select { case <-s.done: return true default: return false } } // Done returns a channel that is closed when the subscription is closed. func (s *Subscription) Done() <-chan struct{} { return s.done } // Manager maintains all active subscriptions for a single connection and fans // out incoming events to matching subscribers. type Manager struct { mu sync.RWMutex subs map[string]*Subscription } // NewManager returns an empty Manager. func NewManager() *Manager { return &Manager{subs: make(map[string]*Subscription)} } // Add registers a new subscription, replacing any existing subscription with // the same ID. func (m *Manager) Add(id string, filters []axon.Filter) *Subscription { m.mu.Lock() defer m.mu.Unlock() // Remove old subscription with same ID if present. if old, ok := m.subs[id]; ok { old.Close() } sub := newSubscription(id, filters) m.subs[id] = sub return sub } // Remove cancels and deletes a subscription by ID. func (m *Manager) Remove(id string) { m.mu.Lock() defer m.mu.Unlock() if sub, ok := m.subs[id]; ok { sub.Close() delete(m.subs, id) } } // CloseAll cancels every subscription held by this manager. func (m *Manager) CloseAll() { m.mu.Lock() defer m.mu.Unlock() for id, sub := range m.subs { sub.Close() delete(m.subs, id) } } // Fanout delivers envelopeBytes to all subscriptions whose filters match event. // The send is non-blocking: if the channel is full the event is dropped for // that subscriber. func (m *Manager) Fanout(event *axon.Event, envelopeBytes []byte) { m.mu.RLock() defer m.mu.RUnlock() for _, sub := range m.subs { if sub.IsClosed() { continue } if MatchesAnyFilter(event, sub.Filters) { select { case sub.Events <- envelopeBytes: case <-sub.done: default: // channel full — drop } } } } // MatchesAnyFilter returns true if event matches at least one filter in filters. func MatchesAnyFilter(event *axon.Event, filters []axon.Filter) bool { for i := range filters { if MatchesFilter(event, &filters[i]) { return true } } return false } // MatchesFilter returns true if event satisfies all constraints in f. func MatchesFilter(event *axon.Event, f *axon.Filter) bool { if len(f.IDs) > 0 { if !matchesBytesPrefixes(event.ID, f.IDs) { return false } } if len(f.Authors) > 0 { if !matchesBytesPrefixes(event.PubKey, f.Authors) { return false } } if len(f.Kinds) > 0 { found := false for _, k := range f.Kinds { if event.Kind == k { found = true break } } if !found { return false } } if f.Since != 0 && event.CreatedAt < f.Since { return false } if f.Until != 0 && event.CreatedAt > f.Until { return false } for _, tf := range f.Tags { if !eventHasTagMatch(event, tf.Name, tf.Values) { return false } } return true } // matchesBytesPrefixes returns true if value has any of the given byte slices // as a prefix. A prefix of exactly 32 bytes must match exactly. func matchesBytesPrefixes(value []byte, prefixes [][]byte) bool { for _, prefix := range prefixes { if len(prefix) == 0 { return true } if len(prefix) >= len(value) { if bytes.Equal(value, prefix) { return true } } else { if bytes.HasPrefix(value, prefix) { return true } } } return false } // eventHasTagMatch returns true if event has a tag named name whose first // value matches any of values. func eventHasTagMatch(event *axon.Event, name string, values []string) bool { for _, tag := range event.Tags { if tag.Name != name { continue } if len(values) == 0 { return true } if len(tag.Values) == 0 { continue } for _, v := range values { if tag.Values[0] == v { return true } } } return false } // GlobalManager is a relay-wide manager that holds subscriptions from all // connections and supports cross-connection fanout. type GlobalManager struct { mu sync.RWMutex subs map[string]*Subscription // key: "connID:subID" } // NewGlobalManager returns an empty GlobalManager. func NewGlobalManager() *GlobalManager { return &GlobalManager{subs: make(map[string]*Subscription)} } // Register adds a subscription under a globally unique key. func (g *GlobalManager) Register(connID, subID string, sub *Subscription) { key := connID + ":" + subID g.mu.Lock() defer g.mu.Unlock() if old, ok := g.subs[key]; ok { old.Close() } g.subs[key] = sub } // Unregister removes a subscription. func (g *GlobalManager) Unregister(connID, subID string) { key := connID + ":" + subID g.mu.Lock() defer g.mu.Unlock() if sub, ok := g.subs[key]; ok { sub.Close() delete(g.subs, key) } } // UnregisterConn removes all subscriptions for a connection. func (g *GlobalManager) UnregisterConn(connID string) { prefix := connID + ":" g.mu.Lock() defer g.mu.Unlock() for key, sub := range g.subs { if len(key) > len(prefix) && key[:len(prefix)] == prefix { sub.Close() delete(g.subs, key) } } } // Fanout delivers the event to all matching subscriptions across all connections. func (g *GlobalManager) Fanout(event *axon.Event, envelopeBytes []byte) { g.mu.RLock() defer g.mu.RUnlock() for _, sub := range g.subs { if sub.IsClosed() { continue } if MatchesAnyFilter(event, sub.Filters) { select { case sub.Events <- envelopeBytes: case <-sub.done: default: } } } } // PurgeExpired removes closed subscriptions from the global map. // Call periodically to prevent unbounded growth. func (g *GlobalManager) PurgeExpired() { g.mu.Lock() defer g.mu.Unlock() for key, sub := range g.subs { if sub.IsClosed() { delete(g.subs, key) } } } // StartPurger launches a background goroutine that periodically removes closed // subscriptions. func (g *GlobalManager) StartPurger(interval time.Duration, stop <-chan struct{}) { go func() { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: g.PurgeExpired() case <-stop: return } } }() }