summaryrefslogtreecommitdiffstats
path: root/internal/handler/connect
diff options
context:
space:
mode:
Diffstat (limited to 'internal/handler/connect')
-rw-r--r--internal/handler/connect/handler.go101
1 files changed, 101 insertions, 0 deletions
diff --git a/internal/handler/connect/handler.go b/internal/handler/connect/handler.go
new file mode 100644
index 0000000..f33e4fc
--- /dev/null
+++ b/internal/handler/connect/handler.go
@@ -0,0 +1,101 @@
1package connect
2
3import (
4 "context"
5
6 "connectrpc.com/connect"
7 "google.golang.org/grpc/metadata"
8
9 pb "northwest.io/nostr-grpc/api/nostr/v1"
10 "northwest.io/nostr-grpc/api/nostr/v1/nostrv1connect"
11 grpchandler "northwest.io/nostr-grpc/internal/handler/grpc"
12)
13
14type Handler struct {
15 grpcServer *grpchandler.Server
16}
17
18func NewHandler(grpcServer *grpchandler.Server) *Handler {
19 return &Handler{grpcServer: grpcServer}
20}
21
22func (h *Handler) PublishEvent(ctx context.Context, req *connect.Request[pb.PublishEventRequest]) (*connect.Response[pb.PublishEventResponse], error) {
23 resp, err := h.grpcServer.PublishEvent(ctx, req.Msg)
24 if err != nil {
25 return nil, err
26 }
27 return connect.NewResponse(resp), nil
28}
29
30func (h *Handler) Subscribe(ctx context.Context, req *connect.Request[pb.SubscribeRequest], stream *connect.ServerStream[pb.Event]) error {
31 return h.grpcServer.Subscribe(req.Msg, &subscribeStreamAdapter{stream: stream, ctx: ctx})
32}
33
34func (h *Handler) Unsubscribe(ctx context.Context, req *connect.Request[pb.UnsubscribeRequest]) (*connect.Response[pb.Empty], error) {
35 resp, err := h.grpcServer.Unsubscribe(ctx, req.Msg)
36 if err != nil {
37 return nil, err
38 }
39 return connect.NewResponse(resp), nil
40}
41
42func (h *Handler) PublishBatch(ctx context.Context, req *connect.Request[pb.PublishBatchRequest]) (*connect.Response[pb.PublishBatchResponse], error) {
43 resp, err := h.grpcServer.PublishBatch(ctx, req.Msg)
44 if err != nil {
45 return nil, err
46 }
47 return connect.NewResponse(resp), nil
48}
49
50func (h *Handler) QueryEvents(ctx context.Context, req *connect.Request[pb.QueryRequest]) (*connect.Response[pb.QueryResponse], error) {
51 resp, err := h.grpcServer.QueryEvents(ctx, req.Msg)
52 if err != nil {
53 return nil, err
54 }
55 return connect.NewResponse(resp), nil
56}
57
58func (h *Handler) CountEvents(ctx context.Context, req *connect.Request[pb.CountRequest]) (*connect.Response[pb.CountResponse], error) {
59 resp, err := h.grpcServer.CountEvents(ctx, req.Msg)
60 if err != nil {
61 return nil, err
62 }
63 return connect.NewResponse(resp), nil
64}
65
66type subscribeStreamAdapter struct {
67 stream *connect.ServerStream[pb.Event]
68 ctx context.Context
69}
70
71func (s *subscribeStreamAdapter) Send(event *pb.Event) error {
72 return s.stream.Send(event)
73}
74
75func (s *subscribeStreamAdapter) Context() context.Context {
76 return s.ctx
77}
78
79func (s *subscribeStreamAdapter) SetHeader(md metadata.MD) error {
80 return nil
81}
82
83func (s *subscribeStreamAdapter) SendHeader(md metadata.MD) error {
84 return nil
85}
86
87func (s *subscribeStreamAdapter) SetTrailer(md metadata.MD) {
88}
89
90func (s *subscribeStreamAdapter) SendMsg(m any) error {
91 if event, ok := m.(*pb.Event); ok {
92 return s.Send(event)
93 }
94 return nil
95}
96
97func (s *subscribeStreamAdapter) RecvMsg(m any) error {
98 return nil
99}
100
101var _ nostrv1connect.NostrRelayHandler = (*Handler)(nil)