aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbndw <ben@bdw.to>2026-03-09 08:01:02 -0700
committerbndw <ben@bdw.to>2026-03-09 08:01:02 -0700
commit61a85baf87d89fcc09f9469a113a2ddc982b0a24 (patch)
treed8359ce5cbcbb9402ba92c617c4ebd702adf33e9
parentce684848e25fed3aabdde4ffba6d2d8c40afa030 (diff)
feat: phase 2 relay implementation
Implement the Axon relay as relay/ (module axon/relay). Includes: - WebSocket framing (RFC 6455, no external deps) in relay/websocket/ - Per-connection auth: challenge/response with ed25519 + allowlist check - Ingest pipeline: sig verify, dedup, ephemeral fanout, SQLite persistence - Subscription manager with prefix-matching filter fanout in relay/subscription/ - SQLite storage with WAL/cache config and UNION query builder in relay/storage/ - Graceful shutdown on SIGINT/SIGTERM - Filter/TagFilter types added to axon core package (required by relay)
-rw-r--r--axon.go22
-rw-r--r--relay/config.go66
-rw-r--r--relay/go.mod30
-rw-r--r--relay/go.sum63
-rw-r--r--relay/handler.go411
-rw-r--r--relay/main.go72
-rw-r--r--relay/server.go118
-rw-r--r--relay/storage/events.go192
-rw-r--r--relay/storage/storage.go86
-rw-r--r--relay/subscription/manager.go317
-rw-r--r--relay/websocket/websocket.go244
11 files changed, 1621 insertions, 0 deletions
diff --git a/axon.go b/axon.go
index 51ec22d..ebe18a2 100644
--- a/axon.go
+++ b/axon.go
@@ -29,6 +29,28 @@ type Tag struct {
29 Values []string `msgpack:"values" json:"values"` 29 Values []string `msgpack:"values" json:"values"`
30} 30}
31 31
32// TagFilter selects events that have a tag with the given name and any of the
33// given values. An empty Values slice matches any value.
34type TagFilter struct {
35 Name string `msgpack:"name"`
36 Values []string `msgpack:"values"`
37}
38
39// Filter selects a subset of events. All non-empty fields are ANDed together;
40// multiple entries within a slice field are ORed.
41//
42// IDs and Authors support prefix matching: a []byte shorter than 32 bytes
43// matches any event whose ID (or pubkey) starts with those bytes.
44type Filter struct {
45 IDs [][]byte `msgpack:"ids"`
46 Authors [][]byte `msgpack:"authors"`
47 Kinds []uint16 `msgpack:"kinds"`
48 Since int64 `msgpack:"since"` // inclusive lower bound on created_at
49 Until int64 `msgpack:"until"` // inclusive upper bound on created_at
50 Limit int32 `msgpack:"limit"` // max events to return (0 = no limit)
51 Tags []TagFilter `msgpack:"tags"`
52}
53
32// Event is the core Axon data structure. All fields use their wire types. 54// Event is the core Axon data structure. All fields use their wire types.
33// id, pubkey and sig are raw 32/64-byte slices, not hex. 55// id, pubkey and sig are raw 32/64-byte slices, not hex.
34// content is opaque bytes (msgpack bin type). 56// content is opaque bytes (msgpack bin type).
diff --git a/relay/config.go b/relay/config.go
new file mode 100644
index 0000000..e432b85
--- /dev/null
+++ b/relay/config.go
@@ -0,0 +1,66 @@
1package main
2
3import (
4 "encoding/hex"
5 "fmt"
6 "os"
7
8 "gopkg.in/yaml.v3"
9)
10
11// Config holds all relay configuration loaded from config.yaml.
12type Config struct {
13 Addr string `yaml:"addr"`
14 DB string `yaml:"db"`
15 RelayURL string `yaml:"relay_url"`
16 Allowlist []string `yaml:"allowlist"` // hex-encoded pubkeys
17}
18
19// DefaultConfig returns sensible defaults.
20func DefaultConfig() Config {
21 return Config{
22 Addr: ":8080",
23 DB: "axon.db",
24 RelayURL: "ws://localhost:8080",
25 }
26}
27
28// AllowlistBytes decodes the hex pubkeys in c.Allowlist and returns them as
29// raw byte slices. Returns an error if any entry is not valid 64-char hex.
30func (c *Config) AllowlistBytes() ([][]byte, error) {
31 out := make([][]byte, 0, len(c.Allowlist))
32 for _, h := range c.Allowlist {
33 b, err := hex.DecodeString(h)
34 if err != nil {
35 return nil, fmt.Errorf("config: allowlist entry %q is not valid hex: %w", h, err)
36 }
37 if len(b) != 32 {
38 return nil, fmt.Errorf("config: allowlist entry %q decoded to %d bytes, want 32", h, len(b))
39 }
40 out = append(out, b)
41 }
42 return out, nil
43}
44
45// LoadConfig reads and parses a YAML config file. Missing fields fall back to
46// DefaultConfig values.
47func LoadConfig(path string) (Config, error) {
48 cfg := DefaultConfig()
49
50 f, err := os.Open(path)
51 if err != nil {
52 if os.IsNotExist(err) {
53 // No config file — use defaults.
54 return cfg, nil
55 }
56 return cfg, fmt.Errorf("config: open %q: %w", path, err)
57 }
58 defer f.Close()
59
60 dec := yaml.NewDecoder(f)
61 dec.KnownFields(true)
62 if err := dec.Decode(&cfg); err != nil {
63 return cfg, fmt.Errorf("config: decode %q: %w", path, err)
64 }
65 return cfg, nil
66}
diff --git a/relay/go.mod b/relay/go.mod
new file mode 100644
index 0000000..a3d424a
--- /dev/null
+++ b/relay/go.mod
@@ -0,0 +1,30 @@
1module axon/relay
2
3go 1.25.5
4
5require (
6 axon v0.0.0
7 github.com/vmihailenco/msgpack/v5 v5.4.1
8 gopkg.in/yaml.v3 v3.0.1
9 modernc.org/sqlite v1.33.1
10)
11
12require (
13 github.com/dustin/go-humanize v1.0.1 // indirect
14 github.com/google/uuid v1.6.0 // indirect
15 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
16 github.com/mattn/go-isatty v0.0.20 // indirect
17 github.com/ncruces/go-strftime v0.1.9 // indirect
18 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
19 github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
20 golang.org/x/crypto v0.48.0 // indirect
21 golang.org/x/sys v0.41.0 // indirect
22 modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect
23 modernc.org/libc v1.55.3 // indirect
24 modernc.org/mathutil v1.6.0 // indirect
25 modernc.org/memory v1.8.0 // indirect
26 modernc.org/strutil v1.2.0 // indirect
27 modernc.org/token v1.1.0 // indirect
28)
29
30replace axon => ../
diff --git a/relay/go.sum b/relay/go.sum
new file mode 100644
index 0000000..04de341
--- /dev/null
+++ b/relay/go.sum
@@ -0,0 +1,63 @@
1github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
2github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
4github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
5github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo=
6github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw=
7github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
8github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
9github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
10github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
11github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
12github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
13github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
14github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
15github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
16github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
17github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
18github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
19github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
20github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
21github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
22github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
23github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
24github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
25golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
26golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
27golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic=
28golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
29golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
30golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
31golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
32golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
33golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
34gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
35gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
36gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
37gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
38modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ=
39modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ=
40modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y=
41modernc.org/ccgo/v4 v4.19.2/go.mod h1:ysS3mxiMV38XGRTTcgo0DQTeTmAO4oCmJl1nX9VFI3s=
42modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
43modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ=
44modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw=
45modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU=
46modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI=
47modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4=
48modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U=
49modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w=
50modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4=
51modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo=
52modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E=
53modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU=
54modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
55modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
56modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc=
57modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss=
58modernc.org/sqlite v1.33.1 h1:trb6Z3YYoeM9eDL1O8do81kP+0ejv+YzgyFo+Gwy0nM=
59modernc.org/sqlite v1.33.1/go.mod h1:pXV2xHxhzXZsgT/RtTFAPY6JJDEvOTcTdwADQCCWD4k=
60modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA=
61modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0=
62modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
63modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
diff --git a/relay/handler.go b/relay/handler.go
new file mode 100644
index 0000000..afd9622
--- /dev/null
+++ b/relay/handler.go
@@ -0,0 +1,411 @@
1package main
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "log"
8 "time"
9
10 "github.com/vmihailenco/msgpack/v5"
11
12 "axon"
13 "axon/relay/storage"
14 "axon/relay/subscription"
15 ws "axon/relay/websocket"
16)
17
18// Wire protocol message type constants.
19const (
20 // Client → Relay
21 MsgTypeAuth uint16 = 1
22 MsgTypeSubscribe uint16 = 2
23 MsgTypeUnsubscribe uint16 = 3
24 MsgTypePublish uint16 = 4
25
26 // Relay → Client
27 MsgTypeChallenge uint16 = 10
28 MsgTypeEvent uint16 = 11
29 MsgTypeEose uint16 = 12
30 MsgTypeOk uint16 = 13
31 MsgTypeError uint16 = 14
32)
33
34// Payload types — Relay → Client
35
36type ChallengePayload struct {
37 Nonce []byte `msgpack:"nonce"`
38}
39
40type EventPayload struct {
41 SubID string `msgpack:"sub_id"`
42 Event axon.Event `msgpack:"event"`
43}
44
45type EosePayload struct {
46 SubID string `msgpack:"sub_id"`
47}
48
49type OkPayload struct {
50 Message string `msgpack:"message"`
51}
52
53type ErrorPayload struct {
54 Code uint16 `msgpack:"code"`
55 Message string `msgpack:"message"`
56}
57
58// Payload types — Client → Relay
59
60type AuthPayload struct {
61 PubKey []byte `msgpack:"pubkey"`
62 Sig []byte `msgpack:"sig"`
63}
64
65type SubscribePayload struct {
66 SubID string `msgpack:"sub_id"`
67 Filter axon.Filter `msgpack:"filter"`
68}
69
70type UnsubscribePayload struct {
71 SubID string `msgpack:"sub_id"`
72}
73
74type PublishPayload struct {
75 Event axon.Event `msgpack:"event"`
76}
77
78// conn holds per-connection state.
79type conn struct {
80 id string // unique connection ID (hex nonce for logging)
81 ws *ws.Conn
82 store *storage.Storage
83 global *subscription.GlobalManager
84 allowlist [][]byte
85 relayURL string
86
87 authed bool
88 pubkey []byte
89 nonce []byte
90
91 mgr *subscription.Manager
92}
93
94// send encodes and writes a wire message to the client.
95func (c *conn) send(msgType uint16, payload interface{}) error {
96 b, err := msgpack.Marshal([]interface{}{msgType, payload})
97 if err != nil {
98 return fmt.Errorf("handler: marshal msg %d: %w", msgType, err)
99 }
100 return c.ws.Write(b)
101}
102
103// sendError sends an ErrorPayload. If fatal is true the connection is then closed.
104func (c *conn) sendError(code uint16, message string, fatal bool) {
105 _ = c.send(MsgTypeError, ErrorPayload{Code: code, Message: message})
106 if fatal {
107 c.ws.Close(1000, "")
108 }
109}
110
111// serve is the main per-connection loop.
112func (c *conn) serve(ctx context.Context) {
113 defer func() {
114 c.mgr.CloseAll()
115 c.global.UnregisterConn(c.id)
116 }()
117
118 // Send challenge immediately on connect.
119 if err := c.send(MsgTypeChallenge, ChallengePayload{Nonce: c.nonce}); err != nil {
120 log.Printf("conn %s: send challenge: %v", c.id, err)
121 return
122 }
123
124 // Start ping goroutine.
125 pingStop := make(chan struct{})
126 defer close(pingStop)
127 go c.pingLoop(pingStop)
128
129 for {
130 raw, err := c.ws.Read(ctx)
131 if err != nil {
132 if ctx.Err() != nil {
133 return
134 }
135 // Connection closed or read error.
136 return
137 }
138
139 if err := c.dispatch(ctx, raw); err != nil {
140 log.Printf("conn %s: dispatch: %v", c.id, err)
141 return
142 }
143 }
144}
145
146// pingLoop sends a WebSocket ping every 30 seconds. If two consecutive pings
147// go unanswered (no pong received within 30s each) the connection is closed.
148func (c *conn) pingLoop(stop <-chan struct{}) {
149 ticker := time.NewTicker(30 * time.Second)
150 defer ticker.Stop()
151 missed := 0
152 for {
153 select {
154 case <-stop:
155 return
156 case <-ticker.C:
157 if err := c.ws.Ping(); err != nil {
158 return
159 }
160 missed++
161 if missed >= 2 {
162 log.Printf("conn %s: missed 2 pings, closing", c.id)
163 c.ws.Close(1001, "ping timeout")
164 return
165 }
166 }
167 }
168}
169
170// dispatch parses a raw message and routes it to the appropriate handler.
171func (c *conn) dispatch(ctx context.Context, raw []byte) error {
172 // Decode as [uint16, rawPayload]
173 var arr []msgpack.RawMessage
174 if err := msgpack.Unmarshal(raw, &arr); err != nil {
175 c.sendError(400, "malformed message", false)
176 return nil
177 }
178 if len(arr) < 2 {
179 c.sendError(400, "message too short", false)
180 return nil
181 }
182
183 var msgType uint16
184 if err := msgpack.Unmarshal(arr[0], &msgType); err != nil {
185 c.sendError(400, "invalid message type", false)
186 return nil
187 }
188
189 // Only MsgTypeAuth is allowed before authentication.
190 if !c.authed && msgType != MsgTypeAuth {
191 c.sendError(401, "not authenticated", true)
192 return fmt.Errorf("unauthenticated message type %d", msgType)
193 }
194
195 switch msgType {
196 case MsgTypeAuth:
197 return c.handleAuth(arr[1])
198 case MsgTypePublish:
199 return c.handlePublish(ctx, arr[1])
200 case MsgTypeSubscribe:
201 return c.handleSubscribe(ctx, arr[1])
202 case MsgTypeUnsubscribe:
203 return c.handleUnsubscribe(arr[1])
204 default:
205 c.sendError(400, fmt.Sprintf("unknown message type %d", msgType), false)
206 }
207 return nil
208}
209
210// handleAuth processes an Auth message.
211func (c *conn) handleAuth(raw msgpack.RawMessage) error {
212 var p AuthPayload
213 if err := msgpack.Unmarshal(raw, &p); err != nil {
214 c.sendError(400, "malformed auth payload", true)
215 return fmt.Errorf("unmarshal auth: %w", err)
216 }
217
218 if len(p.PubKey) != 32 {
219 c.sendError(400, "pubkey must be 32 bytes", true)
220 return fmt.Errorf("bad pubkey length %d", len(p.PubKey))
221 }
222
223 if !axon.VerifyChallenge(p.PubKey, c.nonce, c.relayURL, p.Sig) {
224 c.sendError(401, "invalid signature", true)
225 return fmt.Errorf("auth signature invalid")
226 }
227
228 // Check allowlist.
229 if len(c.allowlist) > 0 {
230 allowed := false
231 for _, pk := range c.allowlist {
232 if bytes.Equal(pk, p.PubKey) {
233 allowed = true
234 break
235 }
236 }
237 if !allowed {
238 c.sendError(403, "pubkey not in allowlist", true)
239 return fmt.Errorf("pubkey not in allowlist")
240 }
241 }
242
243 c.authed = true
244 c.pubkey = p.PubKey
245 return c.send(MsgTypeOk, OkPayload{Message: "authenticated"})
246}
247
248// handlePublish processes a Publish message.
249func (c *conn) handlePublish(ctx context.Context, raw msgpack.RawMessage) error {
250 var p PublishPayload
251 if err := msgpack.Unmarshal(raw, &p); err != nil {
252 c.sendError(400, "malformed publish payload", false)
253 return nil
254 }
255
256 event := &p.Event
257
258 // Content length check.
259 if len(event.Content) > 65536 {
260 c.sendError(413, "content exceeds 64KB limit", false)
261 return nil
262 }
263
264 // Signature verification.
265 if err := axon.Verify(event); err != nil {
266 c.sendError(400, fmt.Sprintf("invalid event: %v", err), false)
267 return nil
268 }
269
270 // Job request expiry check (kinds 5000–5999).
271 if event.Kind >= 5000 && event.Kind < 6000 {
272 if expired, err := isExpired(event); err != nil {
273 c.sendError(400, fmt.Sprintf("bad expires_at tag: %v", err), false)
274 return nil
275 } else if expired {
276 c.sendError(400, "job request has expired", false)
277 return nil
278 }
279 }
280
281 // Marshal envelope bytes for storage and fanout.
282 envelopeBytes, err := axon.MarshalEvent(event)
283 if err != nil {
284 c.sendError(400, "could not marshal event", false)
285 return nil
286 }
287
288 // Ephemeral events (3000–3999): fanout only, do not store.
289 isEphemeral := event.Kind >= 3000 && event.Kind < 4000
290
291 if !isEphemeral {
292 // Duplicate check.
293 exists, err := c.store.ExistsByID(ctx, event.ID)
294 if err != nil {
295 c.sendError(500, "internal error", false)
296 return fmt.Errorf("exists check: %w", err)
297 }
298 if exists {
299 c.sendError(409, "duplicate event", false)
300 return nil
301 }
302
303 // Persist.
304 if err := c.store.StoreEvent(ctx, event, envelopeBytes); err != nil {
305 if err == storage.ErrDuplicate {
306 c.sendError(409, "duplicate event", false)
307 return nil
308 }
309 c.sendError(500, "internal error", false)
310 return fmt.Errorf("store event: %w", err)
311 }
312 }
313
314 // Fanout to all matching subscribers.
315 c.global.Fanout(event, envelopeBytes)
316
317 return c.send(MsgTypeOk, OkPayload{Message: "ok"})
318}
319
320// handleSubscribe processes a Subscribe message.
321func (c *conn) handleSubscribe(ctx context.Context, raw msgpack.RawMessage) error {
322 var p SubscribePayload
323 if err := msgpack.Unmarshal(raw, &p); err != nil {
324 c.sendError(400, "malformed subscribe payload", false)
325 return nil
326 }
327 if p.SubID == "" {
328 c.sendError(400, "sub_id required", false)
329 return nil
330 }
331
332 // Query historical events.
333 envelopes, err := c.store.QueryEvents(ctx, []axon.Filter{p.Filter})
334 if err != nil {
335 c.sendError(500, "internal error", false)
336 return fmt.Errorf("query events: %w", err)
337 }
338
339 for _, envBytes := range envelopes {
340 ev, err := axon.UnmarshalEvent(envBytes)
341 if err != nil {
342 log.Printf("conn %s: unmarshal stored event: %v", c.id, err)
343 continue
344 }
345 if err := c.send(MsgTypeEvent, EventPayload{SubID: p.SubID, Event: *ev}); err != nil {
346 return err
347 }
348 }
349
350 // Send EOSE.
351 if err := c.send(MsgTypeEose, EosePayload{SubID: p.SubID}); err != nil {
352 return err
353 }
354
355 // Register for live fanout.
356 sub := c.mgr.Add(p.SubID, []axon.Filter{p.Filter})
357 c.global.Register(c.id, p.SubID, sub)
358
359 // Start goroutine to stream live events to this client.
360 go c.streamSub(sub)
361
362 return nil
363}
364
365// streamSub reads from a subscription's Events channel and sends them to the
366// client. Returns when the subscription or connection is closed.
367func (c *conn) streamSub(sub *subscription.Subscription) {
368 for envelopeBytes := range sub.Events {
369 ev, err := axon.UnmarshalEvent(envelopeBytes)
370 if err != nil {
371 log.Printf("conn %s: sub %s: unmarshal live event: %v", c.id, sub.ID, err)
372 continue
373 }
374 if err := c.send(MsgTypeEvent, EventPayload{SubID: sub.ID, Event: *ev}); err != nil {
375 return
376 }
377 }
378}
379
380// handleUnsubscribe processes an Unsubscribe message.
381func (c *conn) handleUnsubscribe(raw msgpack.RawMessage) error {
382 var p UnsubscribePayload
383 if err := msgpack.Unmarshal(raw, &p); err != nil {
384 c.sendError(400, "malformed unsubscribe payload", false)
385 return nil
386 }
387 c.mgr.Remove(p.SubID)
388 c.global.Unregister(c.id, p.SubID)
389 return nil
390}
391
392// isExpired checks the expires_at tag on a job request event.
393// Returns (true, nil) if the event is expired, (false, nil) if not, or
394// (false, err) if the tag is malformed.
395func isExpired(event *axon.Event) (bool, error) {
396 for _, tag := range event.Tags {
397 if tag.Name != "expires_at" {
398 continue
399 }
400 if len(tag.Values) == 0 {
401 continue
402 }
403 var ts int64
404 _, err := fmt.Sscanf(tag.Values[0], "%d", &ts)
405 if err != nil {
406 return false, fmt.Errorf("parse expires_at: %w", err)
407 }
408 return time.Now().Unix() > ts, nil
409 }
410 return false, nil
411}
diff --git a/relay/main.go b/relay/main.go
new file mode 100644
index 0000000..2cfa034
--- /dev/null
+++ b/relay/main.go
@@ -0,0 +1,72 @@
1package main
2
3import (
4 "context"
5 "errors"
6 "flag"
7 "log"
8 "net/http"
9 "os"
10 "os/signal"
11 "syscall"
12 "time"
13
14 "axon/relay/storage"
15 "axon/relay/subscription"
16)
17
18func main() {
19 cfgPath := flag.String("config", "config.yaml", "path to config.yaml")
20 flag.Parse()
21
22 cfg, err := LoadConfig(*cfgPath)
23 if err != nil {
24 log.Fatalf("relay: load config: %v", err)
25 }
26
27 allowlist, err := cfg.AllowlistBytes()
28 if err != nil {
29 log.Fatalf("relay: allowlist: %v", err)
30 }
31
32 store, err := storage.New(cfg.DB)
33 if err != nil {
34 log.Fatalf("relay: open storage: %v", err)
35 }
36 defer store.Close()
37
38 global := subscription.NewGlobalManager()
39
40 // Periodically purge closed subscriptions.
41 stopPurger := make(chan struct{})
42 global.StartPurger(5*time.Minute, stopPurger)
43 defer close(stopPurger)
44
45 srv := NewServer(cfg, allowlist, store, global)
46
47 // Graceful shutdown on SIGINT / SIGTERM.
48 sigCh := make(chan os.Signal, 1)
49 signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
50
51 errCh := make(chan error, 1)
52 go func() {
53 if err := srv.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) {
54 errCh <- err
55 }
56 }()
57
58 select {
59 case sig := <-sigCh:
60 log.Printf("relay: received signal %s, shutting down", sig)
61 case err := <-errCh:
62 log.Fatalf("relay: server error: %v", err)
63 }
64
65 shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
66 defer cancel()
67
68 if err := srv.Shutdown(shutdownCtx); err != nil {
69 log.Printf("relay: shutdown error: %v", err)
70 }
71 log.Println("relay: stopped")
72}
diff --git a/relay/server.go b/relay/server.go
new file mode 100644
index 0000000..085929c
--- /dev/null
+++ b/relay/server.go
@@ -0,0 +1,118 @@
1package main
2
3import (
4 "context"
5 "crypto/rand"
6 "encoding/hex"
7 "fmt"
8 "log"
9 "net/http"
10 "sync"
11
12 "axon/relay/storage"
13 "axon/relay/subscription"
14 ws "axon/relay/websocket"
15)
16
17// Server is the HTTP + WebSocket server for the Axon relay.
18type Server struct {
19 cfg Config
20 allowlist [][]byte
21 store *storage.Storage
22 global *subscription.GlobalManager
23
24 mu sync.WaitGroup
25 httpSrv *http.Server
26}
27
28// NewServer creates a Server from the given config.
29func NewServer(cfg Config, allowlist [][]byte, store *storage.Storage, global *subscription.GlobalManager) *Server {
30 return &Server{
31 cfg: cfg,
32 allowlist: allowlist,
33 store: store,
34 global: global,
35 }
36}
37
38// Start configures the HTTP server and starts listening. Call Shutdown to stop.
39func (s *Server) Start() error {
40 mux := http.NewServeMux()
41 mux.HandleFunc("/", s.handleWS)
42
43 s.httpSrv = &http.Server{
44 Addr: s.cfg.Addr,
45 Handler: mux,
46 }
47
48 log.Printf("relay: listening on %s", s.cfg.Addr)
49 return s.httpSrv.ListenAndServe()
50}
51
52// Shutdown gracefully stops the server and waits for all connections to drain.
53func (s *Server) Shutdown(ctx context.Context) error {
54 err := s.httpSrv.Shutdown(ctx)
55 // Wait for all handler goroutines to finish.
56 done := make(chan struct{})
57 go func() {
58 s.mu.Wait()
59 close(done)
60 }()
61 select {
62 case <-done:
63 case <-ctx.Done():
64 }
65 return err
66}
67
68// handleWS upgrades an HTTP request to a WebSocket connection and starts the
69// per-connection handler goroutine.
70func (s *Server) handleWS(w http.ResponseWriter, r *http.Request) {
71 c, err := ws.Accept(w, r)
72 if err != nil {
73 http.Error(w, "WebSocket upgrade failed", http.StatusBadRequest)
74 return
75 }
76
77 // Generate 32-byte nonce for the auth challenge.
78 nonce := make([]byte, 32)
79 if _, err := rand.Read(nonce); err != nil {
80 log.Printf("relay: generate nonce: %v", err)
81 c.CloseConn()
82 return
83 }
84
85 connID := hex.EncodeToString(nonce[:8])
86
87 h := &conn{
88 id: connID,
89 ws: c,
90 store: s.store,
91 global: s.global,
92 allowlist: s.allowlist,
93 relayURL: s.cfg.RelayURL,
94 nonce: nonce,
95 mgr: subscription.NewManager(),
96 }
97
98 s.mu.Add(1)
99 go func() {
100 defer s.mu.Done()
101 ctx := r.Context()
102 h.serve(ctx)
103 if err := c.CloseConn(); err != nil {
104 // Ignore close errors — connection may already be gone.
105 _ = err
106 }
107 log.Printf("conn %s: closed", connID)
108 }()
109}
110
111// generateConnID creates a unique connection identifier for logging.
112func generateConnID() (string, error) {
113 var b [8]byte
114 if _, err := rand.Read(b[:]); err != nil {
115 return "", fmt.Errorf("server: generate conn id: %w", err)
116 }
117 return hex.EncodeToString(b[:]), nil
118}
diff --git a/relay/storage/events.go b/relay/storage/events.go
new file mode 100644
index 0000000..cf10097
--- /dev/null
+++ b/relay/storage/events.go
@@ -0,0 +1,192 @@
1package storage
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "strings"
9
10 "axon"
11)
12
13// ErrDuplicate is returned by StoreEvent when the event ID already exists.
14var ErrDuplicate = errors.New("storage: duplicate event")
15
16// StoreEvent persists an event and its tags to the database in a single
17// transaction. envelopeBytes is the verbatim msgpack representation used for
18// zero-copy fanout.
19func (s *Storage) StoreEvent(ctx context.Context, event *axon.Event, envelopeBytes []byte) error {
20 tx, err := s.db.BeginTx(ctx, nil)
21 if err != nil {
22 return fmt.Errorf("storage: begin tx: %w", err)
23 }
24 defer tx.Rollback()
25
26 _, err = tx.ExecContext(ctx,
27 `INSERT INTO events (id, pubkey, created_at, kind, envelope_bytes) VALUES (?, ?, ?, ?, ?)`,
28 event.ID, event.PubKey, event.CreatedAt, event.Kind, envelopeBytes,
29 )
30 if err != nil {
31 if isDuplicateError(err) {
32 return ErrDuplicate
33 }
34 return fmt.Errorf("storage: insert event: %w", err)
35 }
36
37 for _, tag := range event.Tags {
38 if len(tag.Values) == 0 {
39 continue
40 }
41 _, err = tx.ExecContext(ctx,
42 `INSERT INTO tags (event_id, name, value) VALUES (?, ?, ?)`,
43 event.ID, tag.Name, tag.Values[0],
44 )
45 if err != nil {
46 return fmt.Errorf("storage: insert tag: %w", err)
47 }
48 }
49
50 return tx.Commit()
51}
52
53// ExistsByID returns true if an event with the given ID is already stored.
54func (s *Storage) ExistsByID(ctx context.Context, id []byte) (bool, error) {
55 var n int
56 err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM events WHERE id = ?`, id).Scan(&n)
57 if err != nil && err != sql.ErrNoRows {
58 return false, fmt.Errorf("storage: exists: %w", err)
59 }
60 return n > 0, nil
61}
62
63// QueryEvents executes the given filters against the database using a UNION
64// query and returns matching event envelope bytes in descending created_at
65// order. The effective LIMIT is the minimum non-zero Limit across all filters.
66func (s *Storage) QueryEvents(ctx context.Context, filters []axon.Filter) ([][]byte, error) {
67 if len(filters) == 0 {
68 return nil, nil
69 }
70
71 var unions []string
72 var args []interface{}
73 var effectiveLimit int32
74
75 for _, f := range filters {
76 var filterArgs []interface{}
77 clause := buildWhereClause(f, &filterArgs)
78 sub := fmt.Sprintf(
79 "SELECT e.envelope_bytes, e.created_at FROM events e WHERE %s", clause)
80 unions = append(unions, sub)
81 args = append(args, filterArgs...)
82 if f.Limit > 0 && (effectiveLimit == 0 || f.Limit < effectiveLimit) {
83 effectiveLimit = f.Limit
84 }
85 }
86
87 query := strings.Join(unions, " UNION ") + " ORDER BY created_at DESC"
88 if effectiveLimit > 0 {
89 query += fmt.Sprintf(" LIMIT %d", effectiveLimit)
90 }
91
92 rows, err := s.db.QueryContext(ctx, query, args...)
93 if err != nil {
94 return nil, fmt.Errorf("storage: query: %w", err)
95 }
96 defer rows.Close()
97
98 var results [][]byte
99 for rows.Next() {
100 var envelope []byte
101 var createdAt int64
102 if err := rows.Scan(&envelope, &createdAt); err != nil {
103 return nil, fmt.Errorf("storage: scan: %w", err)
104 }
105 results = append(results, envelope)
106 }
107 return results, rows.Err()
108}
109
110// buildWhereClause builds the SQL WHERE clause for a single filter, appending
111// bind parameters to args.
112func buildWhereClause(f axon.Filter, args *[]interface{}) string {
113 var conditions []string
114
115 if len(f.IDs) > 0 {
116 conditions = append(conditions, buildBlobPrefixCondition("e.id", f.IDs, args))
117 }
118
119 if len(f.Authors) > 0 {
120 conditions = append(conditions, buildBlobPrefixCondition("e.pubkey", f.Authors, args))
121 }
122
123 if len(f.Kinds) > 0 {
124 placeholders := make([]string, len(f.Kinds))
125 for i, k := range f.Kinds {
126 placeholders[i] = "?"
127 *args = append(*args, k)
128 }
129 conditions = append(conditions, "e.kind IN ("+strings.Join(placeholders, ",")+")")
130 }
131
132 if f.Since != 0 {
133 conditions = append(conditions, "e.created_at >= ?")
134 *args = append(*args, f.Since)
135 }
136
137 if f.Until != 0 {
138 conditions = append(conditions, "e.created_at <= ?")
139 *args = append(*args, f.Until)
140 }
141
142 for _, tf := range f.Tags {
143 conditions = append(conditions, buildTagJoinCondition(tf, args))
144 }
145
146 if len(conditions) == 0 {
147 return "1=1"
148 }
149 return strings.Join(conditions, " AND ")
150}
151
152// buildBlobPrefixCondition builds an OR condition for prefix-matching a BLOB
153// column. Prefix slices of exactly 32 bytes use equality; shorter slices use
154// hex(column) LIKE 'HEX%'.
155func buildBlobPrefixCondition(column string, prefixes [][]byte, args *[]interface{}) string {
156 var orConds []string
157 for _, prefix := range prefixes {
158 if len(prefix) == 32 {
159 orConds = append(orConds, column+" = ?")
160 *args = append(*args, prefix)
161 } else {
162 hexPrefix := fmt.Sprintf("%X", prefix)
163 orConds = append(orConds, fmt.Sprintf("hex(%s) LIKE ?", column))
164 *args = append(*args, hexPrefix+"%")
165 }
166 }
167 if len(orConds) == 1 {
168 return orConds[0]
169 }
170 return "(" + strings.Join(orConds, " OR ") + ")"
171}
172
173// buildTagJoinCondition builds an EXISTS sub-select for a TagFilter.
174func buildTagJoinCondition(tf axon.TagFilter, args *[]interface{}) string {
175 if len(tf.Values) == 0 {
176 *args = append(*args, tf.Name)
177 return "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ?)"
178 }
179 var orConds []string
180 for _, v := range tf.Values {
181 orConds = append(orConds, "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ? AND t.value = ?)")
182 *args = append(*args, tf.Name, v)
183 }
184 if len(orConds) == 1 {
185 return orConds[0]
186 }
187 return "(" + strings.Join(orConds, " OR ") + ")"
188}
189
190func isDuplicateError(err error) bool {
191 return err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed")
192}
diff --git a/relay/storage/storage.go b/relay/storage/storage.go
new file mode 100644
index 0000000..95b278d
--- /dev/null
+++ b/relay/storage/storage.go
@@ -0,0 +1,86 @@
1// Package storage provides SQLite-backed event persistence for the Axon relay.
2package storage
3
4import (
5 "context"
6 "database/sql"
7 "fmt"
8
9 _ "modernc.org/sqlite"
10)
11
12// Storage wraps a SQLite database for Axon event persistence.
13type Storage struct {
14 db *sql.DB
15}
16
17// New opens (or creates) the SQLite database at dbPath, applies WAL pragmas,
18// and initialises the schema. Call Close when done.
19func New(dbPath string) (*Storage, error) {
20 db, err := sql.Open("sqlite", dbPath)
21 if err != nil {
22 return nil, fmt.Errorf("storage: open db: %w", err)
23 }
24
25 // SQLite works best with a single writer.
26 db.SetMaxOpenConns(1)
27 db.SetMaxIdleConns(1)
28 db.SetConnMaxLifetime(0)
29
30 pragmas := []string{
31 "PRAGMA journal_mode=WAL",
32 "PRAGMA synchronous=NORMAL",
33 "PRAGMA cache_size=-40960", // ~40 MB (negative = kibibytes)
34 "PRAGMA temp_store=MEMORY",
35 "PRAGMA mmap_size=268435456", // 256 MB
36 "PRAGMA page_size=4096",
37 "PRAGMA foreign_keys=ON",
38 "PRAGMA busy_timeout=5000",
39 }
40
41 for _, p := range pragmas {
42 if _, err := db.Exec(p); err != nil {
43 db.Close()
44 return nil, fmt.Errorf("storage: set pragma %q: %w", p, err)
45 }
46 }
47
48 s := &Storage{db: db}
49 if err := s.initSchema(context.Background()); err != nil {
50 db.Close()
51 return nil, fmt.Errorf("storage: init schema: %w", err)
52 }
53 return s, nil
54}
55
56// Close closes the underlying database connection.
57func (s *Storage) Close() error {
58 return s.db.Close()
59}
60
61const schema = `
62CREATE TABLE IF NOT EXISTS events (
63 id BLOB PRIMARY KEY,
64 pubkey BLOB NOT NULL,
65 created_at INTEGER NOT NULL,
66 kind INTEGER NOT NULL,
67 envelope_bytes BLOB NOT NULL
68) STRICT;
69
70CREATE TABLE IF NOT EXISTS tags (
71 event_id BLOB NOT NULL REFERENCES events(id),
72 name TEXT NOT NULL,
73 value TEXT NOT NULL
74) STRICT;
75
76CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey, created_at DESC);
77CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind, created_at DESC);
78CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at DESC);
79CREATE INDEX IF NOT EXISTS idx_tags_name_value ON tags(name, value);
80CREATE INDEX IF NOT EXISTS idx_tags_event_id ON tags(event_id);
81`
82
83func (s *Storage) initSchema(ctx context.Context) error {
84 _, err := s.db.ExecContext(ctx, schema)
85 return err
86}
diff --git a/relay/subscription/manager.go b/relay/subscription/manager.go
new file mode 100644
index 0000000..d8ba653
--- /dev/null
+++ b/relay/subscription/manager.go
@@ -0,0 +1,317 @@
1// Package subscription manages in-memory subscriptions and live event fanout.
2package subscription
3
4import (
5 "bytes"
6 "sync"
7 "time"
8
9 "axon"
10)
11
12// Subscription holds a single client subscription: an ID, a set of filters,
13// and a buffered channel for live event delivery.
14type Subscription struct {
15 ID string
16 Filters []axon.Filter
17 // Events carries raw msgpack envelope bytes for matched events.
18 Events chan []byte
19 done chan struct{}
20 once sync.Once
21}
22
23// newSubscription creates a subscription with a buffered event channel.
24func newSubscription(id string, filters []axon.Filter) *Subscription {
25 return &Subscription{
26 ID: id,
27 Filters: filters,
28 Events: make(chan []byte, 100),
29 done: make(chan struct{}),
30 }
31}
32
33// Close shuts down the subscription and drains the event channel.
34func (s *Subscription) Close() {
35 s.once.Do(func() {
36 close(s.done)
37 close(s.Events)
38 })
39}
40
41// IsClosed reports whether the subscription has been closed.
42func (s *Subscription) IsClosed() bool {
43 select {
44 case <-s.done:
45 return true
46 default:
47 return false
48 }
49}
50
51// Done returns a channel that is closed when the subscription is closed.
52func (s *Subscription) Done() <-chan struct{} {
53 return s.done
54}
55
56// Manager maintains all active subscriptions for a single connection and fans
57// out incoming events to matching subscribers.
58type Manager struct {
59 mu sync.RWMutex
60 subs map[string]*Subscription
61}
62
63// NewManager returns an empty Manager.
64func NewManager() *Manager {
65 return &Manager{subs: make(map[string]*Subscription)}
66}
67
68// Add registers a new subscription, replacing any existing subscription with
69// the same ID.
70func (m *Manager) Add(id string, filters []axon.Filter) *Subscription {
71 m.mu.Lock()
72 defer m.mu.Unlock()
73
74 // Remove old subscription with same ID if present.
75 if old, ok := m.subs[id]; ok {
76 old.Close()
77 }
78
79 sub := newSubscription(id, filters)
80 m.subs[id] = sub
81 return sub
82}
83
84// Remove cancels and deletes a subscription by ID.
85func (m *Manager) Remove(id string) {
86 m.mu.Lock()
87 defer m.mu.Unlock()
88
89 if sub, ok := m.subs[id]; ok {
90 sub.Close()
91 delete(m.subs, id)
92 }
93}
94
95// CloseAll cancels every subscription held by this manager.
96func (m *Manager) CloseAll() {
97 m.mu.Lock()
98 defer m.mu.Unlock()
99
100 for id, sub := range m.subs {
101 sub.Close()
102 delete(m.subs, id)
103 }
104}
105
106// Fanout delivers envelopeBytes to all subscriptions whose filters match event.
107// The send is non-blocking: if the channel is full the event is dropped for
108// that subscriber.
109func (m *Manager) Fanout(event *axon.Event, envelopeBytes []byte) {
110 m.mu.RLock()
111 defer m.mu.RUnlock()
112
113 for _, sub := range m.subs {
114 if sub.IsClosed() {
115 continue
116 }
117 if MatchesAnyFilter(event, sub.Filters) {
118 select {
119 case sub.Events <- envelopeBytes:
120 case <-sub.done:
121 default:
122 // channel full — drop
123 }
124 }
125 }
126}
127
128// MatchesAnyFilter returns true if event matches at least one filter in filters.
129func MatchesAnyFilter(event *axon.Event, filters []axon.Filter) bool {
130 for i := range filters {
131 if MatchesFilter(event, &filters[i]) {
132 return true
133 }
134 }
135 return false
136}
137
138// MatchesFilter returns true if event satisfies all constraints in f.
139func MatchesFilter(event *axon.Event, f *axon.Filter) bool {
140 if len(f.IDs) > 0 {
141 if !matchesBytesPrefixes(event.ID, f.IDs) {
142 return false
143 }
144 }
145
146 if len(f.Authors) > 0 {
147 if !matchesBytesPrefixes(event.PubKey, f.Authors) {
148 return false
149 }
150 }
151
152 if len(f.Kinds) > 0 {
153 found := false
154 for _, k := range f.Kinds {
155 if event.Kind == k {
156 found = true
157 break
158 }
159 }
160 if !found {
161 return false
162 }
163 }
164
165 if f.Since != 0 && event.CreatedAt < f.Since {
166 return false
167 }
168
169 if f.Until != 0 && event.CreatedAt > f.Until {
170 return false
171 }
172
173 for _, tf := range f.Tags {
174 if !eventHasTagMatch(event, tf.Name, tf.Values) {
175 return false
176 }
177 }
178
179 return true
180}
181
182// matchesBytesPrefixes returns true if value has any of the given byte slices
183// as a prefix. A prefix of exactly 32 bytes must match exactly.
184func matchesBytesPrefixes(value []byte, prefixes [][]byte) bool {
185 for _, prefix := range prefixes {
186 if len(prefix) == 0 {
187 return true
188 }
189 if len(prefix) >= len(value) {
190 if bytes.Equal(value, prefix) {
191 return true
192 }
193 } else {
194 if bytes.HasPrefix(value, prefix) {
195 return true
196 }
197 }
198 }
199 return false
200}
201
202// eventHasTagMatch returns true if event has a tag named name whose first
203// value matches any of values.
204func eventHasTagMatch(event *axon.Event, name string, values []string) bool {
205 for _, tag := range event.Tags {
206 if tag.Name != name {
207 continue
208 }
209 if len(values) == 0 {
210 return true
211 }
212 if len(tag.Values) == 0 {
213 continue
214 }
215 for _, v := range values {
216 if tag.Values[0] == v {
217 return true
218 }
219 }
220 }
221 return false
222}
223
224// GlobalManager is a relay-wide manager that holds subscriptions from all
225// connections and supports cross-connection fanout.
226type GlobalManager struct {
227 mu sync.RWMutex
228 subs map[string]*Subscription // key: "connID:subID"
229}
230
231// NewGlobalManager returns an empty GlobalManager.
232func NewGlobalManager() *GlobalManager {
233 return &GlobalManager{subs: make(map[string]*Subscription)}
234}
235
236// Register adds a subscription under a globally unique key.
237func (g *GlobalManager) Register(connID, subID string, sub *Subscription) {
238 key := connID + ":" + subID
239 g.mu.Lock()
240 defer g.mu.Unlock()
241 if old, ok := g.subs[key]; ok {
242 old.Close()
243 }
244 g.subs[key] = sub
245}
246
247// Unregister removes a subscription.
248func (g *GlobalManager) Unregister(connID, subID string) {
249 key := connID + ":" + subID
250 g.mu.Lock()
251 defer g.mu.Unlock()
252 if sub, ok := g.subs[key]; ok {
253 sub.Close()
254 delete(g.subs, key)
255 }
256}
257
258// UnregisterConn removes all subscriptions for a connection.
259func (g *GlobalManager) UnregisterConn(connID string) {
260 prefix := connID + ":"
261 g.mu.Lock()
262 defer g.mu.Unlock()
263 for key, sub := range g.subs {
264 if len(key) > len(prefix) && key[:len(prefix)] == prefix {
265 sub.Close()
266 delete(g.subs, key)
267 }
268 }
269}
270
271// Fanout delivers the event to all matching subscriptions across all connections.
272func (g *GlobalManager) Fanout(event *axon.Event, envelopeBytes []byte) {
273 g.mu.RLock()
274 defer g.mu.RUnlock()
275
276 for _, sub := range g.subs {
277 if sub.IsClosed() {
278 continue
279 }
280 if MatchesAnyFilter(event, sub.Filters) {
281 select {
282 case sub.Events <- envelopeBytes:
283 case <-sub.done:
284 default:
285 }
286 }
287 }
288}
289
290// PurgeExpired removes closed subscriptions from the global map.
291// Call periodically to prevent unbounded growth.
292func (g *GlobalManager) PurgeExpired() {
293 g.mu.Lock()
294 defer g.mu.Unlock()
295 for key, sub := range g.subs {
296 if sub.IsClosed() {
297 delete(g.subs, key)
298 }
299 }
300}
301
302// StartPurger launches a background goroutine that periodically removes closed
303// subscriptions.
304func (g *GlobalManager) StartPurger(interval time.Duration, stop <-chan struct{}) {
305 go func() {
306 ticker := time.NewTicker(interval)
307 defer ticker.Stop()
308 for {
309 select {
310 case <-ticker.C:
311 g.PurgeExpired()
312 case <-stop:
313 return
314 }
315 }
316 }()
317}
diff --git a/relay/websocket/websocket.go b/relay/websocket/websocket.go
new file mode 100644
index 0000000..cfc3289
--- /dev/null
+++ b/relay/websocket/websocket.go
@@ -0,0 +1,244 @@
1// Package websocket implements RFC 6455 WebSocket framing without external dependencies.
2// Adapted from muxstr's websocket implementation.
3package websocket
4
5import (
6 "bufio"
7 "context"
8 "crypto/rand"
9 "crypto/sha1"
10 "encoding/base64"
11 "encoding/binary"
12 "fmt"
13 "io"
14 "net"
15 "net/http"
16 "strings"
17 "sync"
18 "time"
19)
20
21const (
22 opContinuation = 0x0
23 opBinary = 0x2
24 opClose = 0x8
25 opPing = 0x9
26 opPong = 0xA
27)
28
29// Conn is a WebSocket connection.
30type Conn struct {
31 rwc net.Conn
32 br *bufio.Reader
33 client bool
34 mu sync.Mutex
35}
36
37func mask(key [4]byte, data []byte) {
38 for i := range data {
39 data[i] ^= key[i%4]
40 }
41}
42
43func (c *Conn) writeFrame(opcode byte, payload []byte) error {
44 c.mu.Lock()
45 defer c.mu.Unlock()
46
47 length := len(payload)
48 header := []byte{0x80 | opcode, 0} // FIN + opcode
49
50 if c.client {
51 header[1] = 0x80 // mask bit
52 }
53
54 switch {
55 case length <= 125:
56 header[1] |= byte(length)
57 case length <= 65535:
58 header[1] |= 126
59 ext := make([]byte, 2)
60 binary.BigEndian.PutUint16(ext, uint16(length))
61 header = append(header, ext...)
62 default:
63 header[1] |= 127
64 ext := make([]byte, 8)
65 binary.BigEndian.PutUint64(ext, uint64(length))
66 header = append(header, ext...)
67 }
68
69 if c.client {
70 var key [4]byte
71 rand.Read(key[:])
72 header = append(header, key[:]...)
73 // mask a copy so we don't modify the caller's slice
74 masked := make([]byte, len(payload))
75 copy(masked, payload)
76 mask(key, masked)
77 payload = masked
78 }
79
80 if _, err := c.rwc.Write(header); err != nil {
81 return err
82 }
83 _, err := c.rwc.Write(payload)
84 return err
85}
86
87func (c *Conn) readFrame() (fin bool, opcode byte, payload []byte, err error) {
88 var hdr [2]byte
89 if _, err = io.ReadFull(c.br, hdr[:]); err != nil {
90 return
91 }
92
93 fin = hdr[0]&0x80 != 0
94 opcode = hdr[0] & 0x0F
95 masked := hdr[1]&0x80 != 0
96 length := uint64(hdr[1] & 0x7F)
97
98 switch length {
99 case 126:
100 var ext [2]byte
101 if _, err = io.ReadFull(c.br, ext[:]); err != nil {
102 return
103 }
104 length = uint64(binary.BigEndian.Uint16(ext[:]))
105 case 127:
106 var ext [8]byte
107 if _, err = io.ReadFull(c.br, ext[:]); err != nil {
108 return
109 }
110 length = binary.BigEndian.Uint64(ext[:])
111 }
112
113 var key [4]byte
114 if masked {
115 if _, err = io.ReadFull(c.br, key[:]); err != nil {
116 return
117 }
118 }
119
120 payload = make([]byte, length)
121 if _, err = io.ReadFull(c.br, payload); err != nil {
122 return
123 }
124
125 if masked {
126 mask(key, payload)
127 }
128 return
129}
130
131// Read reads the next complete message from the connection.
132// It handles ping frames automatically by sending pong responses.
133// It respects context cancellation by setting a read deadline.
134func (c *Conn) Read(ctx context.Context) ([]byte, error) {
135 stop := context.AfterFunc(ctx, func() {
136 c.rwc.SetReadDeadline(time.Now())
137 })
138 defer stop()
139
140 var buf []byte
141 for {
142 fin, opcode, payload, err := c.readFrame()
143 if err != nil {
144 if ctx.Err() != nil {
145 return nil, ctx.Err()
146 }
147 return nil, err
148 }
149
150 switch opcode {
151 case opPing:
152 c.writeFrame(opPong, payload)
153 continue
154 case opClose:
155 return nil, fmt.Errorf("websocket: close frame received")
156 case opBinary, opContinuation:
157 buf = append(buf, payload...)
158 if fin {
159 return buf, nil
160 }
161 default:
162 // text or other opcodes — treat payload as binary
163 buf = append(buf, payload...)
164 if fin {
165 return buf, nil
166 }
167 }
168 }
169}
170
171// Write sends a binary frame to the connection.
172func (c *Conn) Write(data []byte) error {
173 return c.writeFrame(opBinary, data)
174}
175
176// Ping sends a WebSocket ping frame.
177func (c *Conn) Ping() error {
178 return c.writeFrame(opPing, nil)
179}
180
181// Close sends a close frame with the given code and reason, then closes the
182// underlying connection.
183func (c *Conn) Close(code uint16, reason string) error {
184 payload := make([]byte, 2+len(reason))
185 binary.BigEndian.PutUint16(payload, code)
186 copy(payload[2:], reason)
187 c.writeFrame(opClose, payload)
188 return c.rwc.Close()
189}
190
191// CloseConn closes the underlying network connection without sending a close frame.
192func (c *Conn) CloseConn() error {
193 return c.rwc.Close()
194}
195
196// SetReadDeadline sets the read deadline on the underlying connection.
197func (c *Conn) SetReadDeadline(t time.Time) error {
198 return c.rwc.SetReadDeadline(t)
199}
200
201var wsGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
202
203func acceptKey(key string) string {
204 h := sha1.New()
205 h.Write([]byte(key))
206 h.Write([]byte(wsGUID))
207 return base64.StdEncoding.EncodeToString(h.Sum(nil))
208}
209
210// Accept performs the server-side WebSocket handshake, hijacking the HTTP
211// connection and returning a Conn ready for framed I/O.
212func Accept(w http.ResponseWriter, r *http.Request) (*Conn, error) {
213 if !strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
214 return nil, fmt.Errorf("websocket: missing Upgrade header")
215 }
216
217 key := r.Header.Get("Sec-WebSocket-Key")
218 if key == "" {
219 return nil, fmt.Errorf("websocket: missing Sec-WebSocket-Key")
220 }
221
222 hj, ok := w.(http.Hijacker)
223 if !ok {
224 return nil, fmt.Errorf("websocket: response does not support hijacking")
225 }
226
227 rwc, brw, err := hj.Hijack()
228 if err != nil {
229 return nil, err
230 }
231
232 accept := acceptKey(key)
233 respStr := "HTTP/1.1 101 Switching Protocols\r\n" +
234 "Upgrade: websocket\r\n" +
235 "Connection: Upgrade\r\n" +
236 "Sec-WebSocket-Accept: " + accept + "\r\n\r\n"
237
238 if _, err := rwc.Write([]byte(respStr)); err != nil {
239 rwc.Close()
240 return nil, err
241 }
242
243 return &Conn{rwc: rwc, br: brw.Reader, client: false}, nil
244}