diff options
Diffstat (limited to 'internal/handler/grpc')
| -rw-r--r-- | internal/handler/grpc/server.go | 74 |
1 files changed, 71 insertions, 3 deletions
diff --git a/internal/handler/grpc/server.go b/internal/handler/grpc/server.go index a3a3175..74857f6 100644 --- a/internal/handler/grpc/server.go +++ b/internal/handler/grpc/server.go | |||
| @@ -2,10 +2,12 @@ package grpc | |||
| 2 | 2 | ||
| 3 | import ( | 3 | import ( |
| 4 | "context" | 4 | "context" |
| 5 | "crypto/rand" | ||
| 5 | "fmt" | 6 | "fmt" |
| 6 | 7 | ||
| 7 | pb "northwest.io/nostr-grpc/api/nostr/v1" | 8 | pb "northwest.io/nostr-grpc/api/nostr/v1" |
| 8 | "northwest.io/nostr-grpc/internal/storage" | 9 | "northwest.io/nostr-grpc/internal/storage" |
| 10 | "northwest.io/nostr-grpc/internal/subscription" | ||
| 9 | ) | 11 | ) |
| 10 | 12 | ||
| 11 | type EventStore interface { | 13 | type EventStore interface { |
| @@ -16,10 +18,14 @@ type EventStore interface { | |||
| 16 | type Server struct { | 18 | type Server struct { |
| 17 | pb.UnimplementedNostrRelayServer | 19 | pb.UnimplementedNostrRelayServer |
| 18 | store EventStore | 20 | store EventStore |
| 21 | subs *subscription.Manager | ||
| 19 | } | 22 | } |
| 20 | 23 | ||
| 21 | func NewServer(store EventStore) *Server { | 24 | func NewServer(store EventStore) *Server { |
| 22 | return &Server{store: store} | 25 | return &Server{ |
| 26 | store: store, | ||
| 27 | subs: subscription.NewManager(), | ||
| 28 | } | ||
| 23 | } | 29 | } |
| 24 | 30 | ||
| 25 | func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) { | 31 | func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) { |
| @@ -65,6 +71,8 @@ func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) | |||
| 65 | return nil, fmt.Errorf("failed to store event: %w", err) | 71 | return nil, fmt.Errorf("failed to store event: %w", err) |
| 66 | } | 72 | } |
| 67 | 73 | ||
| 74 | s.subs.MatchAndFan(req.Event) | ||
| 75 | |||
| 68 | return &pb.PublishEventResponse{ | 76 | return &pb.PublishEventResponse{ |
| 69 | Accepted: true, | 77 | Accepted: true, |
| 70 | Message: "success", | 78 | Message: "success", |
| @@ -120,9 +128,69 @@ func (s *Server) PublishBatch(ctx context.Context, req *pb.PublishBatchRequest) | |||
| 120 | } | 128 | } |
| 121 | 129 | ||
| 122 | func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error { | 130 | func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error { |
| 123 | return fmt.Errorf("not implemented yet") | 131 | subID := req.SubscriptionId |
| 132 | if subID == "" { | ||
| 133 | subID = generateSubID() | ||
| 134 | } | ||
| 135 | |||
| 136 | sub := &subscription.Subscription{ | ||
| 137 | ID: subID, | ||
| 138 | Filters: req.Filters, | ||
| 139 | Events: make(chan *pb.Event, 100), | ||
| 140 | } | ||
| 141 | sub.InitDone() | ||
| 142 | |||
| 143 | s.subs.Add(sub) | ||
| 144 | defer s.subs.Remove(subID) | ||
| 145 | |||
| 146 | opts := &storage.QueryOptions{ | ||
| 147 | IncludeCanonical: req.IncludeCanonicalJson, | ||
| 148 | Limit: 0, | ||
| 149 | } | ||
| 150 | |||
| 151 | storedEvents, err := s.store.QueryEvents(stream.Context(), req.Filters, opts) | ||
| 152 | if err != nil { | ||
| 153 | return fmt.Errorf("query failed: %w", err) | ||
| 154 | } | ||
| 155 | |||
| 156 | for _, event := range storedEvents { | ||
| 157 | if err := stream.Send(event); err != nil { | ||
| 158 | return err | ||
| 159 | } | ||
| 160 | } | ||
| 161 | |||
| 162 | for { | ||
| 163 | select { | ||
| 164 | case event, ok := <-sub.Events: | ||
| 165 | if !ok { | ||
| 166 | return nil | ||
| 167 | } | ||
| 168 | |||
| 169 | eventToSend := event | ||
| 170 | if req.IncludeCanonicalJson && event.CanonicalJson == nil { | ||
| 171 | return fmt.Errorf("canonical JSON requested but not available") | ||
| 172 | } | ||
| 173 | |||
| 174 | if err := stream.Send(eventToSend); err != nil { | ||
| 175 | return err | ||
| 176 | } | ||
| 177 | |||
| 178 | case <-stream.Context().Done(): | ||
| 179 | return stream.Context().Err() | ||
| 180 | |||
| 181 | case <-sub.Done(): | ||
| 182 | return nil | ||
| 183 | } | ||
| 184 | } | ||
| 124 | } | 185 | } |
| 125 | 186 | ||
| 126 | func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) { | 187 | func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) { |
| 127 | return nil, fmt.Errorf("not implemented yet") | 188 | s.subs.Remove(req.SubscriptionId) |
| 189 | return &pb.Empty{}, nil | ||
| 190 | } | ||
| 191 | |||
| 192 | func generateSubID() string { | ||
| 193 | b := make([]byte, 8) | ||
| 194 | rand.Read(b) | ||
| 195 | return fmt.Sprintf("%x", b) | ||
| 128 | } | 196 | } |
