From 62d31434ddbadff18580826576e1169f539e23f0 Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 17:48:36 -0800 Subject: feat: add gRPC handler with event validation and publishing Handler implementation: - EventStore interface (consumer-side) - Server with PublishEvent, QueryEvents, CountEvents, PublishBatch - pb.Event <-> nostr.Event conversion helpers - Signature and ID validation using existing nostr package - Canonical JSON generation for storage 9 tests passing --- internal/handler/grpc/server.go | 128 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) create mode 100644 internal/handler/grpc/server.go (limited to 'internal/handler/grpc/server.go') diff --git a/internal/handler/grpc/server.go b/internal/handler/grpc/server.go new file mode 100644 index 0000000..a3a3175 --- /dev/null +++ b/internal/handler/grpc/server.go @@ -0,0 +1,128 @@ +package grpc + +import ( + "context" + "fmt" + + pb "northwest.io/nostr-grpc/api/nostr/v1" + "northwest.io/nostr-grpc/internal/storage" +) + +type EventStore interface { + StoreEvent(context.Context, *storage.EventData) error + QueryEvents(context.Context, []*pb.Filter, *storage.QueryOptions) ([]*pb.Event, error) +} + +type Server struct { + pb.UnimplementedNostrRelayServer + store EventStore +} + +func NewServer(store EventStore) *Server { + return &Server{store: store} +} + +func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) { + if req.Event == nil { + return &pb.PublishEventResponse{ + Accepted: false, + Message: "event is required", + }, nil + } + + nostrEvent := PBToNostr(req.Event) + + if !nostrEvent.CheckID() { + return &pb.PublishEventResponse{ + Accepted: false, + Message: "invalid event ID", + }, nil + } + + if !nostrEvent.Verify() { + return &pb.PublishEventResponse{ + Accepted: false, + Message: "invalid signature", + }, nil + } + + canonicalJSON := nostrEvent.Serialize() + + eventData := &storage.EventData{ + Event: req.Event, + CanonicalJSON: canonicalJSON, + } + + err := s.store.StoreEvent(ctx, eventData) + if err == storage.ErrEventExists { + return &pb.PublishEventResponse{ + Accepted: false, + Message: "duplicate: event already exists", + CanonicalJson: canonicalJSON, + }, nil + } + if err != nil { + return nil, fmt.Errorf("failed to store event: %w", err) + } + + return &pb.PublishEventResponse{ + Accepted: true, + Message: "success", + CanonicalJson: canonicalJSON, + }, nil +} + +func (s *Server) QueryEvents(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) { + opts := &storage.QueryOptions{ + IncludeCanonical: req.IncludeCanonicalJson, + Limit: req.PageSize, + } + + if opts.Limit == 0 { + opts.Limit = 100 + } + + events, err := s.store.QueryEvents(ctx, req.Filters, opts) + if err != nil { + return nil, fmt.Errorf("query failed: %w", err) + } + + return &pb.QueryResponse{ + Events: events, + }, nil +} + +func (s *Server) CountEvents(ctx context.Context, req *pb.CountRequest) (*pb.CountResponse, error) { + events, err := s.store.QueryEvents(ctx, req.Filters, &storage.QueryOptions{Limit: 0}) + if err != nil { + return nil, fmt.Errorf("count failed: %w", err) + } + + return &pb.CountResponse{ + Count: int64(len(events)), + }, nil +} + +func (s *Server) PublishBatch(ctx context.Context, req *pb.PublishBatchRequest) (*pb.PublishBatchResponse, error) { + results := make([]*pb.PublishEventResponse, len(req.Events)) + + for i, event := range req.Events { + resp, err := s.PublishEvent(ctx, &pb.PublishEventRequest{Event: event}) + if err != nil { + return nil, err + } + results[i] = resp + } + + return &pb.PublishBatchResponse{ + Results: results, + }, nil +} + +func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error { + return fmt.Errorf("not implemented yet") +} + +func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) { + return nil, fmt.Errorf("not implemented yet") +} -- cgit v1.2.3