// Package subscription keeps track of event subscriptions and fans incoming
// events out to every subscription whose filters match.
package subscription

import (
	"strings"
	"sync"

	pb "northwest.io/nostr-grpc/api/nostr/v1"
)

// Subscription is a single consumer of events. Events that match any entry in
// Filters are delivered on the Events channel by Manager.MatchAndFan.
type Subscription struct {
	ID      string
	Filters []*pb.Filter
	Events  chan *pb.Event
	done    chan struct{}
	once    sync.Once

	// mu serializes sends on Events with Close, so a send can never hit an
	// already-closed channel (which would panic). closed is guarded by mu.
	mu     sync.Mutex
	closed bool
}

// InitDone allocates the channel returned by Done. It should be called once,
// before the subscription is shared with other goroutines.
func (s *Subscription) InitDone() {
	s.done = make(chan struct{})
}

// Done returns a channel that is closed when the subscription is closed.
// The channel is nil if InitDone has not been called.
func (s *Subscription) Done() <-chan struct{} {
	return s.done
}

// Close marks the subscription as closed and closes both the Done channel and
// the Events channel. It is safe to call more than once and from multiple
// goroutines; only the first call has any effect.
func (s *Subscription) Close() {
	s.once.Do(func() {
		s.mu.Lock()
		defer s.mu.Unlock()

		s.closed = true
		// Either channel may legitimately be nil (InitDone not called, or no
		// Events channel supplied); closing a nil channel would panic.
		if s.done != nil {
			close(s.done)
		}
		if s.Events != nil {
			close(s.Events)
		}
	})
}

// IsClosed reports whether Close has been called.
func (s *Subscription) IsClosed() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.closed
}

// trySend attempts a non-blocking delivery of event on Events. It reports
// whether the event was queued; it returns false when the subscription is
// closed or the channel cannot accept the event right now.
func (s *Subscription) trySend(event *pb.Event) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Checked under mu: Close cannot close Events between this check and the
	// send below.
	if s.closed {
		return false
	}
	select {
	case s.Events <- event:
		return true
	default:
		// Receiver is not keeping up (or Events is nil): drop the event
		// rather than block the caller.
		return false
	}
}

// Manager is a concurrency-safe registry of subscriptions keyed by ID.
type Manager struct {
	mu            sync.RWMutex
	subscriptions map[string]*Subscription
}

// NewManager returns an empty Manager ready for use.
func NewManager() *Manager {
	return &Manager{
		subscriptions: make(map[string]*Subscription),
	}
}

// Add registers sub under sub.ID. An existing subscription with the same ID is
// replaced in the registry; it is NOT closed by this call. A nil sub is
// ignored.
func (m *Manager) Add(sub *Subscription) {
	if sub == nil {
		return
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Allow a zero-value Manager to be used without NewManager.
	if m.subscriptions == nil {
		m.subscriptions = make(map[string]*Subscription)
	}
	m.subscriptions[sub.ID] = sub
}

// Remove closes the subscription registered under id and drops it from the
// registry. It does nothing if no such subscription exists.
func (m *Manager) Remove(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	sub, ok := m.subscriptions[id]
	if !ok {
		return
	}
	if sub != nil {
		sub.Close()
	}
	delete(m.subscriptions, id)
}

// Get returns the subscription registered under id and whether it was found.
func (m *Manager) Get(id string) (*Subscription, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	sub, ok := m.subscriptions[id]
	return sub, ok
}

// MatchAndFan offers event to every open subscription that has at least one
// matching filter. Delivery never blocks: if a subscription's Events channel
// cannot accept the event immediately, the event is dropped for that
// subscription. A nil event is ignored.
func (m *Manager) MatchAndFan(event *pb.Event) {
	if event == nil {
		return
	}

	m.mu.RLock()
	defer m.mu.RUnlock()

	for _, sub := range m.subscriptions {
		// Cheap early-out so closed subscriptions skip filter evaluation;
		// trySend re-checks under the subscription's own lock.
		if sub == nil || sub.IsClosed() {
			continue
		}
		if matchesAnyFilter(event, sub.Filters) {
			sub.trySend(event)
		}
	}
}

// matchesAnyFilter reports whether event satisfies at least one of filters.
// Nil entries are skipped; an empty list matches nothing.
func matchesAnyFilter(event *pb.Event, filters []*pb.Filter) bool {
	for _, filter := range filters {
		if filter == nil {
			continue
		}
		if matchesFilter(event, filter) {
			return true
		}
	}
	return false
}

// matchesFilter reports whether event satisfies every constraint that is set
// on filter. Constraints that are empty or unset are ignored, so a filter
// with nothing set matches every event.
func matchesFilter(event *pb.Event, filter *pb.Filter) bool {
	// Ids and Authors are compared as prefixes of the event's values.
	if len(filter.Ids) > 0 && !matchesPrefix(event.Id, filter.Ids) {
		return false
	}
	if len(filter.Authors) > 0 && !matchesPrefix(event.Pubkey, filter.Authors) {
		return false
	}

	if len(filter.Kinds) > 0 {
		found := false
		for _, kind := range filter.Kinds {
			if event.Kind == kind {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}

	// Since/Until are inclusive bounds; a zero or unset bound is ignored.
	if filter.Since != nil && *filter.Since > 0 && event.CreatedAt < *filter.Since {
		return false
	}
	if filter.Until != nil && *filter.Until > 0 && event.CreatedAt > *filter.Until {
		return false
	}

	if len(filter.ETags) > 0 && !hasTag(event, "e", filter.ETags) {
		return false
	}
	if len(filter.PTags) > 0 && !hasTag(event, "p", filter.PTags) {
		return false
	}

	// Every named tag filter with values must be satisfied.
	for tagName, tagFilter := range filter.TagFilters {
		if len(tagFilter.Values) > 0 && !hasTag(event, tagName, tagFilter.Values) {
			return false
		}
	}

	return true
}

// matchesPrefix reports whether value starts with (or equals) any of prefixes.
func matchesPrefix(value string, prefixes []string) bool {
	for _, prefix := range prefixes {
		if strings.HasPrefix(value, prefix) {
			return true
		}
	}
	return false
}

// hasTag reports whether event carries a tag whose first element is tagName
// and whose second element equals one of values. Tags with fewer than two
// elements are ignored.
func hasTag(event *pb.Event, tagName string, values []string) bool {
	for _, tag := range event.Tags {
		if len(tag.Values) < 2 || tag.Values[0] != tagName {
			continue
		}
		for _, val := range values {
			if tag.Values[1] == val {
				return true
			}
		}
	}
	return false
}