From 0bd392e076e36c80a152abe00cbcd0bc9efedd9c Mon Sep 17 00:00:00 2001 From: bndw Date: Mon, 9 Mar 2026 12:48:39 -0700 Subject: feat: add axon CLI for pub/sub; fix relay hijack context bug - cmd/axon: new CLI module with keygen, pub, and req subcommands - keygen: generate Ed25519 keypair, print hex seed and pubkey - pub: sign and publish an event; accepts --kind, --content, --tag - req: query/stream events as JSON lines; accepts --kind, --author, --tag, --since, --until, --limit, --stream - key loaded from --key flag or AXON_KEY env var - relay/websocket: add Dial() for client-side WebSocket handshake (ws:// and wss://, RFC 6455 masking via client:true) - relay/server: fix broken-pipe on auth by switching hijacked conn goroutine from r.Context() to context.Background(); r.Context() is cancelled by net/http immediately after Hijack is called --- cmd/axon/cli | Bin 0 -> 7995047 bytes cmd/axon/go.mod | 20 +++ cmd/axon/go.sum | 16 ++ cmd/axon/main.go | 483 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 519 insertions(+) create mode 100755 cmd/axon/cli create mode 100644 cmd/axon/go.mod create mode 100644 cmd/axon/go.sum create mode 100644 cmd/axon/main.go (limited to 'cmd') diff --git a/cmd/axon/cli b/cmd/axon/cli new file mode 100755 index 0000000..93f7706 Binary files /dev/null and b/cmd/axon/cli differ diff --git a/cmd/axon/go.mod b/cmd/axon/go.mod new file mode 100644 index 0000000..2f26fca --- /dev/null +++ b/cmd/axon/go.mod @@ -0,0 +1,20 @@ +module axon/cli + +go 1.25.5 + +require ( + axon v0.0.0 + axon/relay v0.0.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 +) + +require ( + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/sys v0.41.0 // indirect +) + +replace ( + axon => ../../ + axon/relay => ../../relay +) diff --git a/cmd/axon/go.sum b/cmd/axon/go.sum new file mode 100644 index 0000000..7f2743c --- /dev/null +++ b/cmd/axon/go.sum @@ -0,0 +1,16 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/cmd/axon/main.go b/cmd/axon/main.go new file mode 100644 index 0000000..f7aaf63 --- /dev/null +++ b/cmd/axon/main.go @@ -0,0 +1,483 @@ +// axon is a CLI for publishing and querying events on an Axon relay. +// +// Usage: +// +// axon keygen +// axon pub [flags] +// axon req [flags] +// +// The private key seed is read from --key or the AXON_KEY environment variable. +// All binary values (IDs, pubkeys, signatures) are printed as lowercase hex. +package main + +import ( + "context" + "encoding/hex" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + "axon" + ws "axon/relay/websocket" + + "github.com/vmihailenco/msgpack/v5" +) + +// ── Wire protocol constants (mirrors relay/handler.go) ────────────────────── + +const ( + msgTypeAuth uint16 = 1 + msgTypeSubscribe uint16 = 2 + msgTypeUnsubscribe uint16 = 3 + msgTypePublish uint16 = 4 + + msgTypeChallenge uint16 = 10 + msgTypeEvent uint16 = 11 + msgTypeEose uint16 = 12 + msgTypeOk uint16 = 13 + msgTypeError uint16 = 14 +) + +// ── Payload structs ────────────────────────────────────────────────────────── + +type challengePayload struct { + Nonce []byte `msgpack:"nonce"` +} + +type authPayload struct { + PubKey []byte `msgpack:"pubkey"` + Sig []byte `msgpack:"sig"` +} + +type okPayload struct { + Message string `msgpack:"message"` +} + +type errorPayload struct { + Code uint16 `msgpack:"code"` + Message string `msgpack:"message"` +} + +type publishPayload struct { + Event axon.Event `msgpack:"event"` +} + +type subscribePayload struct { + SubID string `msgpack:"sub_id"` + Filter axon.Filter `msgpack:"filter"` +} + +type eventPayload struct { + SubID string `msgpack:"sub_id"` + Event axon.Event `msgpack:"event"` +} + +type eosePayload struct { + SubID string `msgpack:"sub_id"` +} + +// ── Transport helpers ──────────────────────────────────────────────────────── + +func send(conn *ws.Conn, msgType uint16, payload interface{}) error { + b, err := msgpack.Marshal([]interface{}{msgType, payload}) + if err != nil { + return err + } + return conn.Write(b) +} + +func recv(conn *ws.Conn, ctx context.Context) (uint16, msgpack.RawMessage, error) { + data, err := conn.Read(ctx) + if err != nil { + return 0, nil, err + } + var arr []msgpack.RawMessage + if err := msgpack.Unmarshal(data, &arr); err != nil { + return 0, nil, fmt.Errorf("decode message: %w", err) + } + if len(arr) < 2 { + return 0, nil, fmt.Errorf("message too short (%d elements)", len(arr)) + } + var t uint16 + if err := msgpack.Unmarshal(arr[0], &t); err != nil { + return 0, nil, fmt.Errorf("decode message type: %w", err) + } + return t, arr[1], nil +} + +// dial connects and performs the auth handshake. Returns an authenticated conn. +func dial(relayURL string, kp axon.KeyPair) (*ws.Conn, error) { + conn, err := ws.Dial(relayURL) + if err != nil { + return nil, err + } + + ctx := context.Background() + + // Receive Challenge. + t, raw, err := recv(conn, ctx) + if err != nil { + conn.CloseConn() + return nil, fmt.Errorf("recv challenge: %w", err) + } + if t != msgTypeChallenge { + conn.CloseConn() + return nil, fmt.Errorf("expected challenge (10), got %d", t) + } + var cp challengePayload + if err := msgpack.Unmarshal(raw, &cp); err != nil { + conn.CloseConn() + return nil, fmt.Errorf("decode challenge: %w", err) + } + + // Send Auth. + sig := axon.SignChallenge(kp, cp.Nonce, relayURL) + if err := send(conn, msgTypeAuth, authPayload{ + PubKey: []byte(kp.PubKey), + Sig: sig, + }); err != nil { + conn.CloseConn() + return nil, fmt.Errorf("send auth: %w", err) + } + + // Receive Ok / Error. + t, raw, err = recv(conn, ctx) + if err != nil { + conn.CloseConn() + return nil, fmt.Errorf("recv auth response: %w", err) + } + switch t { + case msgTypeOk: + return conn, nil + case msgTypeError: + var ep errorPayload + msgpack.Unmarshal(raw, &ep) + conn.CloseConn() + return nil, fmt.Errorf("auth rejected (%d): %s", ep.Code, ep.Message) + default: + conn.CloseConn() + return nil, fmt.Errorf("unexpected message %d after auth", t) + } +} + +// ── Output helpers ─────────────────────────────────────────────────────────── + +// eventJSON is a JSON-serialisable view of an axon.Event with hex-encoded +// binary fields and content treated as a UTF-8 string. +type eventJSON struct { + ID string `json:"id"` + PubKey string `json:"pubkey"` + CreatedAt int64 `json:"created_at"` + Kind uint16 `json:"kind"` + Content string `json:"content"` + Tags []axon.Tag `json:"tags"` + Sig string `json:"sig"` +} + +func toEventJSON(e *axon.Event) eventJSON { + return eventJSON{ + ID: hex.EncodeToString(e.ID), + PubKey: hex.EncodeToString(e.PubKey), + CreatedAt: e.CreatedAt, + Kind: e.Kind, + Content: string(e.Content), + Tags: e.Tags, + Sig: hex.EncodeToString(e.Sig), + } +} + +func printEvent(e *axon.Event) { + b, _ := json.Marshal(toEventJSON(e)) + fmt.Println(string(b)) +} + +// ── Custom flag types ──────────────────────────────────────────────────────── + +// tagFlag accumulates --tag name=value or --tag name=v1,v2 flags. +type tagFlag []axon.Tag + +func (f *tagFlag) String() string { return fmt.Sprint([]axon.Tag(*f)) } +func (f *tagFlag) Set(s string) error { + parts := strings.SplitN(s, "=", 2) + if len(parts) != 2 || parts[0] == "" { + return fmt.Errorf("expected name=value, got %q", s) + } + values := strings.Split(parts[1], ",") + *f = append(*f, axon.Tag{Name: parts[0], Values: values}) + return nil +} + +// kindFlag accumulates --kind flags as uint16. +type kindFlag []uint16 + +func (f *kindFlag) String() string { return fmt.Sprint([]uint16(*f)) } +func (f *kindFlag) Set(s string) error { + v, err := strconv.ParseUint(s, 10, 16) + if err != nil { + return err + } + *f = append(*f, uint16(v)) + return nil +} + +// authorFlag accumulates --author hex-pubkey flags. +type authorFlag [][]byte + +func (f *authorFlag) String() string { return fmt.Sprint([][]byte(*f)) } +func (f *authorFlag) Set(s string) error { + b, err := hex.DecodeString(s) + if err != nil { + return fmt.Errorf("not valid hex: %w", err) + } + if len(b) != 32 { + return fmt.Errorf("pubkey must be 32 bytes (64 hex chars), got %d", len(b)) + } + *f = append(*f, b) + return nil +} + +// ── Key loading ────────────────────────────────────────────────────────────── + +func loadKey(hexSeed string) (axon.KeyPair, error) { + if hexSeed == "" { + hexSeed = os.Getenv("AXON_KEY") + } + if hexSeed == "" { + return axon.KeyPair{}, fmt.Errorf("no key: supply --key or set AXON_KEY") + } + seed, err := hex.DecodeString(hexSeed) + if err != nil { + return axon.KeyPair{}, fmt.Errorf("decode key: %w", err) + } + if len(seed) != 32 { + return axon.KeyPair{}, fmt.Errorf("key must be 32 bytes (64 hex chars), got %d", len(seed)) + } + return axon.NewKeyPairFromSeed(seed), nil +} + +// ── keygen ─────────────────────────────────────────────────────────────────── + +func cmdKeygen(_ []string) { + kp, err := axon.NewKeyPair() + if err != nil { + log.Fatalf("keygen: %v", err) + } + fmt.Printf("private-key %s\n", hex.EncodeToString(kp.PrivKey.Seed())) + fmt.Printf("public-key %s\n", hex.EncodeToString(kp.PubKey)) +} + +// ── pub ────────────────────────────────────────────────────────────────────── + +func cmdPub(args []string) { + fs := flag.NewFlagSet("pub", flag.ExitOnError) + fs.Usage = func() { + fmt.Fprintln(os.Stderr, "usage: axon pub [flags] ") + fmt.Fprintln(os.Stderr, "\nFlags:") + fs.PrintDefaults() + } + keyHex := fs.String("key", "", "private key seed (hex, 32 bytes). Falls back to AXON_KEY env var.") + kind := fs.Uint("kind", 1000, "event kind") + content := fs.String("content", "", "event content (string)") + var tags tagFlag + fs.Var(&tags, "tag", "add a tag: name=value or name=v1,v2 (repeatable)") + _ = fs.Parse(args) + + relayURL := fs.Arg(0) + if relayURL == "" { + fs.Usage() + os.Exit(1) + } + + kp, err := loadKey(*keyHex) + if err != nil { + log.Fatal(err) + } + + event := axon.Event{ + CreatedAt: time.Now().Unix(), + Kind: uint16(*kind), + Content: []byte(*content), + Tags: []axon.Tag(tags), + } + if event.Tags == nil { + event.Tags = []axon.Tag{} + } + if err := axon.Sign(&event, kp); err != nil { + log.Fatalf("sign: %v", err) + } + + conn, err := dial(relayURL, kp) + if err != nil { + log.Fatalf("connect: %v", err) + } + defer conn.CloseConn() + + if err := send(conn, msgTypePublish, publishPayload{Event: event}); err != nil { + log.Fatalf("publish: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + t, raw, err := recv(conn, ctx) + if err != nil { + log.Fatalf("recv: %v", err) + } + switch t { + case msgTypeOk: + fmt.Printf("published %s\n", hex.EncodeToString(event.ID)) + case msgTypeError: + var ep errorPayload + msgpack.Unmarshal(raw, &ep) + log.Fatalf("error %d: %s", ep.Code, ep.Message) + default: + log.Fatalf("unexpected message type %d", t) + } +} + +// ── req ────────────────────────────────────────────────────────────────────── + +func cmdReq(args []string) { + fs := flag.NewFlagSet("req", flag.ExitOnError) + fs.Usage = func() { + fmt.Fprintln(os.Stderr, "usage: axon req [flags] ") + fmt.Fprintln(os.Stderr, "\nPrints one JSON event per line. Exits after EOSE unless --stream is set.") + fmt.Fprintln(os.Stderr, "\nFlags:") + fs.PrintDefaults() + } + keyHex := fs.String("key", "", "private key seed (hex). Falls back to AXON_KEY env var.") + var kinds kindFlag + fs.Var(&kinds, "kind", "filter by event kind (repeatable)") + var authors authorFlag + fs.Var(&authors, "author", "filter by author pubkey hex (repeatable)") + var filterTags tagFlag + fs.Var(&filterTags, "tag", "filter by tag: name=value (repeatable)") + since := fs.Int64("since", 0, "only events with created_at >= this unix timestamp") + until := fs.Int64("until", 0, "only events with created_at <= this unix timestamp") + limit := fs.Int("limit", 0, "max events to return (0 = no limit)") + stream := fs.Bool("stream", false, "keep streaming live events after EOSE (Ctrl-C to exit)") + _ = fs.Parse(args) + + relayURL := fs.Arg(0) + if relayURL == "" { + fs.Usage() + os.Exit(1) + } + + kp, err := loadKey(*keyHex) + if err != nil { + log.Fatal(err) + } + + filter := axon.Filter{ + Kinds: []uint16(kinds), + Authors: [][]byte(authors), + Since: *since, + Until: *until, + Limit: int32(*limit), + } + for _, t := range filterTags { + filter.Tags = append(filter.Tags, axon.TagFilter{Name: t.Name, Values: t.Values}) + } + + conn, err := dial(relayURL, kp) + if err != nil { + log.Fatalf("connect: %v", err) + } + defer conn.CloseConn() + + subID := "req-" + strconv.FormatInt(time.Now().UnixNano(), 36) + if err := send(conn, msgTypeSubscribe, subscribePayload{SubID: subID, Filter: filter}); err != nil { + log.Fatalf("subscribe: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Cancel on Ctrl-C when streaming. + if *stream { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + cancel() + }() + } + + for { + t, raw, err := recv(conn, ctx) + if err != nil { + if ctx.Err() != nil { + return // clean cancellation + } + log.Fatalf("recv: %v", err) + } + switch t { + case msgTypeEvent: + var ep eventPayload + if err := msgpack.Unmarshal(raw, &ep); err != nil { + log.Printf("decode event: %v", err) + continue + } + printEvent(&ep.Event) + + case msgTypeEose: + if !*stream { + return + } + // Keep looping for live events. + + case msgTypeError: + var ep errorPayload + msgpack.Unmarshal(raw, &ep) + log.Fatalf("error %d: %s", ep.Code, ep.Message) + + default: + log.Printf("unexpected message type %d", t) + } + } +} + +// ── main ───────────────────────────────────────────────────────────────────── + +func usage() { + fmt.Fprintln(os.Stderr, "usage: axon [flags]") + fmt.Fprintln(os.Stderr, "") + fmt.Fprintln(os.Stderr, "Commands:") + fmt.Fprintln(os.Stderr, " keygen Generate a new Ed25519 keypair") + fmt.Fprintln(os.Stderr, " pub Publish an event to a relay") + fmt.Fprintln(os.Stderr, " req Query or stream events from a relay") + fmt.Fprintln(os.Stderr, "") + fmt.Fprintln(os.Stderr, "Run 'axon -h' for command-specific help.") +} + +func main() { + log.SetFlags(0) + log.SetPrefix("axon: ") + + if len(os.Args) < 2 { + usage() + os.Exit(1) + } + + cmd, rest := os.Args[1], os.Args[2:] + switch cmd { + case "keygen": + cmdKeygen(rest) + case "pub": + cmdPub(rest) + case "req": + cmdReq(rest) + default: + fmt.Fprintf(os.Stderr, "axon: unknown command %q\n\n", cmd) + usage() + os.Exit(1) + } +} -- cgit v1.2.3