diff options
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/axon/main.go | 118 |
1 files changed, 76 insertions, 42 deletions
diff --git a/cmd/axon/main.go b/cmd/axon/main.go index f7aaf63..9076687 100644 --- a/cmd/axon/main.go +++ b/cmd/axon/main.go | |||
| @@ -56,10 +56,6 @@ type authPayload struct { | |||
| 56 | Sig []byte `msgpack:"sig"` | 56 | Sig []byte `msgpack:"sig"` |
| 57 | } | 57 | } |
| 58 | 58 | ||
| 59 | type okPayload struct { | ||
| 60 | Message string `msgpack:"message"` | ||
| 61 | } | ||
| 62 | |||
| 63 | type errorPayload struct { | 59 | type errorPayload struct { |
| 64 | Code uint16 `msgpack:"code"` | 60 | Code uint16 `msgpack:"code"` |
| 65 | Message string `msgpack:"message"` | 61 | Message string `msgpack:"message"` |
| @@ -79,7 +75,7 @@ type eventPayload struct { | |||
| 79 | Event axon.Event `msgpack:"event"` | 75 | Event axon.Event `msgpack:"event"` |
| 80 | } | 76 | } |
| 81 | 77 | ||
| 82 | type eosePayload struct { | 78 | type unsubscribePayload struct { |
| 83 | SubID string `msgpack:"sub_id"` | 79 | SubID string `msgpack:"sub_id"` |
| 84 | } | 80 | } |
| 85 | 81 | ||
| @@ -344,6 +340,55 @@ func cmdPub(args []string) { | |||
| 344 | 340 | ||
| 345 | // ── req ────────────────────────────────────────────────────────────────────── | 341 | // ── req ────────────────────────────────────────────────────────────────────── |
| 346 | 342 | ||
| 343 | // streamOnce dials, subscribes, and reads events until EOSE (non-stream mode), | ||
| 344 | // context cancellation, or a connection error. Sends Unsubscribe before returning. | ||
| 345 | // Returns nil on clean EOSE-exit or context cancel; returns the error otherwise. | ||
| 346 | func streamOnce(ctx context.Context, relayURL string, kp axon.KeyPair, subID string, filter axon.Filter, stream bool) error { | ||
| 347 | conn, err := dial(relayURL, kp) | ||
| 348 | if err != nil { | ||
| 349 | return err | ||
| 350 | } | ||
| 351 | defer conn.CloseConn() | ||
| 352 | |||
| 353 | if err := send(conn, msgTypeSubscribe, subscribePayload{SubID: subID, Filter: filter}); err != nil { | ||
| 354 | return fmt.Errorf("subscribe: %w", err) | ||
| 355 | } | ||
| 356 | |||
| 357 | for { | ||
| 358 | t, raw, err := recv(conn, ctx) | ||
| 359 | if err != nil { | ||
| 360 | if ctx.Err() != nil { | ||
| 361 | _ = send(conn, msgTypeUnsubscribe, unsubscribePayload{SubID: subID}) | ||
| 362 | return nil | ||
| 363 | } | ||
| 364 | return err | ||
| 365 | } | ||
| 366 | switch t { | ||
| 367 | case msgTypeEvent: | ||
| 368 | var ep eventPayload | ||
| 369 | if err := msgpack.Unmarshal(raw, &ep); err != nil { | ||
| 370 | log.Printf("decode event: %v", err) | ||
| 371 | continue | ||
| 372 | } | ||
| 373 | printEvent(&ep.Event) | ||
| 374 | |||
| 375 | case msgTypeEose: | ||
| 376 | if !stream { | ||
| 377 | _ = send(conn, msgTypeUnsubscribe, unsubscribePayload{SubID: subID}) | ||
| 378 | return nil | ||
| 379 | } | ||
| 380 | |||
| 381 | case msgTypeError: | ||
| 382 | var ep errorPayload | ||
| 383 | msgpack.Unmarshal(raw, &ep) | ||
| 384 | return fmt.Errorf("error %d: %s", ep.Code, ep.Message) | ||
| 385 | |||
| 386 | default: | ||
| 387 | log.Printf("unexpected message type %d", t) | ||
| 388 | } | ||
| 389 | } | ||
| 390 | } | ||
| 391 | |||
| 347 | func cmdReq(args []string) { | 392 | func cmdReq(args []string) { |
| 348 | fs := flag.NewFlagSet("req", flag.ExitOnError) | 393 | fs := flag.NewFlagSet("req", flag.ExitOnError) |
| 349 | fs.Usage = func() { | 394 | fs.Usage = func() { |
| @@ -387,21 +432,11 @@ func cmdReq(args []string) { | |||
| 387 | filter.Tags = append(filter.Tags, axon.TagFilter{Name: t.Name, Values: t.Values}) | 432 | filter.Tags = append(filter.Tags, axon.TagFilter{Name: t.Name, Values: t.Values}) |
| 388 | } | 433 | } |
| 389 | 434 | ||
| 390 | conn, err := dial(relayURL, kp) | ||
| 391 | if err != nil { | ||
| 392 | log.Fatalf("connect: %v", err) | ||
| 393 | } | ||
| 394 | defer conn.CloseConn() | ||
| 395 | |||
| 396 | subID := "req-" + strconv.FormatInt(time.Now().UnixNano(), 36) | 435 | subID := "req-" + strconv.FormatInt(time.Now().UnixNano(), 36) |
| 397 | if err := send(conn, msgTypeSubscribe, subscribePayload{SubID: subID, Filter: filter}); err != nil { | ||
| 398 | log.Fatalf("subscribe: %v", err) | ||
| 399 | } | ||
| 400 | 436 | ||
| 401 | ctx, cancel := context.WithCancel(context.Background()) | 437 | ctx, cancel := context.WithCancel(context.Background()) |
| 402 | defer cancel() | 438 | defer cancel() |
| 403 | 439 | ||
| 404 | // Cancel on Ctrl-C when streaming. | ||
| 405 | if *stream { | 440 | if *stream { |
| 406 | sigCh := make(chan os.Signal, 1) | 441 | sigCh := make(chan os.Signal, 1) |
| 407 | signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) | 442 | signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) |
| @@ -411,36 +446,35 @@ func cmdReq(args []string) { | |||
| 411 | }() | 446 | }() |
| 412 | } | 447 | } |
| 413 | 448 | ||
| 449 | if !*stream { | ||
| 450 | if err := streamOnce(ctx, relayURL, kp, subID, filter, false); err != nil { | ||
| 451 | log.Fatalf("req: %v", err) | ||
| 452 | } | ||
| 453 | return | ||
| 454 | } | ||
| 455 | |||
| 456 | // Stream mode: reconnect with exponential backoff on disconnect. | ||
| 457 | const maxBackoff = 30 * time.Second | ||
| 458 | backoff := time.Second | ||
| 414 | for { | 459 | for { |
| 415 | t, raw, err := recv(conn, ctx) | 460 | if ctx.Err() != nil { |
| 461 | return | ||
| 462 | } | ||
| 463 | err := streamOnce(ctx, relayURL, kp, subID, filter, true) | ||
| 464 | if ctx.Err() != nil { | ||
| 465 | return | ||
| 466 | } | ||
| 416 | if err != nil { | 467 | if err != nil { |
| 417 | if ctx.Err() != nil { | 468 | log.Printf("disconnected: %v; reconnecting in %s", err, backoff) |
| 418 | return // clean cancellation | ||
| 419 | } | ||
| 420 | log.Fatalf("recv: %v", err) | ||
| 421 | } | 469 | } |
| 422 | switch t { | 470 | select { |
| 423 | case msgTypeEvent: | 471 | case <-ctx.Done(): |
| 424 | var ep eventPayload | 472 | return |
| 425 | if err := msgpack.Unmarshal(raw, &ep); err != nil { | 473 | case <-time.After(backoff): |
| 426 | log.Printf("decode event: %v", err) | 474 | } |
| 427 | continue | 475 | backoff *= 2 |
| 428 | } | 476 | if backoff > maxBackoff { |
| 429 | printEvent(&ep.Event) | 477 | backoff = maxBackoff |
| 430 | |||
| 431 | case msgTypeEose: | ||
| 432 | if !*stream { | ||
| 433 | return | ||
| 434 | } | ||
| 435 | // Keep looping for live events. | ||
| 436 | |||
| 437 | case msgTypeError: | ||
| 438 | var ep errorPayload | ||
| 439 | msgpack.Unmarshal(raw, &ep) | ||
| 440 | log.Fatalf("error %d: %s", ep.Code, ep.Message) | ||
| 441 | |||
| 442 | default: | ||
| 443 | log.Printf("unexpected message type %d", t) | ||
| 444 | } | 478 | } |
| 445 | } | 479 | } |
| 446 | } | 480 | } |
