From 61a85baf87d89fcc09f9469a113a2ddc982b0a24 Mon Sep 17 00:00:00 2001 From: bndw Date: Mon, 9 Mar 2026 08:01:02 -0700 Subject: feat: phase 2 relay implementation Implement the Axon relay as relay/ (module axon/relay). Includes: - WebSocket framing (RFC 6455, no external deps) in relay/websocket/ - Per-connection auth: challenge/response with ed25519 + allowlist check - Ingest pipeline: sig verify, dedup, ephemeral fanout, SQLite persistence - Subscription manager with prefix-matching filter fanout in relay/subscription/ - SQLite storage with WAL/cache config and UNION query builder in relay/storage/ - Graceful shutdown on SIGINT/SIGTERM - Filter/TagFilter types added to axon core package (required by relay) --- relay/main.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 relay/main.go (limited to 'relay/main.go') diff --git a/relay/main.go b/relay/main.go new file mode 100644 index 0000000..2cfa034 --- /dev/null +++ b/relay/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "context" + "errors" + "flag" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "axon/relay/storage" + "axon/relay/subscription" +) + +func main() { + cfgPath := flag.String("config", "config.yaml", "path to config.yaml") + flag.Parse() + + cfg, err := LoadConfig(*cfgPath) + if err != nil { + log.Fatalf("relay: load config: %v", err) + } + + allowlist, err := cfg.AllowlistBytes() + if err != nil { + log.Fatalf("relay: allowlist: %v", err) + } + + store, err := storage.New(cfg.DB) + if err != nil { + log.Fatalf("relay: open storage: %v", err) + } + defer store.Close() + + global := subscription.NewGlobalManager() + + // Periodically purge closed subscriptions. + stopPurger := make(chan struct{}) + global.StartPurger(5*time.Minute, stopPurger) + defer close(stopPurger) + + srv := NewServer(cfg, allowlist, store, global) + + // Graceful shutdown on SIGINT / SIGTERM. + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + errCh := make(chan error, 1) + go func() { + if err := srv.Start(); err != nil && !errors.Is(err, http.ErrServerClosed) { + errCh <- err + } + }() + + select { + case sig := <-sigCh: + log.Printf("relay: received signal %s, shutting down", sig) + case err := <-errCh: + log.Fatalf("relay: server error: %v", err) + } + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Printf("relay: shutdown error: %v", err) + } + log.Println("relay: stopped") +} -- cgit v1.2.3