From 0bd392e076e36c80a152abe00cbcd0bc9efedd9c Mon Sep 17 00:00:00 2001 From: bndw Date: Mon, 9 Mar 2026 12:48:39 -0700 Subject: feat: add axon CLI for pub/sub; fix relay hijack context bug - cmd/axon: new CLI module with keygen, pub, and req subcommands - keygen: generate Ed25519 keypair, print hex seed and pubkey - pub: sign and publish an event; accepts --kind, --content, --tag - req: query/stream events as JSON lines; accepts --kind, --author, --tag, --since, --until, --limit, --stream - key loaded from --key flag or AXON_KEY env var - relay/websocket: add Dial() for client-side WebSocket handshake (ws:// and wss://, RFC 6455 masking via client:true) - relay/server: fix broken-pipe on auth by switching hijacked conn goroutine from r.Context() to context.Background(); r.Context() is cancelled by net/http immediately after Hijack is called --- cmd/axon/cli | Bin 0 -> 7995047 bytes cmd/axon/go.mod | 20 ++ cmd/axon/go.sum | 16 ++ cmd/axon/main.go | 483 +++++++++++++++++++++++++++++++++++++++++++ go.work | 1 + go.work.sum | 4 + relay/server.go | 5 +- relay/websocket/websocket.go | 75 +++++++ 8 files changed, 603 insertions(+), 1 deletion(-) create mode 100755 cmd/axon/cli create mode 100644 cmd/axon/go.mod create mode 100644 cmd/axon/go.sum create mode 100644 cmd/axon/main.go create mode 100644 go.work.sum diff --git a/cmd/axon/cli b/cmd/axon/cli new file mode 100755 index 0000000..93f7706 Binary files /dev/null and b/cmd/axon/cli differ diff --git a/cmd/axon/go.mod b/cmd/axon/go.mod new file mode 100644 index 0000000..2f26fca --- /dev/null +++ b/cmd/axon/go.mod @@ -0,0 +1,20 @@ +module axon/cli + +go 1.25.5 + +require ( + axon v0.0.0 + axon/relay v0.0.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 +) + +require ( + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/sys v0.41.0 // indirect +) + +replace ( + axon => ../../ + axon/relay => ../../relay +) diff --git a/cmd/axon/go.sum b/cmd/axon/go.sum new file mode 100644 index 0000000..7f2743c --- /dev/null +++ b/cmd/axon/go.sum @@ -0,0 +1,16 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/cmd/axon/main.go b/cmd/axon/main.go new file mode 100644 index 0000000..f7aaf63 --- /dev/null +++ b/cmd/axon/main.go @@ -0,0 +1,483 @@ +// axon is a CLI for publishing and querying events on an Axon relay. +// +// Usage: +// +// axon keygen +// axon pub [flags] +// axon req [flags] +// +// The private key seed is read from --key or the AXON_KEY environment variable. +// All binary values (IDs, pubkeys, signatures) are printed as lowercase hex. +package main + +import ( + "context" + "encoding/hex" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + "axon" + ws "axon/relay/websocket" + + "github.com/vmihailenco/msgpack/v5" +) + +// ── Wire protocol constants (mirrors relay/handler.go) ────────────────────── + +const ( + msgTypeAuth uint16 = 1 + msgTypeSubscribe uint16 = 2 + msgTypeUnsubscribe uint16 = 3 + msgTypePublish uint16 = 4 + + msgTypeChallenge uint16 = 10 + msgTypeEvent uint16 = 11 + msgTypeEose uint16 = 12 + msgTypeOk uint16 = 13 + msgTypeError uint16 = 14 +) + +// ── Payload structs ────────────────────────────────────────────────────────── + +type challengePayload struct { + Nonce []byte `msgpack:"nonce"` +} + +type authPayload struct { + PubKey []byte `msgpack:"pubkey"` + Sig []byte `msgpack:"sig"` +} + +type okPayload struct { + Message string `msgpack:"message"` +} + +type errorPayload struct { + Code uint16 `msgpack:"code"` + Message string `msgpack:"message"` +} + +type publishPayload struct { + Event axon.Event `msgpack:"event"` +} + +type subscribePayload struct { + SubID string `msgpack:"sub_id"` + Filter axon.Filter `msgpack:"filter"` +} + +type eventPayload struct { + SubID string `msgpack:"sub_id"` + Event axon.Event `msgpack:"event"` +} + +type eosePayload struct { + SubID string `msgpack:"sub_id"` +} + +// ── Transport helpers ──────────────────────────────────────────────────────── + +func send(conn *ws.Conn, msgType uint16, payload interface{}) error { + b, err := msgpack.Marshal([]interface{}{msgType, payload}) + if err != nil { + return err + } + return conn.Write(b) +} + +func recv(conn *ws.Conn, ctx context.Context) (uint16, msgpack.RawMessage, error) { + data, err := conn.Read(ctx) + if err != nil { + return 0, nil, err + } + var arr []msgpack.RawMessage + if err := msgpack.Unmarshal(data, &arr); err != nil { + return 0, nil, fmt.Errorf("decode message: %w", err) + } + if len(arr) < 2 { + return 0, nil, fmt.Errorf("message too short (%d elements)", len(arr)) + } + var t uint16 + if err := msgpack.Unmarshal(arr[0], &t); err != nil { + return 0, nil, fmt.Errorf("decode message type: %w", err) + } + return t, arr[1], nil +} + +// dial connects and performs the auth handshake. Returns an authenticated conn. +func dial(relayURL string, kp axon.KeyPair) (*ws.Conn, error) { + conn, err := ws.Dial(relayURL) + if err != nil { + return nil, err + } + + ctx := context.Background() + + // Receive Challenge. + t, raw, err := recv(conn, ctx) + if err != nil { + conn.CloseConn() + return nil, fmt.Errorf("recv challenge: %w", err) + } + if t != msgTypeChallenge { + conn.CloseConn() + return nil, fmt.Errorf("expected challenge (10), got %d", t) + } + var cp challengePayload + if err := msgpack.Unmarshal(raw, &cp); err != nil { + conn.CloseConn() + return nil, fmt.Errorf("decode challenge: %w", err) + } + + // Send Auth. + sig := axon.SignChallenge(kp, cp.Nonce, relayURL) + if err := send(conn, msgTypeAuth, authPayload{ + PubKey: []byte(kp.PubKey), + Sig: sig, + }); err != nil { + conn.CloseConn() + return nil, fmt.Errorf("send auth: %w", err) + } + + // Receive Ok / Error. + t, raw, err = recv(conn, ctx) + if err != nil { + conn.CloseConn() + return nil, fmt.Errorf("recv auth response: %w", err) + } + switch t { + case msgTypeOk: + return conn, nil + case msgTypeError: + var ep errorPayload + msgpack.Unmarshal(raw, &ep) + conn.CloseConn() + return nil, fmt.Errorf("auth rejected (%d): %s", ep.Code, ep.Message) + default: + conn.CloseConn() + return nil, fmt.Errorf("unexpected message %d after auth", t) + } +} + +// ── Output helpers ─────────────────────────────────────────────────────────── + +// eventJSON is a JSON-serialisable view of an axon.Event with hex-encoded +// binary fields and content treated as a UTF-8 string. +type eventJSON struct { + ID string `json:"id"` + PubKey string `json:"pubkey"` + CreatedAt int64 `json:"created_at"` + Kind uint16 `json:"kind"` + Content string `json:"content"` + Tags []axon.Tag `json:"tags"` + Sig string `json:"sig"` +} + +func toEventJSON(e *axon.Event) eventJSON { + return eventJSON{ + ID: hex.EncodeToString(e.ID), + PubKey: hex.EncodeToString(e.PubKey), + CreatedAt: e.CreatedAt, + Kind: e.Kind, + Content: string(e.Content), + Tags: e.Tags, + Sig: hex.EncodeToString(e.Sig), + } +} + +func printEvent(e *axon.Event) { + b, _ := json.Marshal(toEventJSON(e)) + fmt.Println(string(b)) +} + +// ── Custom flag types ──────────────────────────────────────────────────────── + +// tagFlag accumulates --tag name=value or --tag name=v1,v2 flags. +type tagFlag []axon.Tag + +func (f *tagFlag) String() string { return fmt.Sprint([]axon.Tag(*f)) } +func (f *tagFlag) Set(s string) error { + parts := strings.SplitN(s, "=", 2) + if len(parts) != 2 || parts[0] == "" { + return fmt.Errorf("expected name=value, got %q", s) + } + values := strings.Split(parts[1], ",") + *f = append(*f, axon.Tag{Name: parts[0], Values: values}) + return nil +} + +// kindFlag accumulates --kind flags as uint16. +type kindFlag []uint16 + +func (f *kindFlag) String() string { return fmt.Sprint([]uint16(*f)) } +func (f *kindFlag) Set(s string) error { + v, err := strconv.ParseUint(s, 10, 16) + if err != nil { + return err + } + *f = append(*f, uint16(v)) + return nil +} + +// authorFlag accumulates --author hex-pubkey flags. +type authorFlag [][]byte + +func (f *authorFlag) String() string { return fmt.Sprint([][]byte(*f)) } +func (f *authorFlag) Set(s string) error { + b, err := hex.DecodeString(s) + if err != nil { + return fmt.Errorf("not valid hex: %w", err) + } + if len(b) != 32 { + return fmt.Errorf("pubkey must be 32 bytes (64 hex chars), got %d", len(b)) + } + *f = append(*f, b) + return nil +} + +// ── Key loading ────────────────────────────────────────────────────────────── + +func loadKey(hexSeed string) (axon.KeyPair, error) { + if hexSeed == "" { + hexSeed = os.Getenv("AXON_KEY") + } + if hexSeed == "" { + return axon.KeyPair{}, fmt.Errorf("no key: supply --key or set AXON_KEY") + } + seed, err := hex.DecodeString(hexSeed) + if err != nil { + return axon.KeyPair{}, fmt.Errorf("decode key: %w", err) + } + if len(seed) != 32 { + return axon.KeyPair{}, fmt.Errorf("key must be 32 bytes (64 hex chars), got %d", len(seed)) + } + return axon.NewKeyPairFromSeed(seed), nil +} + +// ── keygen ─────────────────────────────────────────────────────────────────── + +func cmdKeygen(_ []string) { + kp, err := axon.NewKeyPair() + if err != nil { + log.Fatalf("keygen: %v", err) + } + fmt.Printf("private-key %s\n", hex.EncodeToString(kp.PrivKey.Seed())) + fmt.Printf("public-key %s\n", hex.EncodeToString(kp.PubKey)) +} + +// ── pub ────────────────────────────────────────────────────────────────────── + +func cmdPub(args []string) { + fs := flag.NewFlagSet("pub", flag.ExitOnError) + fs.Usage = func() { + fmt.Fprintln(os.Stderr, "usage: axon pub [flags] ") + fmt.Fprintln(os.Stderr, "\nFlags:") + fs.PrintDefaults() + } + keyHex := fs.String("key", "", "private key seed (hex, 32 bytes). Falls back to AXON_KEY env var.") + kind := fs.Uint("kind", 1000, "event kind") + content := fs.String("content", "", "event content (string)") + var tags tagFlag + fs.Var(&tags, "tag", "add a tag: name=value or name=v1,v2 (repeatable)") + _ = fs.Parse(args) + + relayURL := fs.Arg(0) + if relayURL == "" { + fs.Usage() + os.Exit(1) + } + + kp, err := loadKey(*keyHex) + if err != nil { + log.Fatal(err) + } + + event := axon.Event{ + CreatedAt: time.Now().Unix(), + Kind: uint16(*kind), + Content: []byte(*content), + Tags: []axon.Tag(tags), + } + if event.Tags == nil { + event.Tags = []axon.Tag{} + } + if err := axon.Sign(&event, kp); err != nil { + log.Fatalf("sign: %v", err) + } + + conn, err := dial(relayURL, kp) + if err != nil { + log.Fatalf("connect: %v", err) + } + defer conn.CloseConn() + + if err := send(conn, msgTypePublish, publishPayload{Event: event}); err != nil { + log.Fatalf("publish: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + t, raw, err := recv(conn, ctx) + if err != nil { + log.Fatalf("recv: %v", err) + } + switch t { + case msgTypeOk: + fmt.Printf("published %s\n", hex.EncodeToString(event.ID)) + case msgTypeError: + var ep errorPayload + msgpack.Unmarshal(raw, &ep) + log.Fatalf("error %d: %s", ep.Code, ep.Message) + default: + log.Fatalf("unexpected message type %d", t) + } +} + +// ── req ────────────────────────────────────────────────────────────────────── + +func cmdReq(args []string) { + fs := flag.NewFlagSet("req", flag.ExitOnError) + fs.Usage = func() { + fmt.Fprintln(os.Stderr, "usage: axon req [flags] ") + fmt.Fprintln(os.Stderr, "\nPrints one JSON event per line. Exits after EOSE unless --stream is set.") + fmt.Fprintln(os.Stderr, "\nFlags:") + fs.PrintDefaults() + } + keyHex := fs.String("key", "", "private key seed (hex). Falls back to AXON_KEY env var.") + var kinds kindFlag + fs.Var(&kinds, "kind", "filter by event kind (repeatable)") + var authors authorFlag + fs.Var(&authors, "author", "filter by author pubkey hex (repeatable)") + var filterTags tagFlag + fs.Var(&filterTags, "tag", "filter by tag: name=value (repeatable)") + since := fs.Int64("since", 0, "only events with created_at >= this unix timestamp") + until := fs.Int64("until", 0, "only events with created_at <= this unix timestamp") + limit := fs.Int("limit", 0, "max events to return (0 = no limit)") + stream := fs.Bool("stream", false, "keep streaming live events after EOSE (Ctrl-C to exit)") + _ = fs.Parse(args) + + relayURL := fs.Arg(0) + if relayURL == "" { + fs.Usage() + os.Exit(1) + } + + kp, err := loadKey(*keyHex) + if err != nil { + log.Fatal(err) + } + + filter := axon.Filter{ + Kinds: []uint16(kinds), + Authors: [][]byte(authors), + Since: *since, + Until: *until, + Limit: int32(*limit), + } + for _, t := range filterTags { + filter.Tags = append(filter.Tags, axon.TagFilter{Name: t.Name, Values: t.Values}) + } + + conn, err := dial(relayURL, kp) + if err != nil { + log.Fatalf("connect: %v", err) + } + defer conn.CloseConn() + + subID := "req-" + strconv.FormatInt(time.Now().UnixNano(), 36) + if err := send(conn, msgTypeSubscribe, subscribePayload{SubID: subID, Filter: filter}); err != nil { + log.Fatalf("subscribe: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Cancel on Ctrl-C when streaming. + if *stream { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + cancel() + }() + } + + for { + t, raw, err := recv(conn, ctx) + if err != nil { + if ctx.Err() != nil { + return // clean cancellation + } + log.Fatalf("recv: %v", err) + } + switch t { + case msgTypeEvent: + var ep eventPayload + if err := msgpack.Unmarshal(raw, &ep); err != nil { + log.Printf("decode event: %v", err) + continue + } + printEvent(&ep.Event) + + case msgTypeEose: + if !*stream { + return + } + // Keep looping for live events. + + case msgTypeError: + var ep errorPayload + msgpack.Unmarshal(raw, &ep) + log.Fatalf("error %d: %s", ep.Code, ep.Message) + + default: + log.Printf("unexpected message type %d", t) + } + } +} + +// ── main ───────────────────────────────────────────────────────────────────── + +func usage() { + fmt.Fprintln(os.Stderr, "usage: axon [flags]") + fmt.Fprintln(os.Stderr, "") + fmt.Fprintln(os.Stderr, "Commands:") + fmt.Fprintln(os.Stderr, " keygen Generate a new Ed25519 keypair") + fmt.Fprintln(os.Stderr, " pub Publish an event to a relay") + fmt.Fprintln(os.Stderr, " req Query or stream events from a relay") + fmt.Fprintln(os.Stderr, "") + fmt.Fprintln(os.Stderr, "Run 'axon -h' for command-specific help.") +} + +func main() { + log.SetFlags(0) + log.SetPrefix("axon: ") + + if len(os.Args) < 2 { + usage() + os.Exit(1) + } + + cmd, rest := os.Args[1], os.Args[2:] + switch cmd { + case "keygen": + cmdKeygen(rest) + case "pub": + cmdPub(rest) + case "req": + cmdReq(rest) + default: + fmt.Fprintf(os.Stderr, "axon: unknown command %q\n\n", cmd) + usage() + os.Exit(1) + } +} diff --git a/go.work b/go.work index 47a23cd..09ca901 100644 --- a/go.work +++ b/go.work @@ -3,4 +3,5 @@ go 1.25.5 use ( . ./relay + ./cmd/axon ) diff --git a/go.work.sum b/go.work.sum new file mode 100644 index 0000000..08345b3 --- /dev/null +++ b/go.work.sum @@ -0,0 +1,4 @@ +golang.org/x/exp v0.0.0-20231108232855-2478ac86f678/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= +golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= +golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= diff --git a/relay/server.go b/relay/server.go index 085929c..d4a1edd 100644 --- a/relay/server.go +++ b/relay/server.go @@ -98,7 +98,10 @@ func (s *Server) handleWS(w http.ResponseWriter, r *http.Request) { s.mu.Add(1) go func() { defer s.mu.Done() - ctx := r.Context() + // r.Context() is cancelled by the HTTP server when Hijack is called, + // so we use a fresh context. The connection manages its own lifecycle + // via the ping loop and WebSocket close frames. + ctx := context.Background() h.serve(ctx) if err := c.CloseConn(); err != nil { // Ignore close errors — connection may already be gone. diff --git a/relay/websocket/websocket.go b/relay/websocket/websocket.go index cfc3289..2ae0dec 100644 --- a/relay/websocket/websocket.go +++ b/relay/websocket/websocket.go @@ -7,12 +7,14 @@ import ( "context" "crypto/rand" "crypto/sha1" + "crypto/tls" "encoding/base64" "encoding/binary" "fmt" "io" "net" "net/http" + "net/url" "strings" "sync" "time" @@ -207,6 +209,79 @@ func acceptKey(key string) string { return base64.StdEncoding.EncodeToString(h.Sum(nil)) } +// Dial connects to a WebSocket server at rawURL and performs the client-side +// RFC 6455 handshake. Supports ws:// and wss:// schemes. +func Dial(rawURL string) (*Conn, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, fmt.Errorf("websocket: parse url: %w", err) + } + + host := u.Host + var netConn net.Conn + switch u.Scheme { + case "ws": + if !strings.Contains(host, ":") { + host += ":80" + } + netConn, err = net.Dial("tcp", host) + case "wss": + if !strings.Contains(host, ":") { + host += ":443" + } + netConn, err = tls.Dial("tcp", host, &tls.Config{ServerName: u.Hostname()}) + default: + return nil, fmt.Errorf("websocket: unsupported scheme %q (use ws:// or wss://)", u.Scheme) + } + if err != nil { + return nil, fmt.Errorf("websocket: dial %s: %w", host, err) + } + + // Generate a random 16-byte key and base64-encode it. + var keyBytes [16]byte + if _, err := rand.Read(keyBytes[:]); err != nil { + netConn.Close() + return nil, fmt.Errorf("websocket: generate key: %w", err) + } + key := base64.StdEncoding.EncodeToString(keyBytes[:]) + + path := u.RequestURI() + if path == "" { + path = "/" + } + + req := "GET " + path + " HTTP/1.1\r\n" + + "Host: " + u.Host + "\r\n" + + "Upgrade: websocket\r\n" + + "Connection: Upgrade\r\n" + + "Sec-WebSocket-Key: " + key + "\r\n" + + "Sec-WebSocket-Version: 13\r\n\r\n" + + if _, err := netConn.Write([]byte(req)); err != nil { + netConn.Close() + return nil, fmt.Errorf("websocket: send handshake: %w", err) + } + + br := bufio.NewReader(netConn) + resp, err := http.ReadResponse(br, nil) + if err != nil { + netConn.Close() + return nil, fmt.Errorf("websocket: read handshake response: %w", err) + } + resp.Body.Close() + + if resp.StatusCode != 101 { + netConn.Close() + return nil, fmt.Errorf("websocket: server returned status %d, want 101", resp.StatusCode) + } + if resp.Header.Get("Sec-WebSocket-Accept") != acceptKey(key) { + netConn.Close() + return nil, fmt.Errorf("websocket: bad Sec-WebSocket-Accept header") + } + + return &Conn{rwc: netConn, br: br, client: true}, nil +} + // Accept performs the server-side WebSocket handshake, hijacking the HTTP // connection and returning a Conn ready for framed I/O. func Accept(w http.ResponseWriter, r *http.Request) (*Conn, error) { -- cgit v1.2.3