// Package grpc implements the NostrRelay gRPC service on top of an event
// store and a subscription manager.
package grpc

import (
	"context"
	"crypto/rand"
	"errors"
	"fmt"

	pb "northwest.io/nostr-grpc/api/nostr/v1"
	"northwest.io/nostr-grpc/internal/storage"
	"northwest.io/nostr-grpc/internal/subscription"
)

// EventStore is the persistence interface the Server depends on.
type EventStore interface {
	// StoreEvent persists a validated event together with its canonical JSON.
	StoreEvent(context.Context, *storage.EventData) error
	// QueryEvents returns the events matching the given filters and options.
	QueryEvents(context.Context, []*pb.Filter, *storage.QueryOptions) ([]*pb.Event, error)
	// ProcessDeletion applies a deletion (kind 5) event.
	ProcessDeletion(context.Context, *pb.Event) error
}

// Server implements the NostrRelay gRPC service.
type Server struct {
	pb.UnimplementedNostrRelayServer

	store EventStore
	subs  *subscription.Manager
}

// NewServer returns a Server backed by store with a fresh subscription
// manager.
func NewServer(store EventStore) *Server {
	return &Server{
		store: store,
		subs:  subscription.NewManager(),
	}
}

// SetSubscriptionManager replaces the server's subscription manager with mgr.
func (s *Server) SetSubscriptionManager(mgr *subscription.Manager) {
	s.subs = mgr
}

// PublishEvent validates the event's ID and signature, then either processes
// it as a deletion (kind 5) or stores it and hands it to the subscription
// manager.
//
// Validation failures, failed deletions and duplicates are reported through
// the response (Accepted == false) with a nil error. Any other storage
// failure is returned as an error.
func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) {
	if req.Event == nil {
		return &pb.PublishEventResponse{
			Accepted: false,
			Message:  "event is required",
		}, nil
	}

	nostrEvent := PBToNostr(req.Event)

	if !nostrEvent.CheckID() {
		return &pb.PublishEventResponse{
			Accepted: false,
			Message:  "invalid event ID",
		}, nil
	}

	if !nostrEvent.Verify() {
		return &pb.PublishEventResponse{
			Accepted: false,
			Message:  "invalid signature",
		}, nil
	}

	canonicalJSON := nostrEvent.Serialize()

	// Handle deletion events (kind 5) - process but don't store
	if req.Event.Kind == 5 {
		if err := s.store.ProcessDeletion(ctx, req.Event); err != nil {
			return &pb.PublishEventResponse{
				Accepted:      false,
				Message:       fmt.Sprintf("deletion failed: %v", err),
				CanonicalJson: canonicalJSON,
			}, nil
		}
		return &pb.PublishEventResponse{
			Accepted:      true,
			Message:       "deleted",
			CanonicalJson: canonicalJSON,
		}, nil
	}

	eventData := &storage.EventData{
		Event:         req.Event,
		CanonicalJSON: canonicalJSON,
	}

	err := s.store.StoreEvent(ctx, eventData)
	// errors.Is rather than ==, so a store that wraps the sentinel with
	// extra context is still reported as a duplicate instead of an RPC error.
	if errors.Is(err, storage.ErrEventExists) {
		return &pb.PublishEventResponse{
			Accepted:      false,
			Message:       "duplicate: event already exists",
			CanonicalJson: canonicalJSON,
		}, nil
	}
	if err != nil {
		return nil, fmt.Errorf("failed to store event: %w", err)
	}

	// Only newly stored events are handed to the subscription manager.
	s.subs.MatchAndFan(req.Event)

	return &pb.PublishEventResponse{
		Accepted:      true,
		Message:       "success",
		CanonicalJson: canonicalJSON,
	}, nil
}

// QueryEvents returns the stored events matching req.Filters. A PageSize of
// zero is replaced by a default limit of 100.
func (s *Server) QueryEvents(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
	opts := &storage.QueryOptions{
		IncludeCanonical: req.IncludeCanonicalJson,
		Limit:            req.PageSize,
	}
	if opts.Limit == 0 {
		opts.Limit = 100
	}

	events, err := s.store.QueryEvents(ctx, req.Filters, opts)
	if err != nil {
		return nil, fmt.Errorf("query failed: %w", err)
	}

	return &pb.QueryResponse{
		Events: events,
	}, nil
}

// CountEvents reports how many stored events match req.Filters by querying
// them and counting the results.
func (s *Server) CountEvents(ctx context.Context, req *pb.CountRequest) (*pb.CountResponse, error) {
	// NOTE(review): Limit 0 is presumably "no limit" in the storage layer —
	// confirm; otherwise the count would be capped.
	events, err := s.store.QueryEvents(ctx, req.Filters, &storage.QueryOptions{Limit: 0})
	if err != nil {
		return nil, fmt.Errorf("count failed: %w", err)
	}

	return &pb.CountResponse{
		Count: int64(len(events)),
	}, nil
}

// PublishBatch publishes every event in req.Events through PublishEvent, in
// order, and returns one result per event. The first error returned by
// PublishEvent aborts the batch and is returned without any results.
func (s *Server) PublishBatch(ctx context.Context, req *pb.PublishBatchRequest) (*pb.PublishBatchResponse, error) {
	results := make([]*pb.PublishEventResponse, len(req.Events))

	for i, event := range req.Events {
		resp, err := s.PublishEvent(ctx, &pb.PublishEventRequest{Event: event})
		if err != nil {
			return nil, err
		}
		results[i] = resp
	}

	return &pb.PublishBatchResponse{
		Results: results,
	}, nil
}

// Subscribe registers a subscription, sends the stored events matching
// req.Filters, and then streams events delivered on the subscription's
// channel until the stream context ends, the channel is closed, or the
// subscription's Done channel fires. The subscription is removed from the
// manager when Subscribe returns.
//
// If req.SubscriptionId is empty a random ID is generated.
func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error {
	subID := req.SubscriptionId
	if subID == "" {
		subID = generateSubID()
	}

	sub := &subscription.Subscription{
		ID:      subID,
		Filters: req.Filters,
		Events:  make(chan *pb.Event, 100),
	}
	// NOTE(review): InitDone presumably prepares the channel returned by
	// sub.Done() — confirm in the subscription package.
	sub.InitDone()

	// Register before replaying stored events, so the subscription is
	// already known to the manager while the replay runs.
	s.subs.Add(sub)
	defer s.subs.Remove(subID)

	ctx := stream.Context()

	opts := &storage.QueryOptions{
		IncludeCanonical: req.IncludeCanonicalJson,
		Limit:            0,
	}

	storedEvents, err := s.store.QueryEvents(ctx, req.Filters, opts)
	if err != nil {
		return fmt.Errorf("query failed: %w", err)
	}

	for _, event := range storedEvents {
		if err := stream.Send(event); err != nil {
			return err
		}
	}

	for {
		select {
		case event, ok := <-sub.Events:
			if !ok {
				// Events channel closed: end the stream cleanly.
				return nil
			}

			// Live events are forwarded as received; fail the stream rather
			// than silently omit canonical JSON the client asked for.
			if req.IncludeCanonicalJson && event.CanonicalJson == nil {
				return fmt.Errorf("canonical JSON requested but not available")
			}

			if err := stream.Send(event); err != nil {
				return err
			}

		case <-ctx.Done():
			return ctx.Err()

		case <-sub.Done():
			return nil
		}
	}
}

// Unsubscribe removes the subscription identified by req.SubscriptionId from
// the subscription manager.
func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) {
	s.subs.Remove(req.SubscriptionId)
	return &pb.Empty{}, nil
}

// generateSubID returns a subscription ID built from 8 bytes read from
// crypto/rand, rendered as 16 lowercase hexadecimal characters.
func generateSubID() string {
	b := make([]byte, 8)
	// The error is deliberately discarded so the helper keeps its
	// string-only signature.
	// NOTE(review): if Read can fail on the Go version in use, the ID may
	// not be fully random — confirm against the crypto/rand documentation.
	_, _ = rand.Read(b)
	return fmt.Sprintf("%x", b)
}