From 83876eae868bd1e4fb6b9a823a6e8173919f290d Mon Sep 17 00:00:00 2001 From: bndw Date: Fri, 13 Feb 2026 18:26:53 -0800 Subject: feat: add Connect (gRPC over HTTP/JSON) support Connect integration: - Buf Connect codegen added to buf.gen.yaml - Connect handler wraps gRPC server - Serves on same port as WebSocket (:8080) - HTTP/2 with h2c for cleartext HTTP/2 Now serving THREE protocols: 1. gRPC (native) on :50051 - binary, high performance 2. Connect on :8080/nostr.v1.NostrRelay/* - HTTP/JSON, browser compatible 3. WebSocket on :8080/ - Nostr standard protocol All three protocols share: - Same storage layer - Same subscription manager - Same validation logic Browser-friendly! Call gRPC methods with fetch() or curl. --- internal/handler/connect/handler.go | 101 ++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 internal/handler/connect/handler.go (limited to 'internal/handler') diff --git a/internal/handler/connect/handler.go b/internal/handler/connect/handler.go new file mode 100644 index 0000000..f33e4fc --- /dev/null +++ b/internal/handler/connect/handler.go @@ -0,0 +1,101 @@ +package connect + +import ( + "context" + + "connectrpc.com/connect" + "google.golang.org/grpc/metadata" + + pb "northwest.io/nostr-grpc/api/nostr/v1" + "northwest.io/nostr-grpc/api/nostr/v1/nostrv1connect" + grpchandler "northwest.io/nostr-grpc/internal/handler/grpc" +) + +type Handler struct { + grpcServer *grpchandler.Server +} + +func NewHandler(grpcServer *grpchandler.Server) *Handler { + return &Handler{grpcServer: grpcServer} +} + +func (h *Handler) PublishEvent(ctx context.Context, req *connect.Request[pb.PublishEventRequest]) (*connect.Response[pb.PublishEventResponse], error) { + resp, err := h.grpcServer.PublishEvent(ctx, req.Msg) + if err != nil { + return nil, err + } + return connect.NewResponse(resp), nil +} + +func (h *Handler) Subscribe(ctx context.Context, req *connect.Request[pb.SubscribeRequest], stream *connect.ServerStream[pb.Event]) error { + return h.grpcServer.Subscribe(req.Msg, &subscribeStreamAdapter{stream: stream, ctx: ctx}) +} + +func (h *Handler) Unsubscribe(ctx context.Context, req *connect.Request[pb.UnsubscribeRequest]) (*connect.Response[pb.Empty], error) { + resp, err := h.grpcServer.Unsubscribe(ctx, req.Msg) + if err != nil { + return nil, err + } + return connect.NewResponse(resp), nil +} + +func (h *Handler) PublishBatch(ctx context.Context, req *connect.Request[pb.PublishBatchRequest]) (*connect.Response[pb.PublishBatchResponse], error) { + resp, err := h.grpcServer.PublishBatch(ctx, req.Msg) + if err != nil { + return nil, err + } + return connect.NewResponse(resp), nil +} + +func (h *Handler) QueryEvents(ctx context.Context, req *connect.Request[pb.QueryRequest]) (*connect.Response[pb.QueryResponse], error) { + resp, err := h.grpcServer.QueryEvents(ctx, req.Msg) + if err != nil { + return nil, err + } + return connect.NewResponse(resp), nil +} + +func (h *Handler) CountEvents(ctx context.Context, req *connect.Request[pb.CountRequest]) (*connect.Response[pb.CountResponse], error) { + resp, err := h.grpcServer.CountEvents(ctx, req.Msg) + if err != nil { + return nil, err + } + return connect.NewResponse(resp), nil +} + +type subscribeStreamAdapter struct { + stream *connect.ServerStream[pb.Event] + ctx context.Context +} + +func (s *subscribeStreamAdapter) Send(event *pb.Event) error { + return s.stream.Send(event) +} + +func (s *subscribeStreamAdapter) Context() context.Context { + return s.ctx +} + +func (s *subscribeStreamAdapter) SetHeader(md metadata.MD) error { + return nil +} + +func (s *subscribeStreamAdapter) SendHeader(md metadata.MD) error { + return nil +} + +func (s *subscribeStreamAdapter) SetTrailer(md metadata.MD) { +} + +func (s *subscribeStreamAdapter) SendMsg(m any) error { + if event, ok := m.(*pb.Event); ok { + return s.Send(event) + } + return nil +} + +func (s *subscribeStreamAdapter) RecvMsg(m any) error { + return nil +} + +var _ nostrv1connect.NostrRelayHandler = (*Handler)(nil) -- cgit v1.2.3