diff options
| author | bndw <ben@bdw.to> | 2026-03-09 08:01:02 -0700 |
|---|---|---|
| committer | bndw <ben@bdw.to> | 2026-03-09 08:01:02 -0700 |
| commit | 61a85baf87d89fcc09f9469a113a2ddc982b0a24 (patch) | |
| tree | d8359ce5cbcbb9402ba92c617c4ebd702adf33e9 /relay | |
| parent | ce684848e25fed3aabdde4ffba6d2d8c40afa030 (diff) | |
feat: phase 2 relay implementation
Implement the Axon relay as relay/ (module axon/relay). Includes:
- WebSocket framing (RFC 6455, no external deps) in relay/websocket/
- Per-connection auth: challenge/response with ed25519 + allowlist check
- Ingest pipeline: sig verify, dedup, ephemeral fanout, SQLite persistence
- Subscription manager with prefix-matching filter fanout in relay/subscription/
- SQLite storage with WAL/cache config and UNION query builder in relay/storage/
- Graceful shutdown on SIGINT/SIGTERM
- Filter/TagFilter types added to axon core package (required by relay)
Diffstat (limited to 'relay')
| -rw-r--r-- | relay/config.go | 66 | ||||
| -rw-r--r-- | relay/go.mod | 30 | ||||
| -rw-r--r-- | relay/go.sum | 63 | ||||
| -rw-r--r-- | relay/handler.go | 411 | ||||
| -rw-r--r-- | relay/main.go | 72 | ||||
| -rw-r--r-- | relay/server.go | 118 | ||||
| -rw-r--r-- | relay/storage/events.go | 192 | ||||
| -rw-r--r-- | relay/storage/storage.go | 86 | ||||
| -rw-r--r-- | relay/subscription/manager.go | 317 | ||||
| -rw-r--r-- | relay/websocket/websocket.go | 244 |
10 files changed, 1599 insertions, 0 deletions
diff --git a/relay/config.go b/relay/config.go new file mode 100644 index 0000000..e432b85 --- /dev/null +++ b/relay/config.go | |||
| @@ -0,0 +1,66 @@ | |||
| 1 | package main | ||
| 2 | |||
| 3 | import ( | ||
| 4 | "encoding/hex" | ||
| 5 | "fmt" | ||
| 6 | "os" | ||
| 7 | |||
| 8 | "gopkg.in/yaml.v3" | ||
| 9 | ) | ||
| 10 | |||
| 11 | // Config holds all relay configuration loaded from config.yaml. | ||
| 12 | type Config struct { | ||
| 13 | Addr string `yaml:"addr"` | ||
| 14 | DB string `yaml:"db"` | ||
| 15 | RelayURL string `yaml:"relay_url"` | ||
| 16 | Allowlist []string `yaml:"allowlist"` // hex-encoded pubkeys | ||
| 17 | } | ||
| 18 | |||
| 19 | // DefaultConfig returns sensible defaults. | ||
| 20 | func DefaultConfig() Config { | ||
| 21 | return Config{ | ||
| 22 | Addr: ":8080", | ||
| 23 | DB: "axon.db", | ||
| 24 | RelayURL: "ws://localhost:8080", | ||
| 25 | } | ||
| 26 | } | ||
| 27 | |||
| 28 | // AllowlistBytes decodes the hex pubkeys in c.Allowlist and returns them as | ||
| 29 | // raw byte slices. Returns an error if any entry is not valid 64-char hex. | ||
| 30 | func (c *Config) AllowlistBytes() ([][]byte, error) { | ||
| 31 | out := make([][]byte, 0, len(c.Allowlist)) | ||
| 32 | for _, h := range c.Allowlist { | ||
| 33 | b, err := hex.DecodeString(h) | ||
| 34 | if err != nil { | ||
| 35 | return nil, fmt.Errorf("config: allowlist entry %q is not valid hex: %w", h, err) | ||
| 36 | } | ||
| 37 | if len(b) != 32 { | ||
| 38 | return nil, fmt.Errorf("config: allowlist entry %q decoded to %d bytes, want 32", h, len(b)) | ||
| 39 | } | ||
| 40 | out = append(out, b) | ||
| 41 | } | ||
| 42 | return out, nil | ||
| 43 | } | ||
| 44 | |||
| 45 | // LoadConfig reads and parses a YAML config file. Missing fields fall back to | ||
| 46 | // DefaultConfig values. | ||
| 47 | func LoadConfig(path string) (Config, error) { | ||
| 48 | cfg := DefaultConfig() | ||
| 49 | |||
| 50 | f, err := os.Open(path) | ||
| 51 | if err != nil { | ||
| 52 | if os.IsNotExist(err) { | ||
| 53 | // No config file — use defaults. | ||
| 54 | return cfg, nil | ||
| 55 | } | ||
| 56 | return cfg, fmt.Errorf("config: open %q: %w", path, err) | ||
| 57 | } | ||
| 58 | defer f.Close() | ||
| 59 | |||
| 60 | dec := yaml.NewDecoder(f) | ||
| 61 | dec.KnownFields(true) | ||
| 62 | if err := dec.Decode(&cfg); err != nil { | ||
| 63 | return cfg, fmt.Errorf("config: decode %q: %w", path, err) | ||
| 64 | } | ||
| 65 | return cfg, nil | ||
| 66 | } | ||
diff --git a/relay/go.mod b/relay/go.mod new file mode 100644 index 0000000..a3d424a --- /dev/null +++ b/relay/go.mod | |||
| @@ -0,0 +1,30 @@ | |||
| 1 | module axon/relay | ||
| 2 | |||
| 3 | go 1.25.5 | ||
| 4 | |||
| 5 | require ( | ||
| 6 | axon v0.0.0 | ||
| 7 | github.com/vmihailenco/msgpack/v5 v5.4.1 | ||
| 8 | gopkg.in/yaml.v3 v3.0.1 | ||
| 9 | modernc.org/sqlite v1.33.1 | ||
| 10 | ) | ||
| 11 | |||
| 12 | require ( | ||
| 13 | github.com/dustin/go-humanize v1.0.1 // indirect | ||
| 14 | github.com/google/uuid v1.6.0 // indirect | ||
| 15 | github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect | ||
| 16 | github.com/mattn/go-isatty v0.0.20 // indirect | ||
| 17 | github.com/ncruces/go-strftime v0.1.9 // indirect | ||
| 18 | github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect | ||
| 19 | github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect | ||
| 20 | golang.org/x/crypto v0.48.0 // indirect | ||
| 21 | golang.org/x/sys v0.41.0 // indirect | ||
| 22 | modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect | ||
| 23 | modernc.org/libc v1.55.3 // indirect | ||
| 24 | modernc.org/mathutil v1.6.0 // indirect | ||
| 25 | modernc.org/memory v1.8.0 // indirect | ||
| 26 | modernc.org/strutil v1.2.0 // indirect | ||
| 27 | modernc.org/token v1.1.0 // indirect | ||
| 28 | ) | ||
| 29 | |||
| 30 | replace axon => ../ | ||
diff --git a/relay/go.sum b/relay/go.sum new file mode 100644 index 0000000..04de341 --- /dev/null +++ b/relay/go.sum | |||
| @@ -0,0 +1,63 @@ | |||
| 1 | github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= | ||
| 2 | github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
| 3 | github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= | ||
| 4 | github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= | ||
| 5 | github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= | ||
| 6 | github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= | ||
| 7 | github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= | ||
| 8 | github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= | ||
| 9 | github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= | ||
| 10 | github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= | ||
| 11 | github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= | ||
| 12 | github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= | ||
| 13 | github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= | ||
| 14 | github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= | ||
| 15 | github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||
| 16 | github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
| 17 | github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= | ||
| 18 | github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= | ||
| 19 | github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= | ||
| 20 | github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= | ||
| 21 | github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= | ||
| 22 | github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= | ||
| 23 | github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= | ||
| 24 | github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= | ||
| 25 | golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= | ||
| 26 | golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= | ||
| 27 | golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= | ||
| 28 | golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= | ||
| 29 | golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | ||
| 30 | golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= | ||
| 31 | golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= | ||
| 32 | golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= | ||
| 33 | golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= | ||
| 34 | gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= | ||
| 35 | gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | ||
| 36 | gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||
| 37 | gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | ||
| 38 | modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ= | ||
| 39 | modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ= | ||
| 40 | modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y= | ||
| 41 | modernc.org/ccgo/v4 v4.19.2/go.mod h1:ysS3mxiMV38XGRTTcgo0DQTeTmAO4oCmJl1nX9VFI3s= | ||
| 42 | modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= | ||
| 43 | modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= | ||
| 44 | modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw= | ||
| 45 | modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU= | ||
| 46 | modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= | ||
| 47 | modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= | ||
| 48 | modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U= | ||
| 49 | modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w= | ||
| 50 | modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= | ||
| 51 | modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= | ||
| 52 | modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= | ||
| 53 | modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= | ||
| 54 | modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= | ||
| 55 | modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= | ||
| 56 | modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc= | ||
| 57 | modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= | ||
| 58 | modernc.org/sqlite v1.33.1 h1:trb6Z3YYoeM9eDL1O8do81kP+0ejv+YzgyFo+Gwy0nM= | ||
| 59 | modernc.org/sqlite v1.33.1/go.mod h1:pXV2xHxhzXZsgT/RtTFAPY6JJDEvOTcTdwADQCCWD4k= | ||
| 60 | modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= | ||
| 61 | modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= | ||
| 62 | modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= | ||
| 63 | modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= | ||
diff --git a/relay/handler.go b/relay/handler.go new file mode 100644 index 0000000..afd9622 --- /dev/null +++ b/relay/handler.go | |||
| @@ -0,0 +1,411 @@ | |||
| 1 | package main | ||
| 2 | |||
| 3 | import ( | ||
| 4 | "bytes" | ||
| 5 | "context" | ||
| 6 | "fmt" | ||
| 7 | "log" | ||
| 8 | "time" | ||
| 9 | |||
| 10 | "github.com/vmihailenco/msgpack/v5" | ||
| 11 | |||
| 12 | "axon" | ||
| 13 | "axon/relay/storage" | ||
| 14 | "axon/relay/subscription" | ||
| 15 | ws "axon/relay/websocket" | ||
| 16 | ) | ||
| 17 | |||
| 18 | // Wire protocol message type constants. | ||
| 19 | const ( | ||
| 20 | // Client → Relay | ||
| 21 | MsgTypeAuth uint16 = 1 | ||
| 22 | MsgTypeSubscribe uint16 = 2 | ||
| 23 | MsgTypeUnsubscribe uint16 = 3 | ||
| 24 | MsgTypePublish uint16 = 4 | ||
| 25 | |||
| 26 | // Relay → Client | ||
| 27 | MsgTypeChallenge uint16 = 10 | ||
| 28 | MsgTypeEvent uint16 = 11 | ||
| 29 | MsgTypeEose uint16 = 12 | ||
| 30 | MsgTypeOk uint16 = 13 | ||
| 31 | MsgTypeError uint16 = 14 | ||
| 32 | ) | ||
| 33 | |||
| 34 | // Payload types — Relay → Client | ||
| 35 | |||
| 36 | type ChallengePayload struct { | ||
| 37 | Nonce []byte `msgpack:"nonce"` | ||
| 38 | } | ||
| 39 | |||
| 40 | type EventPayload struct { | ||
| 41 | SubID string `msgpack:"sub_id"` | ||
| 42 | Event axon.Event `msgpack:"event"` | ||
| 43 | } | ||
| 44 | |||
| 45 | type EosePayload struct { | ||
| 46 | SubID string `msgpack:"sub_id"` | ||
| 47 | } | ||
| 48 | |||
| 49 | type OkPayload struct { | ||
| 50 | Message string `msgpack:"message"` | ||
| 51 | } | ||
| 52 | |||
| 53 | type ErrorPayload struct { | ||
| 54 | Code uint16 `msgpack:"code"` | ||
| 55 | Message string `msgpack:"message"` | ||
| 56 | } | ||
| 57 | |||
| 58 | // Payload types — Client → Relay | ||
| 59 | |||
| 60 | type AuthPayload struct { | ||
| 61 | PubKey []byte `msgpack:"pubkey"` | ||
| 62 | Sig []byte `msgpack:"sig"` | ||
| 63 | } | ||
| 64 | |||
| 65 | type SubscribePayload struct { | ||
| 66 | SubID string `msgpack:"sub_id"` | ||
| 67 | Filter axon.Filter `msgpack:"filter"` | ||
| 68 | } | ||
| 69 | |||
| 70 | type UnsubscribePayload struct { | ||
| 71 | SubID string `msgpack:"sub_id"` | ||
| 72 | } | ||
| 73 | |||
| 74 | type PublishPayload struct { | ||
| 75 | Event axon.Event `msgpack:"event"` | ||
| 76 | } | ||
| 77 | |||
| 78 | // conn holds per-connection state. | ||
| 79 | type conn struct { | ||
| 80 | id string // unique connection ID (hex nonce for logging) | ||
| 81 | ws *ws.Conn | ||
| 82 | store *storage.Storage | ||
| 83 | global *subscription.GlobalManager | ||
| 84 | allowlist [][]byte | ||
| 85 | relayURL string | ||
| 86 | |||
| 87 | authed bool | ||
| 88 | pubkey []byte | ||
| 89 | nonce []byte | ||
| 90 | |||
| 91 | mgr *subscription.Manager | ||
| 92 | } | ||
| 93 | |||
| 94 | // send encodes and writes a wire message to the client. | ||
| 95 | func (c *conn) send(msgType uint16, payload interface{}) error { | ||
| 96 | b, err := msgpack.Marshal([]interface{}{msgType, payload}) | ||
| 97 | if err != nil { | ||
| 98 | return fmt.Errorf("handler: marshal msg %d: %w", msgType, err) | ||
| 99 | } | ||
| 100 | return c.ws.Write(b) | ||
| 101 | } | ||
| 102 | |||
| 103 | // sendError sends an ErrorPayload. If fatal is true the connection is then closed. | ||
| 104 | func (c *conn) sendError(code uint16, message string, fatal bool) { | ||
| 105 | _ = c.send(MsgTypeError, ErrorPayload{Code: code, Message: message}) | ||
| 106 | if fatal { | ||
| 107 | c.ws.Close(1000, "") | ||
| 108 | } | ||
| 109 | } | ||
| 110 | |||
| 111 | // serve is the main per-connection loop. | ||
| 112 | func (c *conn) serve(ctx context.Context) { | ||
| 113 | defer func() { | ||
| 114 | c.mgr.CloseAll() | ||
| 115 | c.global.UnregisterConn(c.id) | ||
| 116 | }() | ||
| 117 | |||
| 118 | // Send challenge immediately on connect. | ||
| 119 | if err := c.send(MsgTypeChallenge, ChallengePayload{Nonce: c.nonce}); err != nil { | ||
| 120 | log.Printf("conn %s: send challenge: %v", c.id, err) | ||
| 121 | return | ||
| 122 | } | ||
| 123 | |||
| 124 | // Start ping goroutine. | ||
| 125 | pingStop := make(chan struct{}) | ||
| 126 | defer close(pingStop) | ||
| 127 | go c.pingLoop(pingStop) | ||
| 128 | |||
| 129 | for { | ||
| 130 | raw, err := c.ws.Read(ctx) | ||
| 131 | if err != nil { | ||
| 132 | if ctx.Err() != nil { | ||
| 133 | return | ||
| 134 | } | ||
| 135 | // Connection closed or read error. | ||
| 136 | return | ||
| 137 | } | ||
| 138 | |||
| 139 | if err := c.dispatch(ctx, raw); err != nil { | ||
| 140 | log.Printf("conn %s: dispatch: %v", c.id, err) | ||
| 141 | return | ||
| 142 | } | ||
| 143 | } | ||
| 144 | } | ||
| 145 | |||
| 146 | // pingLoop sends a WebSocket ping every 30 seconds. If two consecutive pings | ||
| 147 | // go unanswered (no pong received within 30s each) the connection is closed. | ||
| 148 | func (c *conn) pingLoop(stop <-chan struct{}) { | ||
| 149 | ticker := time.NewTicker(30 * time.Second) | ||
| 150 | defer ticker.Stop() | ||
| 151 | missed := 0 | ||
| 152 | for { | ||
| 153 | select { | ||
| 154 | case <-stop: | ||
| 155 | return | ||
| 156 | case <-ticker.C: | ||
| 157 | if err := c.ws.Ping(); err != nil { | ||
| 158 | return | ||
| 159 | } | ||
| 160 | missed++ | ||
| 161 | if missed >= 2 { | ||
| 162 | log.Printf("conn %s: missed 2 pings, closing", c.id) | ||
| 163 | c.ws.Close(1001, "ping timeout") | ||
| 164 | return | ||
| 165 | } | ||
| 166 | } | ||
| 167 | } | ||
| 168 | } | ||
| 169 | |||
| 170 | // dispatch parses a raw message and routes it to the appropriate handler. | ||
| 171 | func (c *conn) dispatch(ctx context.Context, raw []byte) error { | ||
| 172 | // Decode as [uint16, rawPayload] | ||
| 173 | var arr []msgpack.RawMessage | ||
| 174 | if err := msgpack.Unmarshal(raw, &arr); err != nil { | ||
| 175 | c.sendError(400, "malformed message", false) | ||
| 176 | return nil | ||
| 177 | } | ||
| 178 | if len(arr) < 2 { | ||
| 179 | c.sendError(400, "message too short", false) | ||
| 180 | return nil | ||
| 181 | } | ||
| 182 | |||
| 183 | var msgType uint16 | ||
| 184 | if err := msgpack.Unmarshal(arr[0], &msgType); err != nil { | ||
| 185 | c.sendError(400, "invalid message type", false) | ||
| 186 | return nil | ||
| 187 | } | ||
| 188 | |||
| 189 | // Only MsgTypeAuth is allowed before authentication. | ||
| 190 | if !c.authed && msgType != MsgTypeAuth { | ||
| 191 | c.sendError(401, "not authenticated", true) | ||
| 192 | return fmt.Errorf("unauthenticated message type %d", msgType) | ||
| 193 | } | ||
| 194 | |||
| 195 | switch msgType { | ||
| 196 | case MsgTypeAuth: | ||
| 197 | return c.handleAuth(arr[1]) | ||
| 198 | case MsgTypePublish: | ||
| 199 | return c.handlePublish(ctx, arr[1]) | ||
| 200 | case MsgTypeSubscribe: | ||
| 201 | return c.handleSubscribe(ctx, arr[1]) | ||
| 202 | case MsgTypeUnsubscribe: | ||
| 203 | return c.handleUnsubscribe(arr[1]) | ||
| 204 | default: | ||
| 205 | c.sendError(400, fmt.Sprintf("unknown message type %d", msgType), false) | ||
| 206 | } | ||
| 207 | return nil | ||
| 208 | } | ||
| 209 | |||
| 210 | // handleAuth processes an Auth message. | ||
| 211 | func (c *conn) handleAuth(raw msgpack.RawMessage) error { | ||
| 212 | var p AuthPayload | ||
| 213 | if err := msgpack.Unmarshal(raw, &p); err != nil { | ||
| 214 | c.sendError(400, "malformed auth payload", true) | ||
| 215 | return fmt.Errorf("unmarshal auth: %w", err) | ||
| 216 | } | ||
| 217 | |||
| 218 | if len(p.PubKey) != 32 { | ||
| 219 | c.sendError(400, "pubkey must be 32 bytes", true) | ||
| 220 | return fmt.Errorf("bad pubkey length %d", len(p.PubKey)) | ||
| 221 | } | ||
| 222 | |||
| 223 | if !axon.VerifyChallenge(p.PubKey, c.nonce, c.relayURL, p.Sig) { | ||
| 224 | c.sendError(401, "invalid signature", true) | ||
| 225 | return fmt.Errorf("auth signature invalid") | ||
| 226 | } | ||
| 227 | |||
| 228 | // Check allowlist. | ||
| 229 | if len(c.allowlist) > 0 { | ||
| 230 | allowed := false | ||
| 231 | for _, pk := range c.allowlist { | ||
| 232 | if bytes.Equal(pk, p.PubKey) { | ||
| 233 | allowed = true | ||
| 234 | break | ||
| 235 | } | ||
| 236 | } | ||
| 237 | if !allowed { | ||
| 238 | c.sendError(403, "pubkey not in allowlist", true) | ||
| 239 | return fmt.Errorf("pubkey not in allowlist") | ||
| 240 | } | ||
| 241 | } | ||
| 242 | |||
| 243 | c.authed = true | ||
| 244 | c.pubkey = p.PubKey | ||
| 245 | return c.send(MsgTypeOk, OkPayload{Message: "authenticated"}) | ||
| 246 | } | ||
| 247 | |||
| 248 | // handlePublish processes a Publish message. | ||
| 249 | func (c *conn) handlePublish(ctx context.Context, raw msgpack.RawMessage) error { | ||
| 250 | var p PublishPayload | ||
| 251 | if err := msgpack.Unmarshal(raw, &p); err != nil { | ||
| 252 | c.sendError(400, "malformed publish payload", false) | ||
| 253 | return nil | ||
| 254 | } | ||
| 255 | |||
| 256 | event := &p.Event | ||
| 257 | |||
| 258 | // Content length check. | ||
| 259 | if len(event.Content) > 65536 { | ||
| 260 | c.sendError(413, "content exceeds 64KB limit", false) | ||
| 261 | return nil | ||
| 262 | } | ||
| 263 | |||
| 264 | // Signature verification. | ||
| 265 | if err := axon.Verify(event); err != nil { | ||
| 266 | c.sendError(400, fmt.Sprintf("invalid event: %v", err), false) | ||
| 267 | return nil | ||
| 268 | } | ||
| 269 | |||
| 270 | // Job request expiry check (kinds 5000–5999). | ||
| 271 | if event.Kind >= 5000 && event.Kind < 6000 { | ||
| 272 | if expired, err := isExpired(event); err != nil { | ||
| 273 | c.sendError(400, fmt.Sprintf("bad expires_at tag: %v", err), false) | ||
| 274 | return nil | ||
| 275 | } else if expired { | ||
| 276 | c.sendError(400, "job request has expired", false) | ||
| 277 | return nil | ||
| 278 | } | ||
| 279 | } | ||
| 280 | |||
| 281 | // Marshal envelope bytes for storage and fanout. | ||
| 282 | envelopeBytes, err := axon.MarshalEvent(event) | ||
| 283 | if err != nil { | ||
| 284 | c.sendError(400, "could not marshal event", false) | ||
| 285 | return nil | ||
| 286 | } | ||
| 287 | |||
| 288 | // Ephemeral events (3000–3999): fanout only, do not store. | ||
| 289 | isEphemeral := event.Kind >= 3000 && event.Kind < 4000 | ||
| 290 | |||
| 291 | if !isEphemeral { | ||
| 292 | // Duplicate check. | ||
| 293 | exists, err := c.store.ExistsByID(ctx, event.ID) | ||
| 294 | if err != nil { | ||
| 295 | c.sendError(500, "internal error", false) | ||
| 296 | return fmt.Errorf("exists check: %w", err) | ||
| 297 | } | ||
| 298 | if exists { | ||
| 299 | c.sendError(409, "duplicate event", false) | ||
| 300 | return nil | ||
| 301 | } | ||
| 302 | |||
| 303 | // Persist. | ||
| 304 | if err := c.store.StoreEvent(ctx, event, envelopeBytes); err != nil { | ||
| 305 | if err == storage.ErrDuplicate { | ||
| 306 | c.sendError(409, "duplicate event", false) | ||
| 307 | return nil | ||
| 308 | } | ||
| 309 | c.sendError(500, "internal error", false) | ||
| 310 | return fmt.Errorf("store event: %w", err) | ||
| 311 | } | ||
| 312 | } | ||
| 313 | |||
| 314 | // Fanout to all matching subscribers. | ||
| 315 | c.global.Fanout(event, envelopeBytes) | ||
| 316 | |||
| 317 | return c.send(MsgTypeOk, OkPayload{Message: "ok"}) | ||
| 318 | } | ||
| 319 | |||
| 320 | // handleSubscribe processes a Subscribe message. | ||
| 321 | func (c *conn) handleSubscribe(ctx context.Context, raw msgpack.RawMessage) error { | ||
| 322 | var p SubscribePayload | ||
| 323 | if err := msgpack.Unmarshal(raw, &p); err != nil { | ||
| 324 | c.sendError(400, "malformed subscribe payload", false) | ||
| 325 | return nil | ||
| 326 | } | ||
| 327 | if p.SubID == "" { | ||
| 328 | c.sendError(400, "sub_id required", false) | ||
| 329 | return nil | ||
| 330 | } | ||
| 331 | |||
| 332 | // Query historical events. | ||
| 333 | envelopes, err := c.store.QueryEvents(ctx, []axon.Filter{p.Filter}) | ||
| 334 | if err != nil { | ||
| 335 | c.sendError(500, "internal error", false) | ||
| 336 | return fmt.Errorf("query events: %w", err) | ||
| 337 | } | ||
| 338 | |||
| 339 | for _, envBytes := range envelopes { | ||
| 340 | ev, err := axon.UnmarshalEvent(envBytes) | ||
| 341 | if err != nil { | ||
| 342 | log.Printf("conn %s: unmarshal stored event: %v", c.id, err) | ||
| 343 | continue | ||
| 344 | } | ||
| 345 | if err := c.send(MsgTypeEvent, EventPayload{SubID: p.SubID, Event: *ev}); err != nil { | ||
| 346 | return err | ||
| 347 | } | ||
| 348 | } | ||
| 349 | |||
| 350 | // Send EOSE. | ||
| 351 | if err := c.send(MsgTypeEose, EosePayload{SubID: p.SubID}); err != nil { | ||
| 352 | return err | ||
| 353 | } | ||
| 354 | |||
| 355 | // Register for live fanout. | ||
| 356 | sub := c.mgr.Add(p.SubID, []axon.Filter{p.Filter}) | ||
| 357 | c.global.Register(c.id, p.SubID, sub) | ||
| 358 | |||
| 359 | // Start goroutine to stream live events to this client. | ||
| 360 | go c.streamSub(sub) | ||
| 361 | |||
| 362 | return nil | ||
| 363 | } | ||
| 364 | |||
| 365 | // streamSub reads from a subscription's Events channel and sends them to the | ||
| 366 | // client. Returns when the subscription or connection is closed. | ||
| 367 | func (c *conn) streamSub(sub *subscription.Subscription) { | ||
| 368 | for envelopeBytes := range sub.Events { | ||
| 369 | ev, err := axon.UnmarshalEvent(envelopeBytes) | ||
| 370 | if err != nil { | ||
| 371 | log.Printf("conn %s: sub %s: unmarshal live event: %v", c.id, sub.ID, err) | ||
| 372 | continue | ||
| 373 | } | ||
| 374 | if err := c.send(MsgTypeEvent, EventPayload{SubID: sub.ID, Event: *ev}); err != nil { | ||
| 375 | return | ||
| 376 | } | ||
| 377 | } | ||
| 378 | } | ||
| 379 | |||
| 380 | // handleUnsubscribe processes an Unsubscribe message. | ||
| 381 | func (c *conn) handleUnsubscribe(raw msgpack.RawMessage) error { | ||
| 382 | var p UnsubscribePayload | ||
| 383 | if err := msgpack.Unmarshal(raw, &p); err != nil { | ||
| 384 | c.sendError(400, "malformed unsubscribe payload", false) | ||
| 385 | return nil | ||
| 386 | } | ||
| 387 | c.mgr.Remove(p.SubID) | ||
| 388 | c.global.Unregister(c.id, p.SubID) | ||
| 389 | return nil | ||
| 390 | } | ||
| 391 | |||
| 392 | // isExpired checks the expires_at tag on a job request event. | ||
| 393 | // Returns (true, nil) if the event is expired, (false, nil) if not, or | ||
| 394 | // (false, err) if the tag is malformed. | ||
| 395 | func isExpired(event *axon.Event) (bool, error) { | ||
| 396 | for _, tag := range event.Tags { | ||
| 397 | if tag.Name != "expires_at" { | ||
| 398 | continue | ||
| 399 | } | ||
| 400 | if len(tag.Values) == 0 { | ||
| 401 | continue | ||
| 402 | } | ||
| 403 | var ts int64 | ||
| 404 | _, err := fmt.Sscanf(tag.Values[0], "%d", &ts) | ||
| 405 | if err != nil { | ||
| 406 | return false, fmt.Errorf("parse expires_at: %w", err) | ||
| 407 | } | ||
| 408 | return time.Now().Unix() > ts, nil | ||
| 409 | } | ||
| 410 | return false, nil | ||
| 411 | } | ||
diff --git a/relay/main.go b/relay/main.go new file mode 100644 index 0000000..2cfa034 --- /dev/null +++ b/relay/main.go | |||
| @@ -0,0 +1,72 @@ | |||
| 1 | package main | ||
| 2 | |||
| 3 | import ( | ||
| 4 | "context" | ||
| 5 | "errors" | ||
| 6 | "flag" | ||
| 7 | "log" | ||
| 8 | "net/http" | ||
| 9 | "os" | ||
| 10 | "os/signal" | ||
| 11 | "syscall" | ||
| 12 | "time" | ||
| 13 | |||
| 14 | "axon/relay/storage" | ||
| 15 | "axon/relay/subscription" | ||
| 16 | ) | ||
| 17 | |||
| 18 | func main() { | ||
| 19 | cfgPath := flag.String("config", "config.yaml", "path to config.yaml") | ||
| 20 | flag.Parse() | ||
| 21 | |||
| 22 | cfg, err := LoadConfig(*cfgPath) | ||
| 23 | if err != nil { | ||
| 24 | log.Fatalf("relay: load config: %v", err) | ||
| 25 | } | ||
| 26 | |||
| 27 | allowlist, err := cfg.AllowlistBytes() | ||
| 28 | if err != nil { | ||
| 29 | log.Fatalf("relay: allowlist: %v", err) | ||
| 30 | } | ||
| 31 | |||
| 32 | store, err := storage.New(cfg.DB) | ||
| 33 | if err != nil { | ||
| 34 | log.Fatalf("relay: open storage: %v", err) | ||
| 35 | } | ||
| 36 | defer store.Close() | ||
| 37 | |||
| 38 | global := subscription.NewGlobalManager() | ||
| 39 | |||
| 40 | // Periodically purge closed subscriptions. | ||
| 41 | stopPurger := make(chan struct{}) | ||
| 42 | global.StartPurger(5*time.Minute, stopPurger) | ||
| 43 | defer close(stopPurger) | ||
| 44 | |||
| 45 | srv := NewServer(cfg, allowlist, store, global) | ||
| 46 | |||
| 47 | // Graceful shutdown on SIGINT / SIGTERM. | ||
| 48 | sigCh := make(chan os.Signal, 1) | ||
| 49 | signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) | ||
| 50 | |||
| 51 | errCh := make(chan error, 1) | ||
| 52 | go func() { | ||
| 53 | if err := srv.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { | ||
| 54 | errCh <- err | ||
| 55 | } | ||
| 56 | }() | ||
| 57 | |||
| 58 | select { | ||
| 59 | case sig := <-sigCh: | ||
| 60 | log.Printf("relay: received signal %s, shutting down", sig) | ||
| 61 | case err := <-errCh: | ||
| 62 | log.Fatalf("relay: server error: %v", err) | ||
| 63 | } | ||
| 64 | |||
| 65 | shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | ||
| 66 | defer cancel() | ||
| 67 | |||
| 68 | if err := srv.Shutdown(shutdownCtx); err != nil { | ||
| 69 | log.Printf("relay: shutdown error: %v", err) | ||
| 70 | } | ||
| 71 | log.Println("relay: stopped") | ||
| 72 | } | ||
diff --git a/relay/server.go b/relay/server.go new file mode 100644 index 0000000..085929c --- /dev/null +++ b/relay/server.go | |||
| @@ -0,0 +1,118 @@ | |||
| 1 | package main | ||
| 2 | |||
| 3 | import ( | ||
| 4 | "context" | ||
| 5 | "crypto/rand" | ||
| 6 | "encoding/hex" | ||
| 7 | "fmt" | ||
| 8 | "log" | ||
| 9 | "net/http" | ||
| 10 | "sync" | ||
| 11 | |||
| 12 | "axon/relay/storage" | ||
| 13 | "axon/relay/subscription" | ||
| 14 | ws "axon/relay/websocket" | ||
| 15 | ) | ||
| 16 | |||
| 17 | // Server is the HTTP + WebSocket server for the Axon relay. | ||
| 18 | type Server struct { | ||
| 19 | cfg Config | ||
| 20 | allowlist [][]byte | ||
| 21 | store *storage.Storage | ||
| 22 | global *subscription.GlobalManager | ||
| 23 | |||
| 24 | mu sync.WaitGroup | ||
| 25 | httpSrv *http.Server | ||
| 26 | } | ||
| 27 | |||
| 28 | // NewServer creates a Server from the given config. | ||
| 29 | func NewServer(cfg Config, allowlist [][]byte, store *storage.Storage, global *subscription.GlobalManager) *Server { | ||
| 30 | return &Server{ | ||
| 31 | cfg: cfg, | ||
| 32 | allowlist: allowlist, | ||
| 33 | store: store, | ||
| 34 | global: global, | ||
| 35 | } | ||
| 36 | } | ||
| 37 | |||
| 38 | // Start configures the HTTP server and starts listening. Call Shutdown to stop. | ||
| 39 | func (s *Server) Start() error { | ||
| 40 | mux := http.NewServeMux() | ||
| 41 | mux.HandleFunc("/", s.handleWS) | ||
| 42 | |||
| 43 | s.httpSrv = &http.Server{ | ||
| 44 | Addr: s.cfg.Addr, | ||
| 45 | Handler: mux, | ||
| 46 | } | ||
| 47 | |||
| 48 | log.Printf("relay: listening on %s", s.cfg.Addr) | ||
| 49 | return s.httpSrv.ListenAndServe() | ||
| 50 | } | ||
| 51 | |||
| 52 | // Shutdown gracefully stops the server and waits for all connections to drain. | ||
| 53 | func (s *Server) Shutdown(ctx context.Context) error { | ||
| 54 | err := s.httpSrv.Shutdown(ctx) | ||
| 55 | // Wait for all handler goroutines to finish. | ||
| 56 | done := make(chan struct{}) | ||
| 57 | go func() { | ||
| 58 | s.mu.Wait() | ||
| 59 | close(done) | ||
| 60 | }() | ||
| 61 | select { | ||
| 62 | case <-done: | ||
| 63 | case <-ctx.Done(): | ||
| 64 | } | ||
| 65 | return err | ||
| 66 | } | ||
| 67 | |||
| 68 | // handleWS upgrades an HTTP request to a WebSocket connection and starts the | ||
| 69 | // per-connection handler goroutine. | ||
| 70 | func (s *Server) handleWS(w http.ResponseWriter, r *http.Request) { | ||
| 71 | c, err := ws.Accept(w, r) | ||
| 72 | if err != nil { | ||
| 73 | http.Error(w, "WebSocket upgrade failed", http.StatusBadRequest) | ||
| 74 | return | ||
| 75 | } | ||
| 76 | |||
| 77 | // Generate 32-byte nonce for the auth challenge. | ||
| 78 | nonce := make([]byte, 32) | ||
| 79 | if _, err := rand.Read(nonce); err != nil { | ||
| 80 | log.Printf("relay: generate nonce: %v", err) | ||
| 81 | c.CloseConn() | ||
| 82 | return | ||
| 83 | } | ||
| 84 | |||
| 85 | connID := hex.EncodeToString(nonce[:8]) | ||
| 86 | |||
| 87 | h := &conn{ | ||
| 88 | id: connID, | ||
| 89 | ws: c, | ||
| 90 | store: s.store, | ||
| 91 | global: s.global, | ||
| 92 | allowlist: s.allowlist, | ||
| 93 | relayURL: s.cfg.RelayURL, | ||
| 94 | nonce: nonce, | ||
| 95 | mgr: subscription.NewManager(), | ||
| 96 | } | ||
| 97 | |||
| 98 | s.mu.Add(1) | ||
| 99 | go func() { | ||
| 100 | defer s.mu.Done() | ||
| 101 | ctx := r.Context() | ||
| 102 | h.serve(ctx) | ||
| 103 | if err := c.CloseConn(); err != nil { | ||
| 104 | // Ignore close errors — connection may already be gone. | ||
| 105 | _ = err | ||
| 106 | } | ||
| 107 | log.Printf("conn %s: closed", connID) | ||
| 108 | }() | ||
| 109 | } | ||
| 110 | |||
| 111 | // generateConnID creates a unique connection identifier for logging. | ||
| 112 | func generateConnID() (string, error) { | ||
| 113 | var b [8]byte | ||
| 114 | if _, err := rand.Read(b[:]); err != nil { | ||
| 115 | return "", fmt.Errorf("server: generate conn id: %w", err) | ||
| 116 | } | ||
| 117 | return hex.EncodeToString(b[:]), nil | ||
| 118 | } | ||
diff --git a/relay/storage/events.go b/relay/storage/events.go new file mode 100644 index 0000000..cf10097 --- /dev/null +++ b/relay/storage/events.go | |||
| @@ -0,0 +1,192 @@ | |||
| 1 | package storage | ||
| 2 | |||
| 3 | import ( | ||
| 4 | "context" | ||
| 5 | "database/sql" | ||
| 6 | "errors" | ||
| 7 | "fmt" | ||
| 8 | "strings" | ||
| 9 | |||
| 10 | "axon" | ||
| 11 | ) | ||
| 12 | |||
| 13 | // ErrDuplicate is returned by StoreEvent when the event ID already exists. | ||
| 14 | var ErrDuplicate = errors.New("storage: duplicate event") | ||
| 15 | |||
| 16 | // StoreEvent persists an event and its tags to the database in a single | ||
| 17 | // transaction. envelopeBytes is the verbatim msgpack representation used for | ||
| 18 | // zero-copy fanout. | ||
| 19 | func (s *Storage) StoreEvent(ctx context.Context, event *axon.Event, envelopeBytes []byte) error { | ||
| 20 | tx, err := s.db.BeginTx(ctx, nil) | ||
| 21 | if err != nil { | ||
| 22 | return fmt.Errorf("storage: begin tx: %w", err) | ||
| 23 | } | ||
| 24 | defer tx.Rollback() | ||
| 25 | |||
| 26 | _, err = tx.ExecContext(ctx, | ||
| 27 | `INSERT INTO events (id, pubkey, created_at, kind, envelope_bytes) VALUES (?, ?, ?, ?, ?)`, | ||
| 28 | event.ID, event.PubKey, event.CreatedAt, event.Kind, envelopeBytes, | ||
| 29 | ) | ||
| 30 | if err != nil { | ||
| 31 | if isDuplicateError(err) { | ||
| 32 | return ErrDuplicate | ||
| 33 | } | ||
| 34 | return fmt.Errorf("storage: insert event: %w", err) | ||
| 35 | } | ||
| 36 | |||
| 37 | for _, tag := range event.Tags { | ||
| 38 | if len(tag.Values) == 0 { | ||
| 39 | continue | ||
| 40 | } | ||
| 41 | _, err = tx.ExecContext(ctx, | ||
| 42 | `INSERT INTO tags (event_id, name, value) VALUES (?, ?, ?)`, | ||
| 43 | event.ID, tag.Name, tag.Values[0], | ||
| 44 | ) | ||
| 45 | if err != nil { | ||
| 46 | return fmt.Errorf("storage: insert tag: %w", err) | ||
| 47 | } | ||
| 48 | } | ||
| 49 | |||
| 50 | return tx.Commit() | ||
| 51 | } | ||
| 52 | |||
| 53 | // ExistsByID returns true if an event with the given ID is already stored. | ||
| 54 | func (s *Storage) ExistsByID(ctx context.Context, id []byte) (bool, error) { | ||
| 55 | var n int | ||
| 56 | err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM events WHERE id = ?`, id).Scan(&n) | ||
| 57 | if err != nil && err != sql.ErrNoRows { | ||
| 58 | return false, fmt.Errorf("storage: exists: %w", err) | ||
| 59 | } | ||
| 60 | return n > 0, nil | ||
| 61 | } | ||
| 62 | |||
| 63 | // QueryEvents executes the given filters against the database using a UNION | ||
| 64 | // query and returns matching event envelope bytes in descending created_at | ||
| 65 | // order. The effective LIMIT is the minimum non-zero Limit across all filters. | ||
| 66 | func (s *Storage) QueryEvents(ctx context.Context, filters []axon.Filter) ([][]byte, error) { | ||
| 67 | if len(filters) == 0 { | ||
| 68 | return nil, nil | ||
| 69 | } | ||
| 70 | |||
| 71 | var unions []string | ||
| 72 | var args []interface{} | ||
| 73 | var effectiveLimit int32 | ||
| 74 | |||
| 75 | for _, f := range filters { | ||
| 76 | var filterArgs []interface{} | ||
| 77 | clause := buildWhereClause(f, &filterArgs) | ||
| 78 | sub := fmt.Sprintf( | ||
| 79 | "SELECT e.envelope_bytes, e.created_at FROM events e WHERE %s", clause) | ||
| 80 | unions = append(unions, sub) | ||
| 81 | args = append(args, filterArgs...) | ||
| 82 | if f.Limit > 0 && (effectiveLimit == 0 || f.Limit < effectiveLimit) { | ||
| 83 | effectiveLimit = f.Limit | ||
| 84 | } | ||
| 85 | } | ||
| 86 | |||
| 87 | query := strings.Join(unions, " UNION ") + " ORDER BY created_at DESC" | ||
| 88 | if effectiveLimit > 0 { | ||
| 89 | query += fmt.Sprintf(" LIMIT %d", effectiveLimit) | ||
| 90 | } | ||
| 91 | |||
| 92 | rows, err := s.db.QueryContext(ctx, query, args...) | ||
| 93 | if err != nil { | ||
| 94 | return nil, fmt.Errorf("storage: query: %w", err) | ||
| 95 | } | ||
| 96 | defer rows.Close() | ||
| 97 | |||
| 98 | var results [][]byte | ||
| 99 | for rows.Next() { | ||
| 100 | var envelope []byte | ||
| 101 | var createdAt int64 | ||
| 102 | if err := rows.Scan(&envelope, &createdAt); err != nil { | ||
| 103 | return nil, fmt.Errorf("storage: scan: %w", err) | ||
| 104 | } | ||
| 105 | results = append(results, envelope) | ||
| 106 | } | ||
| 107 | return results, rows.Err() | ||
| 108 | } | ||
| 109 | |||
| 110 | // buildWhereClause builds the SQL WHERE clause for a single filter, appending | ||
| 111 | // bind parameters to args. | ||
| 112 | func buildWhereClause(f axon.Filter, args *[]interface{}) string { | ||
| 113 | var conditions []string | ||
| 114 | |||
| 115 | if len(f.IDs) > 0 { | ||
| 116 | conditions = append(conditions, buildBlobPrefixCondition("e.id", f.IDs, args)) | ||
| 117 | } | ||
| 118 | |||
| 119 | if len(f.Authors) > 0 { | ||
| 120 | conditions = append(conditions, buildBlobPrefixCondition("e.pubkey", f.Authors, args)) | ||
| 121 | } | ||
| 122 | |||
| 123 | if len(f.Kinds) > 0 { | ||
| 124 | placeholders := make([]string, len(f.Kinds)) | ||
| 125 | for i, k := range f.Kinds { | ||
| 126 | placeholders[i] = "?" | ||
| 127 | *args = append(*args, k) | ||
| 128 | } | ||
| 129 | conditions = append(conditions, "e.kind IN ("+strings.Join(placeholders, ",")+")") | ||
| 130 | } | ||
| 131 | |||
| 132 | if f.Since != 0 { | ||
| 133 | conditions = append(conditions, "e.created_at >= ?") | ||
| 134 | *args = append(*args, f.Since) | ||
| 135 | } | ||
| 136 | |||
| 137 | if f.Until != 0 { | ||
| 138 | conditions = append(conditions, "e.created_at <= ?") | ||
| 139 | *args = append(*args, f.Until) | ||
| 140 | } | ||
| 141 | |||
| 142 | for _, tf := range f.Tags { | ||
| 143 | conditions = append(conditions, buildTagJoinCondition(tf, args)) | ||
| 144 | } | ||
| 145 | |||
| 146 | if len(conditions) == 0 { | ||
| 147 | return "1=1" | ||
| 148 | } | ||
| 149 | return strings.Join(conditions, " AND ") | ||
| 150 | } | ||
| 151 | |||
| 152 | // buildBlobPrefixCondition builds an OR condition for prefix-matching a BLOB | ||
| 153 | // column. Prefix slices of exactly 32 bytes use equality; shorter slices use | ||
| 154 | // hex(column) LIKE 'HEX%'. | ||
| 155 | func buildBlobPrefixCondition(column string, prefixes [][]byte, args *[]interface{}) string { | ||
| 156 | var orConds []string | ||
| 157 | for _, prefix := range prefixes { | ||
| 158 | if len(prefix) == 32 { | ||
| 159 | orConds = append(orConds, column+" = ?") | ||
| 160 | *args = append(*args, prefix) | ||
| 161 | } else { | ||
| 162 | hexPrefix := fmt.Sprintf("%X", prefix) | ||
| 163 | orConds = append(orConds, fmt.Sprintf("hex(%s) LIKE ?", column)) | ||
| 164 | *args = append(*args, hexPrefix+"%") | ||
| 165 | } | ||
| 166 | } | ||
| 167 | if len(orConds) == 1 { | ||
| 168 | return orConds[0] | ||
| 169 | } | ||
| 170 | return "(" + strings.Join(orConds, " OR ") + ")" | ||
| 171 | } | ||
| 172 | |||
| 173 | // buildTagJoinCondition builds an EXISTS sub-select for a TagFilter. | ||
| 174 | func buildTagJoinCondition(tf axon.TagFilter, args *[]interface{}) string { | ||
| 175 | if len(tf.Values) == 0 { | ||
| 176 | *args = append(*args, tf.Name) | ||
| 177 | return "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ?)" | ||
| 178 | } | ||
| 179 | var orConds []string | ||
| 180 | for _, v := range tf.Values { | ||
| 181 | orConds = append(orConds, "EXISTS (SELECT 1 FROM tags t WHERE t.event_id = e.id AND t.name = ? AND t.value = ?)") | ||
| 182 | *args = append(*args, tf.Name, v) | ||
| 183 | } | ||
| 184 | if len(orConds) == 1 { | ||
| 185 | return orConds[0] | ||
| 186 | } | ||
| 187 | return "(" + strings.Join(orConds, " OR ") + ")" | ||
| 188 | } | ||
| 189 | |||
| 190 | func isDuplicateError(err error) bool { | ||
| 191 | return err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed") | ||
| 192 | } | ||
diff --git a/relay/storage/storage.go b/relay/storage/storage.go new file mode 100644 index 0000000..95b278d --- /dev/null +++ b/relay/storage/storage.go | |||
| @@ -0,0 +1,86 @@ | |||
| 1 | // Package storage provides SQLite-backed event persistence for the Axon relay. | ||
| 2 | package storage | ||
| 3 | |||
| 4 | import ( | ||
| 5 | "context" | ||
| 6 | "database/sql" | ||
| 7 | "fmt" | ||
| 8 | |||
| 9 | _ "modernc.org/sqlite" | ||
| 10 | ) | ||
| 11 | |||
| 12 | // Storage wraps a SQLite database for Axon event persistence. | ||
| 13 | type Storage struct { | ||
| 14 | db *sql.DB | ||
| 15 | } | ||
| 16 | |||
| 17 | // New opens (or creates) the SQLite database at dbPath, applies WAL pragmas, | ||
| 18 | // and initialises the schema. Call Close when done. | ||
| 19 | func New(dbPath string) (*Storage, error) { | ||
| 20 | db, err := sql.Open("sqlite", dbPath) | ||
| 21 | if err != nil { | ||
| 22 | return nil, fmt.Errorf("storage: open db: %w", err) | ||
| 23 | } | ||
| 24 | |||
| 25 | // SQLite works best with a single writer. | ||
| 26 | db.SetMaxOpenConns(1) | ||
| 27 | db.SetMaxIdleConns(1) | ||
| 28 | db.SetConnMaxLifetime(0) | ||
| 29 | |||
| 30 | pragmas := []string{ | ||
| 31 | "PRAGMA journal_mode=WAL", | ||
| 32 | "PRAGMA synchronous=NORMAL", | ||
| 33 | "PRAGMA cache_size=-40960", // ~40 MB (negative = kibibytes) | ||
| 34 | "PRAGMA temp_store=MEMORY", | ||
| 35 | "PRAGMA mmap_size=268435456", // 256 MB | ||
| 36 | "PRAGMA page_size=4096", | ||
| 37 | "PRAGMA foreign_keys=ON", | ||
| 38 | "PRAGMA busy_timeout=5000", | ||
| 39 | } | ||
| 40 | |||
| 41 | for _, p := range pragmas { | ||
| 42 | if _, err := db.Exec(p); err != nil { | ||
| 43 | db.Close() | ||
| 44 | return nil, fmt.Errorf("storage: set pragma %q: %w", p, err) | ||
| 45 | } | ||
| 46 | } | ||
| 47 | |||
| 48 | s := &Storage{db: db} | ||
| 49 | if err := s.initSchema(context.Background()); err != nil { | ||
| 50 | db.Close() | ||
| 51 | return nil, fmt.Errorf("storage: init schema: %w", err) | ||
| 52 | } | ||
| 53 | return s, nil | ||
| 54 | } | ||
| 55 | |||
| 56 | // Close closes the underlying database connection. | ||
| 57 | func (s *Storage) Close() error { | ||
| 58 | return s.db.Close() | ||
| 59 | } | ||
| 60 | |||
| 61 | const schema = ` | ||
| 62 | CREATE TABLE IF NOT EXISTS events ( | ||
| 63 | id BLOB PRIMARY KEY, | ||
| 64 | pubkey BLOB NOT NULL, | ||
| 65 | created_at INTEGER NOT NULL, | ||
| 66 | kind INTEGER NOT NULL, | ||
| 67 | envelope_bytes BLOB NOT NULL | ||
| 68 | ) STRICT; | ||
| 69 | |||
| 70 | CREATE TABLE IF NOT EXISTS tags ( | ||
| 71 | event_id BLOB NOT NULL REFERENCES events(id), | ||
| 72 | name TEXT NOT NULL, | ||
| 73 | value TEXT NOT NULL | ||
| 74 | ) STRICT; | ||
| 75 | |||
| 76 | CREATE INDEX IF NOT EXISTS idx_events_pubkey ON events(pubkey, created_at DESC); | ||
| 77 | CREATE INDEX IF NOT EXISTS idx_events_kind ON events(kind, created_at DESC); | ||
| 78 | CREATE INDEX IF NOT EXISTS idx_events_created_at ON events(created_at DESC); | ||
| 79 | CREATE INDEX IF NOT EXISTS idx_tags_name_value ON tags(name, value); | ||
| 80 | CREATE INDEX IF NOT EXISTS idx_tags_event_id ON tags(event_id); | ||
| 81 | ` | ||
| 82 | |||
| 83 | func (s *Storage) initSchema(ctx context.Context) error { | ||
| 84 | _, err := s.db.ExecContext(ctx, schema) | ||
| 85 | return err | ||
| 86 | } | ||
diff --git a/relay/subscription/manager.go b/relay/subscription/manager.go new file mode 100644 index 0000000..d8ba653 --- /dev/null +++ b/relay/subscription/manager.go | |||
| @@ -0,0 +1,317 @@ | |||
| 1 | // Package subscription manages in-memory subscriptions and live event fanout. | ||
| 2 | package subscription | ||
| 3 | |||
| 4 | import ( | ||
| 5 | "bytes" | ||
| 6 | "sync" | ||
| 7 | "time" | ||
| 8 | |||
| 9 | "axon" | ||
| 10 | ) | ||
| 11 | |||
| 12 | // Subscription holds a single client subscription: an ID, a set of filters, | ||
| 13 | // and a buffered channel for live event delivery. | ||
| 14 | type Subscription struct { | ||
| 15 | ID string | ||
| 16 | Filters []axon.Filter | ||
| 17 | // Events carries raw msgpack envelope bytes for matched events. | ||
| 18 | Events chan []byte | ||
| 19 | done chan struct{} | ||
| 20 | once sync.Once | ||
| 21 | } | ||
| 22 | |||
| 23 | // newSubscription creates a subscription with a buffered event channel. | ||
| 24 | func newSubscription(id string, filters []axon.Filter) *Subscription { | ||
| 25 | return &Subscription{ | ||
| 26 | ID: id, | ||
| 27 | Filters: filters, | ||
| 28 | Events: make(chan []byte, 100), | ||
| 29 | done: make(chan struct{}), | ||
| 30 | } | ||
| 31 | } | ||
| 32 | |||
| 33 | // Close shuts down the subscription and drains the event channel. | ||
| 34 | func (s *Subscription) Close() { | ||
| 35 | s.once.Do(func() { | ||
| 36 | close(s.done) | ||
| 37 | close(s.Events) | ||
| 38 | }) | ||
| 39 | } | ||
| 40 | |||
| 41 | // IsClosed reports whether the subscription has been closed. | ||
| 42 | func (s *Subscription) IsClosed() bool { | ||
| 43 | select { | ||
| 44 | case <-s.done: | ||
| 45 | return true | ||
| 46 | default: | ||
| 47 | return false | ||
| 48 | } | ||
| 49 | } | ||
| 50 | |||
| 51 | // Done returns a channel that is closed when the subscription is closed. | ||
| 52 | func (s *Subscription) Done() <-chan struct{} { | ||
| 53 | return s.done | ||
| 54 | } | ||
| 55 | |||
| 56 | // Manager maintains all active subscriptions for a single connection and fans | ||
| 57 | // out incoming events to matching subscribers. | ||
| 58 | type Manager struct { | ||
| 59 | mu sync.RWMutex | ||
| 60 | subs map[string]*Subscription | ||
| 61 | } | ||
| 62 | |||
| 63 | // NewManager returns an empty Manager. | ||
| 64 | func NewManager() *Manager { | ||
| 65 | return &Manager{subs: make(map[string]*Subscription)} | ||
| 66 | } | ||
| 67 | |||
| 68 | // Add registers a new subscription, replacing any existing subscription with | ||
| 69 | // the same ID. | ||
| 70 | func (m *Manager) Add(id string, filters []axon.Filter) *Subscription { | ||
| 71 | m.mu.Lock() | ||
| 72 | defer m.mu.Unlock() | ||
| 73 | |||
| 74 | // Remove old subscription with same ID if present. | ||
| 75 | if old, ok := m.subs[id]; ok { | ||
| 76 | old.Close() | ||
| 77 | } | ||
| 78 | |||
| 79 | sub := newSubscription(id, filters) | ||
| 80 | m.subs[id] = sub | ||
| 81 | return sub | ||
| 82 | } | ||
| 83 | |||
| 84 | // Remove cancels and deletes a subscription by ID. | ||
| 85 | func (m *Manager) Remove(id string) { | ||
| 86 | m.mu.Lock() | ||
| 87 | defer m.mu.Unlock() | ||
| 88 | |||
| 89 | if sub, ok := m.subs[id]; ok { | ||
| 90 | sub.Close() | ||
| 91 | delete(m.subs, id) | ||
| 92 | } | ||
| 93 | } | ||
| 94 | |||
| 95 | // CloseAll cancels every subscription held by this manager. | ||
| 96 | func (m *Manager) CloseAll() { | ||
| 97 | m.mu.Lock() | ||
| 98 | defer m.mu.Unlock() | ||
| 99 | |||
| 100 | for id, sub := range m.subs { | ||
| 101 | sub.Close() | ||
| 102 | delete(m.subs, id) | ||
| 103 | } | ||
| 104 | } | ||
| 105 | |||
| 106 | // Fanout delivers envelopeBytes to all subscriptions whose filters match event. | ||
| 107 | // The send is non-blocking: if the channel is full the event is dropped for | ||
| 108 | // that subscriber. | ||
| 109 | func (m *Manager) Fanout(event *axon.Event, envelopeBytes []byte) { | ||
| 110 | m.mu.RLock() | ||
| 111 | defer m.mu.RUnlock() | ||
| 112 | |||
| 113 | for _, sub := range m.subs { | ||
| 114 | if sub.IsClosed() { | ||
| 115 | continue | ||
| 116 | } | ||
| 117 | if MatchesAnyFilter(event, sub.Filters) { | ||
| 118 | select { | ||
| 119 | case sub.Events <- envelopeBytes: | ||
| 120 | case <-sub.done: | ||
| 121 | default: | ||
| 122 | // channel full — drop | ||
| 123 | } | ||
| 124 | } | ||
| 125 | } | ||
| 126 | } | ||
| 127 | |||
| 128 | // MatchesAnyFilter returns true if event matches at least one filter in filters. | ||
| 129 | func MatchesAnyFilter(event *axon.Event, filters []axon.Filter) bool { | ||
| 130 | for i := range filters { | ||
| 131 | if MatchesFilter(event, &filters[i]) { | ||
| 132 | return true | ||
| 133 | } | ||
| 134 | } | ||
| 135 | return false | ||
| 136 | } | ||
| 137 | |||
| 138 | // MatchesFilter returns true if event satisfies all constraints in f. | ||
| 139 | func MatchesFilter(event *axon.Event, f *axon.Filter) bool { | ||
| 140 | if len(f.IDs) > 0 { | ||
| 141 | if !matchesBytesPrefixes(event.ID, f.IDs) { | ||
| 142 | return false | ||
| 143 | } | ||
| 144 | } | ||
| 145 | |||
| 146 | if len(f.Authors) > 0 { | ||
| 147 | if !matchesBytesPrefixes(event.PubKey, f.Authors) { | ||
| 148 | return false | ||
| 149 | } | ||
| 150 | } | ||
| 151 | |||
| 152 | if len(f.Kinds) > 0 { | ||
| 153 | found := false | ||
| 154 | for _, k := range f.Kinds { | ||
| 155 | if event.Kind == k { | ||
| 156 | found = true | ||
| 157 | break | ||
| 158 | } | ||
| 159 | } | ||
| 160 | if !found { | ||
| 161 | return false | ||
| 162 | } | ||
| 163 | } | ||
| 164 | |||
| 165 | if f.Since != 0 && event.CreatedAt < f.Since { | ||
| 166 | return false | ||
| 167 | } | ||
| 168 | |||
| 169 | if f.Until != 0 && event.CreatedAt > f.Until { | ||
| 170 | return false | ||
| 171 | } | ||
| 172 | |||
| 173 | for _, tf := range f.Tags { | ||
| 174 | if !eventHasTagMatch(event, tf.Name, tf.Values) { | ||
| 175 | return false | ||
| 176 | } | ||
| 177 | } | ||
| 178 | |||
| 179 | return true | ||
| 180 | } | ||
| 181 | |||
| 182 | // matchesBytesPrefixes returns true if value has any of the given byte slices | ||
| 183 | // as a prefix. A prefix of exactly 32 bytes must match exactly. | ||
| 184 | func matchesBytesPrefixes(value []byte, prefixes [][]byte) bool { | ||
| 185 | for _, prefix := range prefixes { | ||
| 186 | if len(prefix) == 0 { | ||
| 187 | return true | ||
| 188 | } | ||
| 189 | if len(prefix) >= len(value) { | ||
| 190 | if bytes.Equal(value, prefix) { | ||
| 191 | return true | ||
| 192 | } | ||
| 193 | } else { | ||
| 194 | if bytes.HasPrefix(value, prefix) { | ||
| 195 | return true | ||
| 196 | } | ||
| 197 | } | ||
| 198 | } | ||
| 199 | return false | ||
| 200 | } | ||
| 201 | |||
| 202 | // eventHasTagMatch returns true if event has a tag named name whose first | ||
| 203 | // value matches any of values. | ||
| 204 | func eventHasTagMatch(event *axon.Event, name string, values []string) bool { | ||
| 205 | for _, tag := range event.Tags { | ||
| 206 | if tag.Name != name { | ||
| 207 | continue | ||
| 208 | } | ||
| 209 | if len(values) == 0 { | ||
| 210 | return true | ||
| 211 | } | ||
| 212 | if len(tag.Values) == 0 { | ||
| 213 | continue | ||
| 214 | } | ||
| 215 | for _, v := range values { | ||
| 216 | if tag.Values[0] == v { | ||
| 217 | return true | ||
| 218 | } | ||
| 219 | } | ||
| 220 | } | ||
| 221 | return false | ||
| 222 | } | ||
| 223 | |||
| 224 | // GlobalManager is a relay-wide manager that holds subscriptions from all | ||
| 225 | // connections and supports cross-connection fanout. | ||
| 226 | type GlobalManager struct { | ||
| 227 | mu sync.RWMutex | ||
| 228 | subs map[string]*Subscription // key: "connID:subID" | ||
| 229 | } | ||
| 230 | |||
| 231 | // NewGlobalManager returns an empty GlobalManager. | ||
| 232 | func NewGlobalManager() *GlobalManager { | ||
| 233 | return &GlobalManager{subs: make(map[string]*Subscription)} | ||
| 234 | } | ||
| 235 | |||
| 236 | // Register adds a subscription under a globally unique key. | ||
| 237 | func (g *GlobalManager) Register(connID, subID string, sub *Subscription) { | ||
| 238 | key := connID + ":" + subID | ||
| 239 | g.mu.Lock() | ||
| 240 | defer g.mu.Unlock() | ||
| 241 | if old, ok := g.subs[key]; ok { | ||
| 242 | old.Close() | ||
| 243 | } | ||
| 244 | g.subs[key] = sub | ||
| 245 | } | ||
| 246 | |||
| 247 | // Unregister removes a subscription. | ||
| 248 | func (g *GlobalManager) Unregister(connID, subID string) { | ||
| 249 | key := connID + ":" + subID | ||
| 250 | g.mu.Lock() | ||
| 251 | defer g.mu.Unlock() | ||
| 252 | if sub, ok := g.subs[key]; ok { | ||
| 253 | sub.Close() | ||
| 254 | delete(g.subs, key) | ||
| 255 | } | ||
| 256 | } | ||
| 257 | |||
| 258 | // UnregisterConn removes all subscriptions for a connection. | ||
| 259 | func (g *GlobalManager) UnregisterConn(connID string) { | ||
| 260 | prefix := connID + ":" | ||
| 261 | g.mu.Lock() | ||
| 262 | defer g.mu.Unlock() | ||
| 263 | for key, sub := range g.subs { | ||
| 264 | if len(key) > len(prefix) && key[:len(prefix)] == prefix { | ||
| 265 | sub.Close() | ||
| 266 | delete(g.subs, key) | ||
| 267 | } | ||
| 268 | } | ||
| 269 | } | ||
| 270 | |||
| 271 | // Fanout delivers the event to all matching subscriptions across all connections. | ||
| 272 | func (g *GlobalManager) Fanout(event *axon.Event, envelopeBytes []byte) { | ||
| 273 | g.mu.RLock() | ||
| 274 | defer g.mu.RUnlock() | ||
| 275 | |||
| 276 | for _, sub := range g.subs { | ||
| 277 | if sub.IsClosed() { | ||
| 278 | continue | ||
| 279 | } | ||
| 280 | if MatchesAnyFilter(event, sub.Filters) { | ||
| 281 | select { | ||
| 282 | case sub.Events <- envelopeBytes: | ||
| 283 | case <-sub.done: | ||
| 284 | default: | ||
| 285 | } | ||
| 286 | } | ||
| 287 | } | ||
| 288 | } | ||
| 289 | |||
| 290 | // PurgeExpired removes closed subscriptions from the global map. | ||
| 291 | // Call periodically to prevent unbounded growth. | ||
| 292 | func (g *GlobalManager) PurgeExpired() { | ||
| 293 | g.mu.Lock() | ||
| 294 | defer g.mu.Unlock() | ||
| 295 | for key, sub := range g.subs { | ||
| 296 | if sub.IsClosed() { | ||
| 297 | delete(g.subs, key) | ||
| 298 | } | ||
| 299 | } | ||
| 300 | } | ||
| 301 | |||
| 302 | // StartPurger launches a background goroutine that periodically removes closed | ||
| 303 | // subscriptions. | ||
| 304 | func (g *GlobalManager) StartPurger(interval time.Duration, stop <-chan struct{}) { | ||
| 305 | go func() { | ||
| 306 | ticker := time.NewTicker(interval) | ||
| 307 | defer ticker.Stop() | ||
| 308 | for { | ||
| 309 | select { | ||
| 310 | case <-ticker.C: | ||
| 311 | g.PurgeExpired() | ||
| 312 | case <-stop: | ||
| 313 | return | ||
| 314 | } | ||
| 315 | } | ||
| 316 | }() | ||
| 317 | } | ||
diff --git a/relay/websocket/websocket.go b/relay/websocket/websocket.go new file mode 100644 index 0000000..cfc3289 --- /dev/null +++ b/relay/websocket/websocket.go | |||
| @@ -0,0 +1,244 @@ | |||
| 1 | // Package websocket implements RFC 6455 WebSocket framing without external dependencies. | ||
| 2 | // Adapted from muxstr's websocket implementation. | ||
| 3 | package websocket | ||
| 4 | |||
| 5 | import ( | ||
| 6 | "bufio" | ||
| 7 | "context" | ||
| 8 | "crypto/rand" | ||
| 9 | "crypto/sha1" | ||
| 10 | "encoding/base64" | ||
| 11 | "encoding/binary" | ||
| 12 | "fmt" | ||
| 13 | "io" | ||
| 14 | "net" | ||
| 15 | "net/http" | ||
| 16 | "strings" | ||
| 17 | "sync" | ||
| 18 | "time" | ||
| 19 | ) | ||
| 20 | |||
| 21 | const ( | ||
| 22 | opContinuation = 0x0 | ||
| 23 | opBinary = 0x2 | ||
| 24 | opClose = 0x8 | ||
| 25 | opPing = 0x9 | ||
| 26 | opPong = 0xA | ||
| 27 | ) | ||
| 28 | |||
| 29 | // Conn is a WebSocket connection. | ||
| 30 | type Conn struct { | ||
| 31 | rwc net.Conn | ||
| 32 | br *bufio.Reader | ||
| 33 | client bool | ||
| 34 | mu sync.Mutex | ||
| 35 | } | ||
| 36 | |||
| 37 | func mask(key [4]byte, data []byte) { | ||
| 38 | for i := range data { | ||
| 39 | data[i] ^= key[i%4] | ||
| 40 | } | ||
| 41 | } | ||
| 42 | |||
| 43 | func (c *Conn) writeFrame(opcode byte, payload []byte) error { | ||
| 44 | c.mu.Lock() | ||
| 45 | defer c.mu.Unlock() | ||
| 46 | |||
| 47 | length := len(payload) | ||
| 48 | header := []byte{0x80 | opcode, 0} // FIN + opcode | ||
| 49 | |||
| 50 | if c.client { | ||
| 51 | header[1] = 0x80 // mask bit | ||
| 52 | } | ||
| 53 | |||
| 54 | switch { | ||
| 55 | case length <= 125: | ||
| 56 | header[1] |= byte(length) | ||
| 57 | case length <= 65535: | ||
| 58 | header[1] |= 126 | ||
| 59 | ext := make([]byte, 2) | ||
| 60 | binary.BigEndian.PutUint16(ext, uint16(length)) | ||
| 61 | header = append(header, ext...) | ||
| 62 | default: | ||
| 63 | header[1] |= 127 | ||
| 64 | ext := make([]byte, 8) | ||
| 65 | binary.BigEndian.PutUint64(ext, uint64(length)) | ||
| 66 | header = append(header, ext...) | ||
| 67 | } | ||
| 68 | |||
| 69 | if c.client { | ||
| 70 | var key [4]byte | ||
| 71 | rand.Read(key[:]) | ||
| 72 | header = append(header, key[:]...) | ||
| 73 | // mask a copy so we don't modify the caller's slice | ||
| 74 | masked := make([]byte, len(payload)) | ||
| 75 | copy(masked, payload) | ||
| 76 | mask(key, masked) | ||
| 77 | payload = masked | ||
| 78 | } | ||
| 79 | |||
| 80 | if _, err := c.rwc.Write(header); err != nil { | ||
| 81 | return err | ||
| 82 | } | ||
| 83 | _, err := c.rwc.Write(payload) | ||
| 84 | return err | ||
| 85 | } | ||
| 86 | |||
| 87 | func (c *Conn) readFrame() (fin bool, opcode byte, payload []byte, err error) { | ||
| 88 | var hdr [2]byte | ||
| 89 | if _, err = io.ReadFull(c.br, hdr[:]); err != nil { | ||
| 90 | return | ||
| 91 | } | ||
| 92 | |||
| 93 | fin = hdr[0]&0x80 != 0 | ||
| 94 | opcode = hdr[0] & 0x0F | ||
| 95 | masked := hdr[1]&0x80 != 0 | ||
| 96 | length := uint64(hdr[1] & 0x7F) | ||
| 97 | |||
| 98 | switch length { | ||
| 99 | case 126: | ||
| 100 | var ext [2]byte | ||
| 101 | if _, err = io.ReadFull(c.br, ext[:]); err != nil { | ||
| 102 | return | ||
| 103 | } | ||
| 104 | length = uint64(binary.BigEndian.Uint16(ext[:])) | ||
| 105 | case 127: | ||
| 106 | var ext [8]byte | ||
| 107 | if _, err = io.ReadFull(c.br, ext[:]); err != nil { | ||
| 108 | return | ||
| 109 | } | ||
| 110 | length = binary.BigEndian.Uint64(ext[:]) | ||
| 111 | } | ||
| 112 | |||
| 113 | var key [4]byte | ||
| 114 | if masked { | ||
| 115 | if _, err = io.ReadFull(c.br, key[:]); err != nil { | ||
| 116 | return | ||
| 117 | } | ||
| 118 | } | ||
| 119 | |||
| 120 | payload = make([]byte, length) | ||
| 121 | if _, err = io.ReadFull(c.br, payload); err != nil { | ||
| 122 | return | ||
| 123 | } | ||
| 124 | |||
| 125 | if masked { | ||
| 126 | mask(key, payload) | ||
| 127 | } | ||
| 128 | return | ||
| 129 | } | ||
| 130 | |||
| 131 | // Read reads the next complete message from the connection. | ||
| 132 | // It handles ping frames automatically by sending pong responses. | ||
| 133 | // It respects context cancellation by setting a read deadline. | ||
| 134 | func (c *Conn) Read(ctx context.Context) ([]byte, error) { | ||
| 135 | stop := context.AfterFunc(ctx, func() { | ||
| 136 | c.rwc.SetReadDeadline(time.Now()) | ||
| 137 | }) | ||
| 138 | defer stop() | ||
| 139 | |||
| 140 | var buf []byte | ||
| 141 | for { | ||
| 142 | fin, opcode, payload, err := c.readFrame() | ||
| 143 | if err != nil { | ||
| 144 | if ctx.Err() != nil { | ||
| 145 | return nil, ctx.Err() | ||
| 146 | } | ||
| 147 | return nil, err | ||
| 148 | } | ||
| 149 | |||
| 150 | switch opcode { | ||
| 151 | case opPing: | ||
| 152 | c.writeFrame(opPong, payload) | ||
| 153 | continue | ||
| 154 | case opClose: | ||
| 155 | return nil, fmt.Errorf("websocket: close frame received") | ||
| 156 | case opBinary, opContinuation: | ||
| 157 | buf = append(buf, payload...) | ||
| 158 | if fin { | ||
| 159 | return buf, nil | ||
| 160 | } | ||
| 161 | default: | ||
| 162 | // text or other opcodes — treat payload as binary | ||
| 163 | buf = append(buf, payload...) | ||
| 164 | if fin { | ||
| 165 | return buf, nil | ||
| 166 | } | ||
| 167 | } | ||
| 168 | } | ||
| 169 | } | ||
| 170 | |||
| 171 | // Write sends a binary frame to the connection. | ||
| 172 | func (c *Conn) Write(data []byte) error { | ||
| 173 | return c.writeFrame(opBinary, data) | ||
| 174 | } | ||
| 175 | |||
| 176 | // Ping sends a WebSocket ping frame. | ||
| 177 | func (c *Conn) Ping() error { | ||
| 178 | return c.writeFrame(opPing, nil) | ||
| 179 | } | ||
| 180 | |||
| 181 | // Close sends a close frame with the given code and reason, then closes the | ||
| 182 | // underlying connection. | ||
| 183 | func (c *Conn) Close(code uint16, reason string) error { | ||
| 184 | payload := make([]byte, 2+len(reason)) | ||
| 185 | binary.BigEndian.PutUint16(payload, code) | ||
| 186 | copy(payload[2:], reason) | ||
| 187 | c.writeFrame(opClose, payload) | ||
| 188 | return c.rwc.Close() | ||
| 189 | } | ||
| 190 | |||
| 191 | // CloseConn closes the underlying network connection without sending a close frame. | ||
| 192 | func (c *Conn) CloseConn() error { | ||
| 193 | return c.rwc.Close() | ||
| 194 | } | ||
| 195 | |||
| 196 | // SetReadDeadline sets the read deadline on the underlying connection. | ||
| 197 | func (c *Conn) SetReadDeadline(t time.Time) error { | ||
| 198 | return c.rwc.SetReadDeadline(t) | ||
| 199 | } | ||
| 200 | |||
| 201 | var wsGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" | ||
| 202 | |||
| 203 | func acceptKey(key string) string { | ||
| 204 | h := sha1.New() | ||
| 205 | h.Write([]byte(key)) | ||
| 206 | h.Write([]byte(wsGUID)) | ||
| 207 | return base64.StdEncoding.EncodeToString(h.Sum(nil)) | ||
| 208 | } | ||
| 209 | |||
| 210 | // Accept performs the server-side WebSocket handshake, hijacking the HTTP | ||
| 211 | // connection and returning a Conn ready for framed I/O. | ||
| 212 | func Accept(w http.ResponseWriter, r *http.Request) (*Conn, error) { | ||
| 213 | if !strings.EqualFold(r.Header.Get("Upgrade"), "websocket") { | ||
| 214 | return nil, fmt.Errorf("websocket: missing Upgrade header") | ||
| 215 | } | ||
| 216 | |||
| 217 | key := r.Header.Get("Sec-WebSocket-Key") | ||
| 218 | if key == "" { | ||
| 219 | return nil, fmt.Errorf("websocket: missing Sec-WebSocket-Key") | ||
| 220 | } | ||
| 221 | |||
| 222 | hj, ok := w.(http.Hijacker) | ||
| 223 | if !ok { | ||
| 224 | return nil, fmt.Errorf("websocket: response does not support hijacking") | ||
| 225 | } | ||
| 226 | |||
| 227 | rwc, brw, err := hj.Hijack() | ||
| 228 | if err != nil { | ||
| 229 | return nil, err | ||
| 230 | } | ||
| 231 | |||
| 232 | accept := acceptKey(key) | ||
| 233 | respStr := "HTTP/1.1 101 Switching Protocols\r\n" + | ||
| 234 | "Upgrade: websocket\r\n" + | ||
| 235 | "Connection: Upgrade\r\n" + | ||
| 236 | "Sec-WebSocket-Accept: " + accept + "\r\n\r\n" | ||
| 237 | |||
| 238 | if _, err := rwc.Write([]byte(respStr)); err != nil { | ||
| 239 | rwc.Close() | ||
| 240 | return nil, err | ||
| 241 | } | ||
| 242 | |||
| 243 | return &Conn{rwc: rwc, br: brw.Reader, client: false}, nil | ||
| 244 | } | ||
