summaryrefslogtreecommitdiffstats
path: root/internal/handler/grpc
diff options
context:
space:
mode:
Diffstat (limited to 'internal/handler/grpc')
-rw-r--r--internal/handler/grpc/server.go74
1 files changed, 71 insertions, 3 deletions
diff --git a/internal/handler/grpc/server.go b/internal/handler/grpc/server.go
index a3a3175..74857f6 100644
--- a/internal/handler/grpc/server.go
+++ b/internal/handler/grpc/server.go
@@ -2,10 +2,12 @@ package grpc
2 2
3import ( 3import (
4 "context" 4 "context"
5 "crypto/rand"
5 "fmt" 6 "fmt"
6 7
7 pb "northwest.io/nostr-grpc/api/nostr/v1" 8 pb "northwest.io/nostr-grpc/api/nostr/v1"
8 "northwest.io/nostr-grpc/internal/storage" 9 "northwest.io/nostr-grpc/internal/storage"
10 "northwest.io/nostr-grpc/internal/subscription"
9) 11)
10 12
11type EventStore interface { 13type EventStore interface {
@@ -16,10 +18,14 @@ type EventStore interface {
16type Server struct { 18type Server struct {
17 pb.UnimplementedNostrRelayServer 19 pb.UnimplementedNostrRelayServer
18 store EventStore 20 store EventStore
21 subs *subscription.Manager
19} 22}
20 23
21func NewServer(store EventStore) *Server { 24func NewServer(store EventStore) *Server {
22 return &Server{store: store} 25 return &Server{
26 store: store,
27 subs: subscription.NewManager(),
28 }
23} 29}
24 30
25func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) { 31func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) {
@@ -65,6 +71,8 @@ func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest)
65 return nil, fmt.Errorf("failed to store event: %w", err) 71 return nil, fmt.Errorf("failed to store event: %w", err)
66 } 72 }
67 73
74 s.subs.MatchAndFan(req.Event)
75
68 return &pb.PublishEventResponse{ 76 return &pb.PublishEventResponse{
69 Accepted: true, 77 Accepted: true,
70 Message: "success", 78 Message: "success",
@@ -120,9 +128,69 @@ func (s *Server) PublishBatch(ctx context.Context, req *pb.PublishBatchRequest)
120} 128}
121 129
122func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error { 130func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error {
123 return fmt.Errorf("not implemented yet") 131 subID := req.SubscriptionId
132 if subID == "" {
133 subID = generateSubID()
134 }
135
136 sub := &subscription.Subscription{
137 ID: subID,
138 Filters: req.Filters,
139 Events: make(chan *pb.Event, 100),
140 }
141 sub.InitDone()
142
143 s.subs.Add(sub)
144 defer s.subs.Remove(subID)
145
146 opts := &storage.QueryOptions{
147 IncludeCanonical: req.IncludeCanonicalJson,
148 Limit: 0,
149 }
150
151 storedEvents, err := s.store.QueryEvents(stream.Context(), req.Filters, opts)
152 if err != nil {
153 return fmt.Errorf("query failed: %w", err)
154 }
155
156 for _, event := range storedEvents {
157 if err := stream.Send(event); err != nil {
158 return err
159 }
160 }
161
162 for {
163 select {
164 case event, ok := <-sub.Events:
165 if !ok {
166 return nil
167 }
168
169 eventToSend := event
170 if req.IncludeCanonicalJson && event.CanonicalJson == nil {
171 return fmt.Errorf("canonical JSON requested but not available")
172 }
173
174 if err := stream.Send(eventToSend); err != nil {
175 return err
176 }
177
178 case <-stream.Context().Done():
179 return stream.Context().Err()
180
181 case <-sub.Done():
182 return nil
183 }
184 }
124} 185}
125 186
126func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) { 187func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) {
127 return nil, fmt.Errorf("not implemented yet") 188 s.subs.Remove(req.SubscriptionId)
189 return &pb.Empty{}, nil
190}
191
192func generateSubID() string {
193 b := make([]byte, 8)
194 rand.Read(b)
195 return fmt.Sprintf("%x", b)
128} 196}