// axon is a CLI for publishing and querying events on an Axon relay. // // Usage: // // axon keygen // axon pub [flags] // axon req [flags] // // The private key seed is read from --key or the AXON_KEY environment variable. // All binary values (IDs, pubkeys, signatures) are printed as lowercase hex. package main import ( "context" "encoding/hex" "encoding/json" "flag" "fmt" "log" "os" "os/signal" "strconv" "strings" "syscall" "time" "axon" ws "axon/relay/websocket" "github.com/vmihailenco/msgpack/v5" ) // ── Wire protocol constants (mirrors relay/handler.go) ────────────────────── const ( msgTypeAuth uint16 = 1 msgTypeSubscribe uint16 = 2 msgTypeUnsubscribe uint16 = 3 msgTypePublish uint16 = 4 msgTypeChallenge uint16 = 10 msgTypeEvent uint16 = 11 msgTypeEose uint16 = 12 msgTypeOk uint16 = 13 msgTypeError uint16 = 14 ) // ── Payload structs ────────────────────────────────────────────────────────── type challengePayload struct { Nonce []byte `msgpack:"nonce"` } type authPayload struct { PubKey []byte `msgpack:"pubkey"` Sig []byte `msgpack:"sig"` } type errorPayload struct { Code uint16 `msgpack:"code"` Message string `msgpack:"message"` } type publishPayload struct { Event axon.Event `msgpack:"event"` } type subscribePayload struct { SubID string `msgpack:"sub_id"` Filter axon.Filter `msgpack:"filter"` } type eventPayload struct { SubID string `msgpack:"sub_id"` Event axon.Event `msgpack:"event"` } type unsubscribePayload struct { SubID string `msgpack:"sub_id"` } // ── Transport helpers ──────────────────────────────────────────────────────── func send(conn *ws.Conn, msgType uint16, payload interface{}) error { b, err := msgpack.Marshal([]interface{}{msgType, payload}) if err != nil { return err } return conn.Write(b) } func recv(conn *ws.Conn, ctx context.Context) (uint16, msgpack.RawMessage, error) { data, err := conn.Read(ctx) if err != nil { return 0, nil, err } var arr []msgpack.RawMessage if err := msgpack.Unmarshal(data, &arr); err != nil { return 0, nil, fmt.Errorf("decode message: %w", err) } if len(arr) < 2 { return 0, nil, fmt.Errorf("message too short (%d elements)", len(arr)) } var t uint16 if err := msgpack.Unmarshal(arr[0], &t); err != nil { return 0, nil, fmt.Errorf("decode message type: %w", err) } return t, arr[1], nil } // dial connects and performs the auth handshake. Returns an authenticated conn. func dial(relayURL string, kp axon.KeyPair) (*ws.Conn, error) { conn, err := ws.Dial(relayURL) if err != nil { return nil, err } ctx := context.Background() // Receive Challenge. t, raw, err := recv(conn, ctx) if err != nil { conn.CloseConn() return nil, fmt.Errorf("recv challenge: %w", err) } if t != msgTypeChallenge { conn.CloseConn() return nil, fmt.Errorf("expected challenge (10), got %d", t) } var cp challengePayload if err := msgpack.Unmarshal(raw, &cp); err != nil { conn.CloseConn() return nil, fmt.Errorf("decode challenge: %w", err) } // Send Auth. sig := axon.SignChallenge(kp, cp.Nonce, relayURL) if err := send(conn, msgTypeAuth, authPayload{ PubKey: []byte(kp.PubKey), Sig: sig, }); err != nil { conn.CloseConn() return nil, fmt.Errorf("send auth: %w", err) } // Receive Ok / Error. t, raw, err = recv(conn, ctx) if err != nil { conn.CloseConn() return nil, fmt.Errorf("recv auth response: %w", err) } switch t { case msgTypeOk: return conn, nil case msgTypeError: var ep errorPayload msgpack.Unmarshal(raw, &ep) conn.CloseConn() return nil, fmt.Errorf("auth rejected (%d): %s", ep.Code, ep.Message) default: conn.CloseConn() return nil, fmt.Errorf("unexpected message %d after auth", t) } } // ── Output helpers ─────────────────────────────────────────────────────────── // eventJSON is a JSON-serialisable view of an axon.Event with hex-encoded // binary fields and content treated as a UTF-8 string. type eventJSON struct { ID string `json:"id"` PubKey string `json:"pubkey"` CreatedAt int64 `json:"created_at"` Kind uint16 `json:"kind"` Content string `json:"content"` Tags []axon.Tag `json:"tags"` Sig string `json:"sig"` } func toEventJSON(e *axon.Event) eventJSON { return eventJSON{ ID: hex.EncodeToString(e.ID), PubKey: hex.EncodeToString(e.PubKey), CreatedAt: e.CreatedAt, Kind: e.Kind, Content: string(e.Content), Tags: e.Tags, Sig: hex.EncodeToString(e.Sig), } } func printEvent(e *axon.Event) { b, _ := json.Marshal(toEventJSON(e)) fmt.Println(string(b)) } // ── Custom flag types ──────────────────────────────────────────────────────── // tagFlag accumulates --tag name=value or --tag name=v1,v2 flags. type tagFlag []axon.Tag func (f *tagFlag) String() string { return fmt.Sprint([]axon.Tag(*f)) } func (f *tagFlag) Set(s string) error { parts := strings.SplitN(s, "=", 2) if len(parts) != 2 || parts[0] == "" { return fmt.Errorf("expected name=value, got %q", s) } values := strings.Split(parts[1], ",") *f = append(*f, axon.Tag{Name: parts[0], Values: values}) return nil } // kindFlag accumulates --kind flags as uint16. type kindFlag []uint16 func (f *kindFlag) String() string { return fmt.Sprint([]uint16(*f)) } func (f *kindFlag) Set(s string) error { v, err := strconv.ParseUint(s, 10, 16) if err != nil { return err } *f = append(*f, uint16(v)) return nil } // authorFlag accumulates --author hex-pubkey flags. type authorFlag [][]byte func (f *authorFlag) String() string { return fmt.Sprint([][]byte(*f)) } func (f *authorFlag) Set(s string) error { b, err := hex.DecodeString(s) if err != nil { return fmt.Errorf("not valid hex: %w", err) } if len(b) != 32 { return fmt.Errorf("pubkey must be 32 bytes (64 hex chars), got %d", len(b)) } *f = append(*f, b) return nil } // ── Key loading ────────────────────────────────────────────────────────────── func loadKey(hexSeed string) (axon.KeyPair, error) { if hexSeed == "" { hexSeed = os.Getenv("AXON_KEY") } if hexSeed == "" { return axon.KeyPair{}, fmt.Errorf("no key: supply --key or set AXON_KEY") } seed, err := hex.DecodeString(hexSeed) if err != nil { return axon.KeyPair{}, fmt.Errorf("decode key: %w", err) } if len(seed) != 32 { return axon.KeyPair{}, fmt.Errorf("key must be 32 bytes (64 hex chars), got %d", len(seed)) } return axon.NewKeyPairFromSeed(seed), nil } // ── keygen ─────────────────────────────────────────────────────────────────── func cmdKeygen(_ []string) { kp, err := axon.NewKeyPair() if err != nil { log.Fatalf("keygen: %v", err) } fmt.Printf("private-key %s\n", hex.EncodeToString(kp.PrivKey.Seed())) fmt.Printf("public-key %s\n", hex.EncodeToString(kp.PubKey)) } // ── pub ────────────────────────────────────────────────────────────────────── func cmdPub(args []string) { fs := flag.NewFlagSet("pub", flag.ExitOnError) fs.Usage = func() { fmt.Fprintln(os.Stderr, "usage: axon pub [flags] ") fmt.Fprintln(os.Stderr, "\nFlags:") fs.PrintDefaults() } keyHex := fs.String("key", "", "private key seed (hex, 32 bytes). Falls back to AXON_KEY env var.") kind := fs.Uint("kind", 1000, "event kind") content := fs.String("content", "", "event content (string)") var tags tagFlag fs.Var(&tags, "tag", "add a tag: name=value or name=v1,v2 (repeatable)") _ = fs.Parse(args) relayURL := fs.Arg(0) if relayURL == "" { fs.Usage() os.Exit(1) } kp, err := loadKey(*keyHex) if err != nil { log.Fatal(err) } event := axon.Event{ CreatedAt: time.Now().Unix(), Kind: uint16(*kind), Content: []byte(*content), Tags: []axon.Tag(tags), } if event.Tags == nil { event.Tags = []axon.Tag{} } if err := axon.Sign(&event, kp); err != nil { log.Fatalf("sign: %v", err) } conn, err := dial(relayURL, kp) if err != nil { log.Fatalf("connect: %v", err) } defer conn.CloseConn() if err := send(conn, msgTypePublish, publishPayload{Event: event}); err != nil { log.Fatalf("publish: %v", err) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() t, raw, err := recv(conn, ctx) if err != nil { log.Fatalf("recv: %v", err) } switch t { case msgTypeOk: fmt.Printf("published %s\n", hex.EncodeToString(event.ID)) case msgTypeError: var ep errorPayload msgpack.Unmarshal(raw, &ep) log.Fatalf("error %d: %s", ep.Code, ep.Message) default: log.Fatalf("unexpected message type %d", t) } } // ── req ────────────────────────────────────────────────────────────────────── // streamOnce dials, subscribes, and reads events until EOSE (non-stream mode), // context cancellation, or a connection error. Sends Unsubscribe before returning. // Returns nil on clean EOSE-exit or context cancel; returns the error otherwise. func streamOnce(ctx context.Context, relayURL string, kp axon.KeyPair, subID string, filter axon.Filter, stream bool) error { conn, err := dial(relayURL, kp) if err != nil { return err } defer conn.CloseConn() if err := send(conn, msgTypeSubscribe, subscribePayload{SubID: subID, Filter: filter}); err != nil { return fmt.Errorf("subscribe: %w", err) } for { t, raw, err := recv(conn, ctx) if err != nil { if ctx.Err() != nil { _ = send(conn, msgTypeUnsubscribe, unsubscribePayload{SubID: subID}) return nil } return err } switch t { case msgTypeEvent: var ep eventPayload if err := msgpack.Unmarshal(raw, &ep); err != nil { log.Printf("decode event: %v", err) continue } printEvent(&ep.Event) case msgTypeEose: if !stream { _ = send(conn, msgTypeUnsubscribe, unsubscribePayload{SubID: subID}) return nil } case msgTypeError: var ep errorPayload msgpack.Unmarshal(raw, &ep) return fmt.Errorf("error %d: %s", ep.Code, ep.Message) default: log.Printf("unexpected message type %d", t) } } } func cmdReq(args []string) { fs := flag.NewFlagSet("req", flag.ExitOnError) fs.Usage = func() { fmt.Fprintln(os.Stderr, "usage: axon req [flags] ") fmt.Fprintln(os.Stderr, "\nPrints one JSON event per line. Exits after EOSE unless --stream is set.") fmt.Fprintln(os.Stderr, "\nFlags:") fs.PrintDefaults() } keyHex := fs.String("key", "", "private key seed (hex). Falls back to AXON_KEY env var.") var kinds kindFlag fs.Var(&kinds, "kind", "filter by event kind (repeatable)") var authors authorFlag fs.Var(&authors, "author", "filter by author pubkey hex (repeatable)") var filterTags tagFlag fs.Var(&filterTags, "tag", "filter by tag: name=value (repeatable)") since := fs.Int64("since", 0, "only events with created_at >= this unix timestamp") until := fs.Int64("until", 0, "only events with created_at <= this unix timestamp") limit := fs.Int("limit", 0, "max events to return (0 = no limit)") stream := fs.Bool("stream", false, "keep streaming live events after EOSE (Ctrl-C to exit)") _ = fs.Parse(args) relayURL := fs.Arg(0) if relayURL == "" { fs.Usage() os.Exit(1) } kp, err := loadKey(*keyHex) if err != nil { log.Fatal(err) } filter := axon.Filter{ Kinds: []uint16(kinds), Authors: [][]byte(authors), Since: *since, Until: *until, Limit: int32(*limit), } for _, t := range filterTags { filter.Tags = append(filter.Tags, axon.TagFilter{Name: t.Name, Values: t.Values}) } subID := "req-" + strconv.FormatInt(time.Now().UnixNano(), 36) ctx, cancel := context.WithCancel(context.Background()) defer cancel() if *stream { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigCh cancel() }() } if !*stream { if err := streamOnce(ctx, relayURL, kp, subID, filter, false); err != nil { log.Fatalf("req: %v", err) } return } // Stream mode: reconnect with exponential backoff on disconnect. const maxBackoff = 30 * time.Second backoff := time.Second for { if ctx.Err() != nil { return } err := streamOnce(ctx, relayURL, kp, subID, filter, true) if ctx.Err() != nil { return } if err != nil { log.Printf("disconnected: %v; reconnecting in %s", err, backoff) } select { case <-ctx.Done(): return case <-time.After(backoff): } backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } } } // ── main ───────────────────────────────────────────────────────────────────── func usage() { fmt.Fprintln(os.Stderr, "usage: axon [flags]") fmt.Fprintln(os.Stderr, "") fmt.Fprintln(os.Stderr, "Commands:") fmt.Fprintln(os.Stderr, " keygen Generate a new Ed25519 keypair") fmt.Fprintln(os.Stderr, " pub Publish an event to a relay") fmt.Fprintln(os.Stderr, " req Query or stream events from a relay") fmt.Fprintln(os.Stderr, "") fmt.Fprintln(os.Stderr, "Run 'axon -h' for command-specific help.") } func main() { log.SetFlags(0) log.SetPrefix("axon: ") if len(os.Args) < 2 { usage() os.Exit(1) } cmd, rest := os.Args[1], os.Args[2:] switch cmd { case "keygen": cmdKeygen(rest) case "pub": cmdPub(rest) case "req": cmdReq(rest) default: fmt.Fprintf(os.Stderr, "axon: unknown command %q\n\n", cmd) usage() os.Exit(1) } }