1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
package grpc
import (
	"context"
	"errors"
	"fmt"

	pb "northwest.io/nostr-grpc/api/nostr/v1"
	"northwest.io/nostr-grpc/internal/storage"
)
// EventStore is the storage backend that Server delegates to for persisting
// and reading events.
type EventStore interface {
	// StoreEvent persists event. Server.PublishEvent reports a
	// storage.ErrEventExists result as a duplicate instead of a failure.
	StoreEvent(ctx context.Context, event *storage.EventData) error

	// QueryEvents returns the events selected by filters, subject to opts.
	QueryEvents(ctx context.Context, filters []*pb.Filter, opts *storage.QueryOptions) ([]*pb.Event, error)
}
// Server provides the relay's gRPC handlers (publish, query, count, batch
// publish, subscribe, unsubscribe) on top of an EventStore.
type Server struct {
// Embedded generated stub; Go promotes its methods, so any service method
// that Server does not define itself is answered by the stub.
pb.UnimplementedNostrRelayServer
// store is the backend every handler uses to persist and read events.
store EventStore
}
// NewServer returns a Server whose handlers read from and write to store.
func NewServer(store EventStore) *Server {
	srv := new(Server)
	srv.store = store
	return srv
}
// PublishEvent validates a single event and persists it through the store.
//
// Rejections are reported in-band: the response has Accepted set to false, a
// Message naming the reason, and the returned error is nil. This covers a
// missing event, a failed ID check, a failed signature check, and an event the
// store already holds (storage.ErrEventExists). Any other store failure is
// returned as a wrapped error with a nil response.
//
// On success, and for duplicates, the response carries the event's canonical
// JSON serialization.
func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) {
	// A nil request is treated like a request without an event instead of
	// being dereferenced.
	if req == nil || req.Event == nil {
		return &pb.PublishEventResponse{
			Accepted: false,
			Message:  "event is required",
		}, nil
	}

	nostrEvent := PBToNostr(req.Event)
	if !nostrEvent.CheckID() {
		return &pb.PublishEventResponse{
			Accepted: false,
			Message:  "invalid event ID",
		}, nil
	}
	if !nostrEvent.Verify() {
		return &pb.PublishEventResponse{
			Accepted: false,
			Message:  "invalid signature",
		}, nil
	}

	canonicalJSON := nostrEvent.Serialize()
	err := s.store.StoreEvent(ctx, &storage.EventData{
		Event:         req.Event,
		CanonicalJSON: canonicalJSON,
	})
	switch {
	case errors.Is(err, storage.ErrEventExists):
		// errors.Is also matches when the store wraps the sentinel with
		// extra context, which a plain == comparison would miss.
		return &pb.PublishEventResponse{
			Accepted:      false,
			Message:       "duplicate: event already exists",
			CanonicalJson: canonicalJSON,
		}, nil
	case err != nil:
		return nil, fmt.Errorf("failed to store event: %w", err)
	}

	return &pb.PublishEventResponse{
		Accepted:      true,
		Message:       "success",
		CanonicalJson: canonicalJSON,
	}, nil
}
// QueryEvents returns the events the store yields for req.Filters.
//
// req.PageSize is passed to the store as the query limit. A zero or negative
// page size falls back to a default of 100, so a negative value sent by a
// client is never forwarded to the store. req.IncludeCanonicalJson is passed
// through as the IncludeCanonical query option.
//
// Store failures are returned as a wrapped error with a nil response.
func (s *Server) QueryEvents(ctx context.Context, req *pb.QueryRequest) (*pb.QueryResponse, error) {
	const defaultPageSize = 100

	opts := &storage.QueryOptions{
		IncludeCanonical: req.IncludeCanonicalJson,
		Limit:            req.PageSize,
	}
	// Zero means "unset" for a proto3 scalar; negative is never a usable
	// page size. Both get the default.
	if opts.Limit <= 0 {
		opts.Limit = defaultPageSize
	}

	events, err := s.store.QueryEvents(ctx, req.Filters, opts)
	if err != nil {
		return nil, fmt.Errorf("query failed: %w", err)
	}
	return &pb.QueryResponse{
		Events: events,
	}, nil
}
// CountEvents reports how many events the store returns for req.Filters.
//
// The count is the length of a regular QueryEvents result, so every matching
// event is fetched only to be counted. The query is issued with Limit set
// to 0.
// NOTE(review): presumably the store treats a zero limit as "no limit";
// confirm against the EventStore implementation.
//
// Store failures are returned as a wrapped error with a nil response.
func (s *Server) CountEvents(ctx context.Context, req *pb.CountRequest) (*pb.CountResponse, error) {
	opts := &storage.QueryOptions{Limit: 0}

	matched, err := s.store.QueryEvents(ctx, req.Filters, opts)
	if err != nil {
		return nil, fmt.Errorf("count failed: %w", err)
	}

	total := int64(len(matched))
	return &pb.CountResponse{Count: total}, nil
}
// PublishBatch publishes req.Events one at a time, in order, through
// PublishEvent and collects one result per event at the matching index.
//
// In-band rejections (Accepted == false) are ordinary results and do not stop
// the batch. The first error returned by PublishEvent aborts the batch: that
// error is returned unchanged with a nil response, and events processed before
// it are not rolled back here.
func (s *Server) PublishBatch(ctx context.Context, req *pb.PublishBatchRequest) (*pb.PublishBatchResponse, error) {
	batch := req.Events
	outcomes := make([]*pb.PublishEventResponse, 0, len(batch))

	for _, ev := range batch {
		single := &pb.PublishEventRequest{Event: ev}
		outcome, err := s.PublishEvent(ctx, single)
		if err != nil {
			return nil, err
		}
		outcomes = append(outcomes, outcome)
	}

	return &pb.PublishBatchResponse{Results: outcomes}, nil
}
// Subscribe is a placeholder: event streaming is not implemented yet.
//
// It delegates to the embedded pb.UnimplementedNostrRelayServer so the client
// receives the generated stub's "unimplemented" status. A plain error returned
// from a handler is reported by gRPC with codes.Unknown, which clients cannot
// tell apart from a real failure.
func (s *Server) Subscribe(req *pb.SubscribeRequest, stream pb.NostrRelay_SubscribeServer) error {
	return s.UnimplementedNostrRelayServer.Subscribe(req, stream)
}
// Unsubscribe is a placeholder: subscriptions are not implemented yet.
//
// It delegates to the embedded pb.UnimplementedNostrRelayServer so the client
// receives the generated stub's "unimplemented" status. A plain error returned
// from a handler is reported by gRPC with codes.Unknown, which clients cannot
// tell apart from a real failure.
func (s *Server) Unsubscribe(ctx context.Context, req *pb.UnsubscribeRequest) (*pb.Empty, error) {
	return s.UnimplementedNostrRelayServer.Unsubscribe(ctx, req)
}
|