aboutsummaryrefslogtreecommitdiffstats
path: root/relay/websocket
diff options
context:
space:
mode:
authorbndw <ben@bdw.to>2026-03-09 08:01:02 -0700
committerbndw <ben@bdw.to>2026-03-09 08:01:02 -0700
commit61a85baf87d89fcc09f9469a113a2ddc982b0a24 (patch)
treed8359ce5cbcbb9402ba92c617c4ebd702adf33e9 /relay/websocket
parentce684848e25fed3aabdde4ffba6d2d8c40afa030 (diff)
feat: phase 2 relay implementation
Implement the Axon relay as relay/ (module axon/relay). Includes: - WebSocket framing (RFC 6455, no external deps) in relay/websocket/ - Per-connection auth: challenge/response with ed25519 + allowlist check - Ingest pipeline: sig verify, dedup, ephemeral fanout, SQLite persistence - Subscription manager with prefix-matching filter fanout in relay/subscription/ - SQLite storage with WAL/cache config and UNION query builder in relay/storage/ - Graceful shutdown on SIGINT/SIGTERM - Filter/TagFilter types added to axon core package (required by relay)
Diffstat (limited to 'relay/websocket')
-rw-r--r--relay/websocket/websocket.go244
1 files changed, 244 insertions, 0 deletions
diff --git a/relay/websocket/websocket.go b/relay/websocket/websocket.go
new file mode 100644
index 0000000..cfc3289
--- /dev/null
+++ b/relay/websocket/websocket.go
@@ -0,0 +1,244 @@
1// Package websocket implements RFC 6455 WebSocket framing without external dependencies.
2// Adapted from muxstr's websocket implementation.
3package websocket
4
5import (
6 "bufio"
7 "context"
8 "crypto/rand"
9 "crypto/sha1"
10 "encoding/base64"
11 "encoding/binary"
12 "fmt"
13 "io"
14 "net"
15 "net/http"
16 "strings"
17 "sync"
18 "time"
19)
20
21const (
22 opContinuation = 0x0
23 opBinary = 0x2
24 opClose = 0x8
25 opPing = 0x9
26 opPong = 0xA
27)
28
29// Conn is a WebSocket connection.
30type Conn struct {
31 rwc net.Conn
32 br *bufio.Reader
33 client bool
34 mu sync.Mutex
35}
36
37func mask(key [4]byte, data []byte) {
38 for i := range data {
39 data[i] ^= key[i%4]
40 }
41}
42
43func (c *Conn) writeFrame(opcode byte, payload []byte) error {
44 c.mu.Lock()
45 defer c.mu.Unlock()
46
47 length := len(payload)
48 header := []byte{0x80 | opcode, 0} // FIN + opcode
49
50 if c.client {
51 header[1] = 0x80 // mask bit
52 }
53
54 switch {
55 case length <= 125:
56 header[1] |= byte(length)
57 case length <= 65535:
58 header[1] |= 126
59 ext := make([]byte, 2)
60 binary.BigEndian.PutUint16(ext, uint16(length))
61 header = append(header, ext...)
62 default:
63 header[1] |= 127
64 ext := make([]byte, 8)
65 binary.BigEndian.PutUint64(ext, uint64(length))
66 header = append(header, ext...)
67 }
68
69 if c.client {
70 var key [4]byte
71 rand.Read(key[:])
72 header = append(header, key[:]...)
73 // mask a copy so we don't modify the caller's slice
74 masked := make([]byte, len(payload))
75 copy(masked, payload)
76 mask(key, masked)
77 payload = masked
78 }
79
80 if _, err := c.rwc.Write(header); err != nil {
81 return err
82 }
83 _, err := c.rwc.Write(payload)
84 return err
85}
86
87func (c *Conn) readFrame() (fin bool, opcode byte, payload []byte, err error) {
88 var hdr [2]byte
89 if _, err = io.ReadFull(c.br, hdr[:]); err != nil {
90 return
91 }
92
93 fin = hdr[0]&0x80 != 0
94 opcode = hdr[0] & 0x0F
95 masked := hdr[1]&0x80 != 0
96 length := uint64(hdr[1] & 0x7F)
97
98 switch length {
99 case 126:
100 var ext [2]byte
101 if _, err = io.ReadFull(c.br, ext[:]); err != nil {
102 return
103 }
104 length = uint64(binary.BigEndian.Uint16(ext[:]))
105 case 127:
106 var ext [8]byte
107 if _, err = io.ReadFull(c.br, ext[:]); err != nil {
108 return
109 }
110 length = binary.BigEndian.Uint64(ext[:])
111 }
112
113 var key [4]byte
114 if masked {
115 if _, err = io.ReadFull(c.br, key[:]); err != nil {
116 return
117 }
118 }
119
120 payload = make([]byte, length)
121 if _, err = io.ReadFull(c.br, payload); err != nil {
122 return
123 }
124
125 if masked {
126 mask(key, payload)
127 }
128 return
129}
130
131// Read reads the next complete message from the connection.
132// It handles ping frames automatically by sending pong responses.
133// It respects context cancellation by setting a read deadline.
134func (c *Conn) Read(ctx context.Context) ([]byte, error) {
135 stop := context.AfterFunc(ctx, func() {
136 c.rwc.SetReadDeadline(time.Now())
137 })
138 defer stop()
139
140 var buf []byte
141 for {
142 fin, opcode, payload, err := c.readFrame()
143 if err != nil {
144 if ctx.Err() != nil {
145 return nil, ctx.Err()
146 }
147 return nil, err
148 }
149
150 switch opcode {
151 case opPing:
152 c.writeFrame(opPong, payload)
153 continue
154 case opClose:
155 return nil, fmt.Errorf("websocket: close frame received")
156 case opBinary, opContinuation:
157 buf = append(buf, payload...)
158 if fin {
159 return buf, nil
160 }
161 default:
162 // text or other opcodes — treat payload as binary
163 buf = append(buf, payload...)
164 if fin {
165 return buf, nil
166 }
167 }
168 }
169}
170
171// Write sends a binary frame to the connection.
172func (c *Conn) Write(data []byte) error {
173 return c.writeFrame(opBinary, data)
174}
175
176// Ping sends a WebSocket ping frame.
177func (c *Conn) Ping() error {
178 return c.writeFrame(opPing, nil)
179}
180
181// Close sends a close frame with the given code and reason, then closes the
182// underlying connection.
183func (c *Conn) Close(code uint16, reason string) error {
184 payload := make([]byte, 2+len(reason))
185 binary.BigEndian.PutUint16(payload, code)
186 copy(payload[2:], reason)
187 c.writeFrame(opClose, payload)
188 return c.rwc.Close()
189}
190
191// CloseConn closes the underlying network connection without sending a close frame.
192func (c *Conn) CloseConn() error {
193 return c.rwc.Close()
194}
195
196// SetReadDeadline sets the read deadline on the underlying connection.
197func (c *Conn) SetReadDeadline(t time.Time) error {
198 return c.rwc.SetReadDeadline(t)
199}
200
201var wsGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
202
203func acceptKey(key string) string {
204 h := sha1.New()
205 h.Write([]byte(key))
206 h.Write([]byte(wsGUID))
207 return base64.StdEncoding.EncodeToString(h.Sum(nil))
208}
209
210// Accept performs the server-side WebSocket handshake, hijacking the HTTP
211// connection and returning a Conn ready for framed I/O.
212func Accept(w http.ResponseWriter, r *http.Request) (*Conn, error) {
213 if !strings.EqualFold(r.Header.Get("Upgrade"), "websocket") {
214 return nil, fmt.Errorf("websocket: missing Upgrade header")
215 }
216
217 key := r.Header.Get("Sec-WebSocket-Key")
218 if key == "" {
219 return nil, fmt.Errorf("websocket: missing Sec-WebSocket-Key")
220 }
221
222 hj, ok := w.(http.Hijacker)
223 if !ok {
224 return nil, fmt.Errorf("websocket: response does not support hijacking")
225 }
226
227 rwc, brw, err := hj.Hijack()
228 if err != nil {
229 return nil, err
230 }
231
232 accept := acceptKey(key)
233 respStr := "HTTP/1.1 101 Switching Protocols\r\n" +
234 "Upgrade: websocket\r\n" +
235 "Connection: Upgrade\r\n" +
236 "Sec-WebSocket-Accept: " + accept + "\r\n\r\n"
237
238 if _, err := rwc.Write([]byte(respStr)); err != nil {
239 rwc.Close()
240 return nil, err
241 }
242
243 return &Conn{rwc: rwc, br: brw.Reader, client: false}, nil
244}