diff options
| author | bndw <ben@bdw.to> | 2026-03-09 08:01:02 -0700 |
|---|---|---|
| committer | bndw <ben@bdw.to> | 2026-03-09 08:01:02 -0700 |
| commit | 61a85baf87d89fcc09f9469a113a2ddc982b0a24 (patch) | |
| tree | d8359ce5cbcbb9402ba92c617c4ebd702adf33e9 /relay/server.go | |
| parent | ce684848e25fed3aabdde4ffba6d2d8c40afa030 (diff) | |
feat: phase 2 relay implementation
Implement the Axon relay as relay/ (module axon/relay). Includes:
- WebSocket framing (RFC 6455, no external deps) in relay/websocket/
- Per-connection auth: challenge/response with ed25519 + allowlist check
- Ingest pipeline: sig verify, dedup, ephemeral fanout, SQLite persistence
- Subscription manager with prefix-matching filter fanout in relay/subscription/
- SQLite storage with WAL/cache config and UNION query builder in relay/storage/
- Graceful shutdown on SIGINT/SIGTERM
- Filter/TagFilter types added to axon core package (required by relay)
Diffstat (limited to 'relay/server.go')
| -rw-r--r-- | relay/server.go | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/relay/server.go b/relay/server.go new file mode 100644 index 0000000..085929c --- /dev/null +++ b/relay/server.go | |||
| @@ -0,0 +1,118 @@ | |||
| 1 | package main | ||
| 2 | |||
| 3 | import ( | ||
| 4 | "context" | ||
| 5 | "crypto/rand" | ||
| 6 | "encoding/hex" | ||
| 7 | "fmt" | ||
| 8 | "log" | ||
| 9 | "net/http" | ||
| 10 | "sync" | ||
| 11 | |||
| 12 | "axon/relay/storage" | ||
| 13 | "axon/relay/subscription" | ||
| 14 | ws "axon/relay/websocket" | ||
| 15 | ) | ||
| 16 | |||
| 17 | // Server is the HTTP + WebSocket server for the Axon relay. | ||
| 18 | type Server struct { | ||
| 19 | cfg Config | ||
| 20 | allowlist [][]byte | ||
| 21 | store *storage.Storage | ||
| 22 | global *subscription.GlobalManager | ||
| 23 | |||
| 24 | mu sync.WaitGroup | ||
| 25 | httpSrv *http.Server | ||
| 26 | } | ||
| 27 | |||
| 28 | // NewServer creates a Server from the given config. | ||
| 29 | func NewServer(cfg Config, allowlist [][]byte, store *storage.Storage, global *subscription.GlobalManager) *Server { | ||
| 30 | return &Server{ | ||
| 31 | cfg: cfg, | ||
| 32 | allowlist: allowlist, | ||
| 33 | store: store, | ||
| 34 | global: global, | ||
| 35 | } | ||
| 36 | } | ||
| 37 | |||
| 38 | // Start configures the HTTP server and starts listening. Call Shutdown to stop. | ||
| 39 | func (s *Server) Start() error { | ||
| 40 | mux := http.NewServeMux() | ||
| 41 | mux.HandleFunc("/", s.handleWS) | ||
| 42 | |||
| 43 | s.httpSrv = &http.Server{ | ||
| 44 | Addr: s.cfg.Addr, | ||
| 45 | Handler: mux, | ||
| 46 | } | ||
| 47 | |||
| 48 | log.Printf("relay: listening on %s", s.cfg.Addr) | ||
| 49 | return s.httpSrv.ListenAndServe() | ||
| 50 | } | ||
| 51 | |||
| 52 | // Shutdown gracefully stops the server and waits for all connections to drain. | ||
| 53 | func (s *Server) Shutdown(ctx context.Context) error { | ||
| 54 | err := s.httpSrv.Shutdown(ctx) | ||
| 55 | // Wait for all handler goroutines to finish. | ||
| 56 | done := make(chan struct{}) | ||
| 57 | go func() { | ||
| 58 | s.mu.Wait() | ||
| 59 | close(done) | ||
| 60 | }() | ||
| 61 | select { | ||
| 62 | case <-done: | ||
| 63 | case <-ctx.Done(): | ||
| 64 | } | ||
| 65 | return err | ||
| 66 | } | ||
| 67 | |||
| 68 | // handleWS upgrades an HTTP request to a WebSocket connection and starts the | ||
| 69 | // per-connection handler goroutine. | ||
| 70 | func (s *Server) handleWS(w http.ResponseWriter, r *http.Request) { | ||
| 71 | c, err := ws.Accept(w, r) | ||
| 72 | if err != nil { | ||
| 73 | http.Error(w, "WebSocket upgrade failed", http.StatusBadRequest) | ||
| 74 | return | ||
| 75 | } | ||
| 76 | |||
| 77 | // Generate 32-byte nonce for the auth challenge. | ||
| 78 | nonce := make([]byte, 32) | ||
| 79 | if _, err := rand.Read(nonce); err != nil { | ||
| 80 | log.Printf("relay: generate nonce: %v", err) | ||
| 81 | c.CloseConn() | ||
| 82 | return | ||
| 83 | } | ||
| 84 | |||
| 85 | connID := hex.EncodeToString(nonce[:8]) | ||
| 86 | |||
| 87 | h := &conn{ | ||
| 88 | id: connID, | ||
| 89 | ws: c, | ||
| 90 | store: s.store, | ||
| 91 | global: s.global, | ||
| 92 | allowlist: s.allowlist, | ||
| 93 | relayURL: s.cfg.RelayURL, | ||
| 94 | nonce: nonce, | ||
| 95 | mgr: subscription.NewManager(), | ||
| 96 | } | ||
| 97 | |||
| 98 | s.mu.Add(1) | ||
| 99 | go func() { | ||
| 100 | defer s.mu.Done() | ||
| 101 | ctx := r.Context() | ||
| 102 | h.serve(ctx) | ||
| 103 | if err := c.CloseConn(); err != nil { | ||
| 104 | // Ignore close errors — connection may already be gone. | ||
| 105 | _ = err | ||
| 106 | } | ||
| 107 | log.Printf("conn %s: closed", connID) | ||
| 108 | }() | ||
| 109 | } | ||
| 110 | |||
| 111 | // generateConnID creates a unique connection identifier for logging. | ||
| 112 | func generateConnID() (string, error) { | ||
| 113 | var b [8]byte | ||
| 114 | if _, err := rand.Read(b[:]); err != nil { | ||
| 115 | return "", fmt.Errorf("server: generate conn id: %w", err) | ||
| 116 | } | ||
| 117 | return hex.EncodeToString(b[:]), nil | ||
| 118 | } | ||
