summaryrefslogtreecommitdiffstats
path: root/internal/handler
diff options
context:
space:
mode:
Diffstat (limited to 'internal/handler')
-rw-r--r--internal/handler/grpc/server.go4
-rw-r--r--internal/handler/websocket/convert.go76
-rw-r--r--internal/handler/websocket/handler.go246
3 files changed, 326 insertions, 0 deletions
diff --git a/internal/handler/grpc/server.go b/internal/handler/grpc/server.go
index 74857f6..b65b527 100644
--- a/internal/handler/grpc/server.go
+++ b/internal/handler/grpc/server.go
@@ -28,6 +28,10 @@ func NewServer(store EventStore) *Server {
28 } 28 }
29} 29}
30 30
31func (s *Server) SetSubscriptionManager(mgr *subscription.Manager) {
32 s.subs = mgr
33}
34
31func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) { 35func (s *Server) PublishEvent(ctx context.Context, req *pb.PublishEventRequest) (*pb.PublishEventResponse, error) {
32 if req.Event == nil { 36 if req.Event == nil {
33 return &pb.PublishEventResponse{ 37 return &pb.PublishEventResponse{
diff --git a/internal/handler/websocket/convert.go b/internal/handler/websocket/convert.go
new file mode 100644
index 0000000..0458ee4
--- /dev/null
+++ b/internal/handler/websocket/convert.go
@@ -0,0 +1,76 @@
1package websocket
2
3import (
4 pb "northwest.io/nostr-grpc/api/nostr/v1"
5 "northwest.io/nostr-grpc/internal/nostr"
6)
7
8func NostrToPB(n *nostr.Event) *pb.Event {
9 tags := make([]*pb.Tag, len(n.Tags))
10 for i, tag := range n.Tags {
11 tags[i] = &pb.Tag{Values: tag}
12 }
13
14 return &pb.Event{
15 Id: n.ID,
16 Pubkey: n.PubKey,
17 CreatedAt: n.CreatedAt,
18 Kind: int32(n.Kind),
19 Tags: tags,
20 Content: n.Content,
21 Sig: n.Sig,
22 }
23}
24
25func PBToNostr(e *pb.Event) *nostr.Event {
26 tags := make(nostr.Tags, len(e.Tags))
27 for i, tag := range e.Tags {
28 tags[i] = tag.Values
29 }
30
31 return &nostr.Event{
32 ID: e.Id,
33 PubKey: e.Pubkey,
34 CreatedAt: e.CreatedAt,
35 Kind: int(e.Kind),
36 Tags: tags,
37 Content: e.Content,
38 Sig: e.Sig,
39 }
40}
41
42func NostrFilterToPB(f *nostr.Filter) *pb.Filter {
43 pbFilter := &pb.Filter{
44 Ids: f.IDs,
45 Authors: f.Authors,
46 Kinds: make([]int32, len(f.Kinds)),
47 }
48
49 for i, kind := range f.Kinds {
50 pbFilter.Kinds[i] = int32(kind)
51 }
52
53 if f.Since != nil {
54 since := int64(*f.Since)
55 pbFilter.Since = &since
56 }
57
58 if f.Until != nil {
59 until := int64(*f.Until)
60 pbFilter.Until = &until
61 }
62
63 if f.Limit > 0 {
64 limit := int32(f.Limit)
65 pbFilter.Limit = &limit
66 }
67
68 if len(f.Tags) > 0 {
69 pbFilter.TagFilters = make(map[string]*pb.TagFilter)
70 for tagName, values := range f.Tags {
71 pbFilter.TagFilters[tagName] = &pb.TagFilter{Values: values}
72 }
73 }
74
75 return pbFilter
76}
diff --git a/internal/handler/websocket/handler.go b/internal/handler/websocket/handler.go
new file mode 100644
index 0000000..cef83dd
--- /dev/null
+++ b/internal/handler/websocket/handler.go
@@ -0,0 +1,246 @@
1package websocket
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "net/http"
9
10 pb "northwest.io/nostr-grpc/api/nostr/v1"
11 "northwest.io/nostr-grpc/internal/nostr"
12 "northwest.io/nostr-grpc/internal/storage"
13 "northwest.io/nostr-grpc/internal/subscription"
14 "northwest.io/nostr-grpc/internal/websocket"
15)
16
17type EventStore interface {
18 StoreEvent(context.Context, *storage.EventData) error
19 QueryEvents(context.Context, []*pb.Filter, *storage.QueryOptions) ([]*pb.Event, error)
20}
21
22type Handler struct {
23 store EventStore
24 subs *subscription.Manager
25}
26
27func NewHandler(store EventStore, subs *subscription.Manager) *Handler {
28 return &Handler{
29 store: store,
30 subs: subs,
31 }
32}
33
34func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
35 conn, err := websocket.Accept(w, r)
36 if err != nil {
37 log.Printf("WebSocket accept failed: %v", err)
38 return
39 }
40 defer conn.Close(websocket.StatusNormalClosure, "")
41
42 ctx := r.Context()
43 clientSubs := make(map[string]*subscription.Subscription)
44 defer func() {
45 for subID := range clientSubs {
46 h.subs.Remove(subID)
47 }
48 }()
49
50 for {
51 _, data, err := conn.Read(ctx)
52 if err != nil {
53 return
54 }
55
56 if err := h.handleMessage(ctx, conn, data, clientSubs); err != nil {
57 log.Printf("Message handling error: %v", err)
58 h.sendNotice(ctx, conn, err.Error())
59 }
60 }
61}
62
63func (h *Handler) handleMessage(ctx context.Context, conn *websocket.Conn, data []byte, clientSubs map[string]*subscription.Subscription) error {
64 var raw []json.RawMessage
65 if err := json.Unmarshal(data, &raw); err != nil {
66 return fmt.Errorf("invalid JSON")
67 }
68
69 if len(raw) == 0 {
70 return fmt.Errorf("empty message")
71 }
72
73 var msgType string
74 if err := json.Unmarshal(raw[0], &msgType); err != nil {
75 return fmt.Errorf("invalid message type")
76 }
77
78 switch msgType {
79 case "EVENT":
80 return h.handleEvent(ctx, conn, raw)
81 case "REQ":
82 return h.handleReq(ctx, conn, raw, clientSubs)
83 case "CLOSE":
84 return h.handleClose(raw, clientSubs)
85 default:
86 return fmt.Errorf("unknown message type: %s", msgType)
87 }
88}
89
90func (h *Handler) handleEvent(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage) error {
91 if len(raw) != 2 {
92 return fmt.Errorf("EVENT expects 2 elements")
93 }
94
95 var event nostr.Event
96 if err := json.Unmarshal(raw[1], &event); err != nil {
97 return fmt.Errorf("invalid event: %w", err)
98 }
99
100 if !event.CheckID() {
101 h.sendOK(ctx, conn, event.ID, false, "invalid: event ID mismatch")
102 return nil
103 }
104
105 if !event.Verify() {
106 h.sendOK(ctx, conn, event.ID, false, "invalid: signature verification failed")
107 return nil
108 }
109
110 pbEvent := NostrToPB(&event)
111 canonicalJSON := event.Serialize()
112
113 eventData := &storage.EventData{
114 Event: pbEvent,
115 CanonicalJSON: canonicalJSON,
116 }
117
118 err := h.store.StoreEvent(ctx, eventData)
119 if err == storage.ErrEventExists {
120 h.sendOK(ctx, conn, event.ID, true, "duplicate: already have this event")
121 return nil
122 }
123 if err != nil {
124 h.sendOK(ctx, conn, event.ID, false, fmt.Sprintf("error: %v", err))
125 return nil
126 }
127
128 h.subs.MatchAndFan(pbEvent)
129
130 h.sendOK(ctx, conn, event.ID, true, "")
131 return nil
132}
133
134func (h *Handler) handleReq(ctx context.Context, conn *websocket.Conn, raw []json.RawMessage, clientSubs map[string]*subscription.Subscription) error {
135 if len(raw) < 3 {
136 return fmt.Errorf("REQ expects at least 3 elements")
137 }
138
139 var subID string
140 if err := json.Unmarshal(raw[1], &subID); err != nil {
141 return fmt.Errorf("invalid subscription ID")
142 }
143
144 var filters []*pb.Filter
145 for i := 2; i < len(raw); i++ {
146 var nostrFilter nostr.Filter
147 if err := json.Unmarshal(raw[i], &nostrFilter); err != nil {
148 return fmt.Errorf("invalid filter: %w", err)
149 }
150
151 pbFilter := NostrFilterToPB(&nostrFilter)
152 filters = append(filters, pbFilter)
153 }
154
155 if existing, ok := clientSubs[subID]; ok {
156 h.subs.Remove(existing.ID)
157 delete(clientSubs, subID)
158 }
159
160 storedEvents, err := h.store.QueryEvents(ctx, filters, &storage.QueryOptions{Limit: 0})
161 if err != nil {
162 return fmt.Errorf("query failed: %w", err)
163 }
164
165 for _, pbEvent := range storedEvents {
166 event := PBToNostr(pbEvent)
167 h.sendEvent(ctx, conn, subID, event)
168 }
169
170 h.sendEOSE(ctx, conn, subID)
171
172 sub := &subscription.Subscription{
173 ID: subID,
174 Filters: filters,
175 Events: make(chan *pb.Event, 100),
176 }
177 sub.InitDone()
178
179 h.subs.Add(sub)
180 clientSubs[subID] = sub
181
182 go h.streamEvents(ctx, conn, sub)
183
184 return nil
185}
186
187func (h *Handler) handleClose(raw []json.RawMessage, clientSubs map[string]*subscription.Subscription) error {
188 if len(raw) != 2 {
189 return fmt.Errorf("CLOSE expects 2 elements")
190 }
191
192 var subID string
193 if err := json.Unmarshal(raw[1], &subID); err != nil {
194 return fmt.Errorf("invalid subscription ID")
195 }
196
197 if sub, ok := clientSubs[subID]; ok {
198 h.subs.Remove(sub.ID)
199 delete(clientSubs, subID)
200 }
201
202 return nil
203}
204
205func (h *Handler) streamEvents(ctx context.Context, conn *websocket.Conn, sub *subscription.Subscription) {
206 for {
207 select {
208 case pbEvent, ok := <-sub.Events:
209 if !ok {
210 return
211 }
212 event := PBToNostr(pbEvent)
213 h.sendEvent(ctx, conn, sub.ID, event)
214
215 case <-ctx.Done():
216 return
217
218 case <-sub.Done():
219 return
220 }
221 }
222}
223
224func (h *Handler) sendEvent(ctx context.Context, conn *websocket.Conn, subID string, event *nostr.Event) error {
225 msg := []interface{}{"EVENT", subID, event}
226 data, _ := json.Marshal(msg)
227 return conn.Write(ctx, websocket.MessageText, data)
228}
229
230func (h *Handler) sendOK(ctx context.Context, conn *websocket.Conn, eventID string, accepted bool, message string) error {
231 msg := []interface{}{"OK", eventID, accepted, message}
232 data, _ := json.Marshal(msg)
233 return conn.Write(ctx, websocket.MessageText, data)
234}
235
236func (h *Handler) sendEOSE(ctx context.Context, conn *websocket.Conn, subID string) error {
237 msg := []interface{}{"EOSE", subID}
238 data, _ := json.Marshal(msg)
239 return conn.Write(ctx, websocket.MessageText, data)
240}
241
242func (h *Handler) sendNotice(ctx context.Context, conn *websocket.Conn, notice string) error {
243 msg := []interface{}{"NOTICE", notice}
244 data, _ := json.Marshal(msg)
245 return conn.Write(ctx, websocket.MessageText, data)
246}