From 4ee6da5fd54a2e40b537ed4126e907342c06d54d Mon Sep 17 00:00:00 2001 From: bndw Date: Mon, 9 Mar 2026 17:26:53 -0700 Subject: feat: phase 3 CLI + phase 4 JS client library MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CLI (cmd/axon): - Add explicit Unsubscribe on req exit (after EOSE and on Ctrl-C) - Add reconnect with exponential backoff (1s→30s) for req --stream JS library (js/axon.js): - Canonical tag encoding and payload construction matching Go byte-for-byte - Ed25519 sign/verify, keypair generation, challenge signing - AxonClient: WebSocket connect with auth handshake, publish, subscribe, unsubscribe, reconnect-ready callback API - encryptDM/decryptDM: X25519 ECDH (Ed25519 key conversion) + ChaCha20-Poly1305 - runVectors: validates all 6 Phase 1 test vectors against Go ground truth --- cmd/axon/main.go | 118 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 76 insertions(+), 42 deletions(-) (limited to 'cmd') diff --git a/cmd/axon/main.go b/cmd/axon/main.go index f7aaf63..9076687 100644 --- a/cmd/axon/main.go +++ b/cmd/axon/main.go @@ -56,10 +56,6 @@ type authPayload struct { Sig []byte `msgpack:"sig"` } -type okPayload struct { - Message string `msgpack:"message"` -} - type errorPayload struct { Code uint16 `msgpack:"code"` Message string `msgpack:"message"` @@ -79,7 +75,7 @@ type eventPayload struct { Event axon.Event `msgpack:"event"` } -type eosePayload struct { +type unsubscribePayload struct { SubID string `msgpack:"sub_id"` } @@ -344,6 +340,55 @@ func cmdPub(args []string) { // ── req ────────────────────────────────────────────────────────────────────── +// streamOnce dials, subscribes, and reads events until EOSE (non-stream mode), +// context cancellation, or a connection error. Sends Unsubscribe before returning. +// Returns nil on clean EOSE-exit or context cancel; returns the error otherwise. +func streamOnce(ctx context.Context, relayURL string, kp axon.KeyPair, subID string, filter axon.Filter, stream bool) error { + conn, err := dial(relayURL, kp) + if err != nil { + return err + } + defer conn.CloseConn() + + if err := send(conn, msgTypeSubscribe, subscribePayload{SubID: subID, Filter: filter}); err != nil { + return fmt.Errorf("subscribe: %w", err) + } + + for { + t, raw, err := recv(conn, ctx) + if err != nil { + if ctx.Err() != nil { + _ = send(conn, msgTypeUnsubscribe, unsubscribePayload{SubID: subID}) + return nil + } + return err + } + switch t { + case msgTypeEvent: + var ep eventPayload + if err := msgpack.Unmarshal(raw, &ep); err != nil { + log.Printf("decode event: %v", err) + continue + } + printEvent(&ep.Event) + + case msgTypeEose: + if !stream { + _ = send(conn, msgTypeUnsubscribe, unsubscribePayload{SubID: subID}) + return nil + } + + case msgTypeError: + var ep errorPayload + msgpack.Unmarshal(raw, &ep) + return fmt.Errorf("error %d: %s", ep.Code, ep.Message) + + default: + log.Printf("unexpected message type %d", t) + } + } +} + func cmdReq(args []string) { fs := flag.NewFlagSet("req", flag.ExitOnError) fs.Usage = func() { @@ -387,21 +432,11 @@ func cmdReq(args []string) { filter.Tags = append(filter.Tags, axon.TagFilter{Name: t.Name, Values: t.Values}) } - conn, err := dial(relayURL, kp) - if err != nil { - log.Fatalf("connect: %v", err) - } - defer conn.CloseConn() - subID := "req-" + strconv.FormatInt(time.Now().UnixNano(), 36) - if err := send(conn, msgTypeSubscribe, subscribePayload{SubID: subID, Filter: filter}); err != nil { - log.Fatalf("subscribe: %v", err) - } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Cancel on Ctrl-C when streaming. if *stream { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) @@ -411,36 +446,35 @@ func cmdReq(args []string) { }() } + if !*stream { + if err := streamOnce(ctx, relayURL, kp, subID, filter, false); err != nil { + log.Fatalf("req: %v", err) + } + return + } + + // Stream mode: reconnect with exponential backoff on disconnect. + const maxBackoff = 30 * time.Second + backoff := time.Second for { - t, raw, err := recv(conn, ctx) + if ctx.Err() != nil { + return + } + err := streamOnce(ctx, relayURL, kp, subID, filter, true) + if ctx.Err() != nil { + return + } if err != nil { - if ctx.Err() != nil { - return // clean cancellation - } - log.Fatalf("recv: %v", err) + log.Printf("disconnected: %v; reconnecting in %s", err, backoff) } - switch t { - case msgTypeEvent: - var ep eventPayload - if err := msgpack.Unmarshal(raw, &ep); err != nil { - log.Printf("decode event: %v", err) - continue - } - printEvent(&ep.Event) - - case msgTypeEose: - if !*stream { - return - } - // Keep looping for live events. - - case msgTypeError: - var ep errorPayload - msgpack.Unmarshal(raw, &ep) - log.Fatalf("error %d: %s", ep.Code, ep.Message) - - default: - log.Printf("unexpected message type %d", t) + select { + case <-ctx.Done(): + return + case <-time.After(backoff): + } + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff } } } -- cgit v1.2.3