From 61a85baf87d89fcc09f9469a113a2ddc982b0a24 Mon Sep 17 00:00:00 2001 From: bndw Date: Mon, 9 Mar 2026 08:01:02 -0700 Subject: feat: phase 2 relay implementation Implement the Axon relay as relay/ (module axon/relay). Includes: - WebSocket framing (RFC 6455, no external deps) in relay/websocket/ - Per-connection auth: challenge/response with ed25519 + allowlist check - Ingest pipeline: sig verify, dedup, ephemeral fanout, SQLite persistence - Subscription manager with prefix-matching filter fanout in relay/subscription/ - SQLite storage with WAL/cache config and UNION query builder in relay/storage/ - Graceful shutdown on SIGINT/SIGTERM - Filter/TagFilter types added to axon core package (required by relay) --- relay/subscription/manager.go | 317 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 317 insertions(+) create mode 100644 relay/subscription/manager.go (limited to 'relay/subscription') diff --git a/relay/subscription/manager.go b/relay/subscription/manager.go new file mode 100644 index 0000000..d8ba653 --- /dev/null +++ b/relay/subscription/manager.go @@ -0,0 +1,317 @@ +// Package subscription manages in-memory subscriptions and live event fanout. +package subscription + +import ( + "bytes" + "sync" + "time" + + "axon" +) + +// Subscription holds a single client subscription: an ID, a set of filters, +// and a buffered channel for live event delivery. +type Subscription struct { + ID string + Filters []axon.Filter + // Events carries raw msgpack envelope bytes for matched events. + Events chan []byte + done chan struct{} + once sync.Once +} + +// newSubscription creates a subscription with a buffered event channel. +func newSubscription(id string, filters []axon.Filter) *Subscription { + return &Subscription{ + ID: id, + Filters: filters, + Events: make(chan []byte, 100), + done: make(chan struct{}), + } +} + +// Close shuts down the subscription and drains the event channel. +func (s *Subscription) Close() { + s.once.Do(func() { + close(s.done) + close(s.Events) + }) +} + +// IsClosed reports whether the subscription has been closed. +func (s *Subscription) IsClosed() bool { + select { + case <-s.done: + return true + default: + return false + } +} + +// Done returns a channel that is closed when the subscription is closed. +func (s *Subscription) Done() <-chan struct{} { + return s.done +} + +// Manager maintains all active subscriptions for a single connection and fans +// out incoming events to matching subscribers. +type Manager struct { + mu sync.RWMutex + subs map[string]*Subscription +} + +// NewManager returns an empty Manager. +func NewManager() *Manager { + return &Manager{subs: make(map[string]*Subscription)} +} + +// Add registers a new subscription, replacing any existing subscription with +// the same ID. +func (m *Manager) Add(id string, filters []axon.Filter) *Subscription { + m.mu.Lock() + defer m.mu.Unlock() + + // Remove old subscription with same ID if present. + if old, ok := m.subs[id]; ok { + old.Close() + } + + sub := newSubscription(id, filters) + m.subs[id] = sub + return sub +} + +// Remove cancels and deletes a subscription by ID. +func (m *Manager) Remove(id string) { + m.mu.Lock() + defer m.mu.Unlock() + + if sub, ok := m.subs[id]; ok { + sub.Close() + delete(m.subs, id) + } +} + +// CloseAll cancels every subscription held by this manager. +func (m *Manager) CloseAll() { + m.mu.Lock() + defer m.mu.Unlock() + + for id, sub := range m.subs { + sub.Close() + delete(m.subs, id) + } +} + +// Fanout delivers envelopeBytes to all subscriptions whose filters match event. +// The send is non-blocking: if the channel is full the event is dropped for +// that subscriber. +func (m *Manager) Fanout(event *axon.Event, envelopeBytes []byte) { + m.mu.RLock() + defer m.mu.RUnlock() + + for _, sub := range m.subs { + if sub.IsClosed() { + continue + } + if MatchesAnyFilter(event, sub.Filters) { + select { + case sub.Events <- envelopeBytes: + case <-sub.done: + default: + // channel full — drop + } + } + } +} + +// MatchesAnyFilter returns true if event matches at least one filter in filters. +func MatchesAnyFilter(event *axon.Event, filters []axon.Filter) bool { + for i := range filters { + if MatchesFilter(event, &filters[i]) { + return true + } + } + return false +} + +// MatchesFilter returns true if event satisfies all constraints in f. +func MatchesFilter(event *axon.Event, f *axon.Filter) bool { + if len(f.IDs) > 0 { + if !matchesBytesPrefixes(event.ID, f.IDs) { + return false + } + } + + if len(f.Authors) > 0 { + if !matchesBytesPrefixes(event.PubKey, f.Authors) { + return false + } + } + + if len(f.Kinds) > 0 { + found := false + for _, k := range f.Kinds { + if event.Kind == k { + found = true + break + } + } + if !found { + return false + } + } + + if f.Since != 0 && event.CreatedAt < f.Since { + return false + } + + if f.Until != 0 && event.CreatedAt > f.Until { + return false + } + + for _, tf := range f.Tags { + if !eventHasTagMatch(event, tf.Name, tf.Values) { + return false + } + } + + return true +} + +// matchesBytesPrefixes returns true if value has any of the given byte slices +// as a prefix. A prefix of exactly 32 bytes must match exactly. +func matchesBytesPrefixes(value []byte, prefixes [][]byte) bool { + for _, prefix := range prefixes { + if len(prefix) == 0 { + return true + } + if len(prefix) >= len(value) { + if bytes.Equal(value, prefix) { + return true + } + } else { + if bytes.HasPrefix(value, prefix) { + return true + } + } + } + return false +} + +// eventHasTagMatch returns true if event has a tag named name whose first +// value matches any of values. +func eventHasTagMatch(event *axon.Event, name string, values []string) bool { + for _, tag := range event.Tags { + if tag.Name != name { + continue + } + if len(values) == 0 { + return true + } + if len(tag.Values) == 0 { + continue + } + for _, v := range values { + if tag.Values[0] == v { + return true + } + } + } + return false +} + +// GlobalManager is a relay-wide manager that holds subscriptions from all +// connections and supports cross-connection fanout. +type GlobalManager struct { + mu sync.RWMutex + subs map[string]*Subscription // key: "connID:subID" +} + +// NewGlobalManager returns an empty GlobalManager. +func NewGlobalManager() *GlobalManager { + return &GlobalManager{subs: make(map[string]*Subscription)} +} + +// Register adds a subscription under a globally unique key. +func (g *GlobalManager) Register(connID, subID string, sub *Subscription) { + key := connID + ":" + subID + g.mu.Lock() + defer g.mu.Unlock() + if old, ok := g.subs[key]; ok { + old.Close() + } + g.subs[key] = sub +} + +// Unregister removes a subscription. +func (g *GlobalManager) Unregister(connID, subID string) { + key := connID + ":" + subID + g.mu.Lock() + defer g.mu.Unlock() + if sub, ok := g.subs[key]; ok { + sub.Close() + delete(g.subs, key) + } +} + +// UnregisterConn removes all subscriptions for a connection. +func (g *GlobalManager) UnregisterConn(connID string) { + prefix := connID + ":" + g.mu.Lock() + defer g.mu.Unlock() + for key, sub := range g.subs { + if len(key) > len(prefix) && key[:len(prefix)] == prefix { + sub.Close() + delete(g.subs, key) + } + } +} + +// Fanout delivers the event to all matching subscriptions across all connections. +func (g *GlobalManager) Fanout(event *axon.Event, envelopeBytes []byte) { + g.mu.RLock() + defer g.mu.RUnlock() + + for _, sub := range g.subs { + if sub.IsClosed() { + continue + } + if MatchesAnyFilter(event, sub.Filters) { + select { + case sub.Events <- envelopeBytes: + case <-sub.done: + default: + } + } + } +} + +// PurgeExpired removes closed subscriptions from the global map. +// Call periodically to prevent unbounded growth. +func (g *GlobalManager) PurgeExpired() { + g.mu.Lock() + defer g.mu.Unlock() + for key, sub := range g.subs { + if sub.IsClosed() { + delete(g.subs, key) + } + } +} + +// StartPurger launches a background goroutine that periodically removes closed +// subscriptions. +func (g *GlobalManager) StartPurger(interval time.Duration, stop <-chan struct{}) { + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + g.PurgeExpired() + case <-stop: + return + } + } + }() +} -- cgit v1.2.3