aboutsummaryrefslogtreecommitdiffstats
path: root/relay/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'relay/server.go')
-rw-r--r--relay/server.go118
1 files changed, 118 insertions, 0 deletions
diff --git a/relay/server.go b/relay/server.go
new file mode 100644
index 0000000..085929c
--- /dev/null
+++ b/relay/server.go
@@ -0,0 +1,118 @@
1package main
2
3import (
4 "context"
5 "crypto/rand"
6 "encoding/hex"
7 "fmt"
8 "log"
9 "net/http"
10 "sync"
11
12 "axon/relay/storage"
13 "axon/relay/subscription"
14 ws "axon/relay/websocket"
15)
16
17// Server is the HTTP + WebSocket server for the Axon relay.
18type Server struct {
19 cfg Config
20 allowlist [][]byte
21 store *storage.Storage
22 global *subscription.GlobalManager
23
24 mu sync.WaitGroup
25 httpSrv *http.Server
26}
27
28// NewServer creates a Server from the given config.
29func NewServer(cfg Config, allowlist [][]byte, store *storage.Storage, global *subscription.GlobalManager) *Server {
30 return &Server{
31 cfg: cfg,
32 allowlist: allowlist,
33 store: store,
34 global: global,
35 }
36}
37
38// Start configures the HTTP server and starts listening. Call Shutdown to stop.
39func (s *Server) Start() error {
40 mux := http.NewServeMux()
41 mux.HandleFunc("/", s.handleWS)
42
43 s.httpSrv = &http.Server{
44 Addr: s.cfg.Addr,
45 Handler: mux,
46 }
47
48 log.Printf("relay: listening on %s", s.cfg.Addr)
49 return s.httpSrv.ListenAndServe()
50}
51
52// Shutdown gracefully stops the server and waits for all connections to drain.
53func (s *Server) Shutdown(ctx context.Context) error {
54 err := s.httpSrv.Shutdown(ctx)
55 // Wait for all handler goroutines to finish.
56 done := make(chan struct{})
57 go func() {
58 s.mu.Wait()
59 close(done)
60 }()
61 select {
62 case <-done:
63 case <-ctx.Done():
64 }
65 return err
66}
67
68// handleWS upgrades an HTTP request to a WebSocket connection and starts the
69// per-connection handler goroutine.
70func (s *Server) handleWS(w http.ResponseWriter, r *http.Request) {
71 c, err := ws.Accept(w, r)
72 if err != nil {
73 http.Error(w, "WebSocket upgrade failed", http.StatusBadRequest)
74 return
75 }
76
77 // Generate 32-byte nonce for the auth challenge.
78 nonce := make([]byte, 32)
79 if _, err := rand.Read(nonce); err != nil {
80 log.Printf("relay: generate nonce: %v", err)
81 c.CloseConn()
82 return
83 }
84
85 connID := hex.EncodeToString(nonce[:8])
86
87 h := &conn{
88 id: connID,
89 ws: c,
90 store: s.store,
91 global: s.global,
92 allowlist: s.allowlist,
93 relayURL: s.cfg.RelayURL,
94 nonce: nonce,
95 mgr: subscription.NewManager(),
96 }
97
98 s.mu.Add(1)
99 go func() {
100 defer s.mu.Done()
101 ctx := r.Context()
102 h.serve(ctx)
103 if err := c.CloseConn(); err != nil {
104 // Ignore close errors — connection may already be gone.
105 _ = err
106 }
107 log.Printf("conn %s: closed", connID)
108 }()
109}
110
111// generateConnID creates a unique connection identifier for logging.
112func generateConnID() (string, error) {
113 var b [8]byte
114 if _, err := rand.Read(b[:]); err != nil {
115 return "", fmt.Errorf("server: generate conn id: %w", err)
116 }
117 return hex.EncodeToString(b[:]), nil
118}