From e79f9ad89556000521b43ce5ff4eb59dd00768b0 Mon Sep 17 00:00:00 2001 From: bndw Date: Sat, 7 Feb 2026 21:22:51 -0800 Subject: refactor: race-safe Subscribe/Fetch API with channel-based Publish - Add mutex-guarded send/stop on Subscription to prevent send-on-closed-channel panics and data races - Split Subscribe (streams after EOSE) and Fetch (closes on EOSE) per NIP-01 - Rewrite Publish to use channel-based OK dispatch instead of calling Receive directly, which raced with the auto-started Listen goroutine - Clean up all subscriptions when Listen exits so range loops terminate - Update tests and examples for new API --- relay_test.go | 51 ++++++++++++++++++++++----------------------------- 1 file changed, 22 insertions(+), 29 deletions(-) (limited to 'relay_test.go') diff --git a/relay_test.go b/relay_test.go index 4ace956..b39aa06 100644 --- a/relay_test.go +++ b/relay_test.go @@ -76,9 +76,16 @@ func TestRelaySendReceive(t *testing.T) { url := "ws" + strings.TrimPrefix(server.URL, "http") ctx := context.Background() - relay, err := Connect(ctx, url) + // Create relay without auto-Listen to test Send/Receive directly + conn, _, err := websocket.Dial(ctx, url, nil) if err != nil { - t.Fatalf("Connect() error = %v", err) + t.Fatalf("Dial() error = %v", err) + } + relay := &Relay{ + URL: url, + conn: conn, + subscriptions: make(map[string]*Subscription), + okChannels: make(map[string]chan *OKEnvelope), } defer relay.Close() @@ -234,7 +241,8 @@ func TestRelaySubscribe(t *testing.T) { defer server.Close() url := "ws" + strings.TrimPrefix(server.URL, "http") - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() relay, err := Connect(ctx, url) if err != nil { @@ -242,30 +250,18 @@ func TestRelaySubscribe(t *testing.T) { } defer relay.Close() - sub, err := relay.Subscribe(ctx, "sub1", Filter{Kinds: []int{1}}) - if err != nil { - t.Fatalf("Subscribe() error = %v", err) - } + sub := relay.Fetch(ctx, Filter{Kinds: []int{1}}) - // Start listening in background - go relay.Listen(ctx) - - // Collect events eventCount := 0 - timeout := time.After(2 * time.Second) - - for { - select { - case <-sub.Events: - eventCount++ - case <-sub.EOSE: - if eventCount != 3 { - t.Errorf("Received %d events, want 3", eventCount) - } - return - case <-timeout: - t.Fatal("Timeout waiting for events") - } + for range sub.Events { + eventCount++ + } + + if eventCount != 3 { + t.Errorf("Received %d events, want 3", eventCount) + } + if sub.Err != nil { + t.Errorf("Subscription.Err = %v, want nil", sub.Err) } } @@ -322,10 +318,7 @@ func TestSubscriptionClose(t *testing.T) { } defer relay.Close() - sub, err := relay.Subscribe(ctx, "sub1", Filter{Kinds: []int{1}}) - if err != nil { - t.Fatalf("Subscribe() error = %v", err) - } + sub := relay.Subscribe(ctx, Filter{Kinds: []int{1}}) if err := sub.Close(ctx); err != nil { t.Errorf("Subscription.Close() error = %v", err) -- cgit v1.2.3