diff options
| author | bndw <ben@bdw.to> | 2026-02-07 21:22:51 -0800 |
|---|---|---|
| committer | bndw <ben@bdw.to> | 2026-02-07 21:22:51 -0800 |
| commit | e79f9ad89556000521b43ce5ff4eb59dd00768b0 (patch) | |
| tree | 9f8750e3c5431faf43fa672cb78a733a85d9dcfe | |
| parent | d4fd2467d691a69a0ba75348086424b9fb33a297 (diff) | |
refactor: race-safe Subscribe/Fetch API with channel-based Publish
- Add mutex-guarded send/stop on Subscription to prevent send-on-closed-channel
panics and data races
- Split Subscribe (streams after EOSE) and Fetch (closes on EOSE) per NIP-01
- Rewrite Publish to use channel-based OK dispatch instead of calling Receive
directly, which raced with the auto-started Listen goroutine
- Clean up all subscriptions when Listen exits so range loops terminate
- Update tests and examples for new API
| -rw-r--r-- | example_test.go | 35 | ||||
| -rw-r--r-- | examples/basic/main.go | 39 | ||||
| -rw-r--r-- | relay.go | 186 | ||||
| -rw-r--r-- | relay_test.go | 51 |
4 files changed, 176 insertions, 135 deletions
diff --git a/example_test.go b/example_test.go index 90dae0f..6d10ced 100644 --- a/example_test.go +++ b/example_test.go | |||
| @@ -53,8 +53,7 @@ func Example_basic() { | |||
| 53 | // ExampleRelay demonstrates connecting to a relay (requires network). | 53 | // ExampleRelay demonstrates connecting to a relay (requires network). |
| 54 | // This is a documentation example - run with: go test -v -run ExampleRelay | 54 | // This is a documentation example - run with: go test -v -run ExampleRelay |
| 55 | func ExampleRelay() { | 55 | func ExampleRelay() { |
| 56 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | 56 | ctx := context.Background() |
| 57 | defer cancel() | ||
| 58 | 57 | ||
| 59 | // Connect to a public relay | 58 | // Connect to a public relay |
| 60 | relay, err := nostr.Connect(ctx, "wss://relay.damus.io") | 59 | relay, err := nostr.Connect(ctx, "wss://relay.damus.io") |
| @@ -66,35 +65,21 @@ func ExampleRelay() { | |||
| 66 | 65 | ||
| 67 | fmt.Println("Connected to relay!") | 66 | fmt.Println("Connected to relay!") |
| 68 | 67 | ||
| 69 | // Subscribe to recent text notes | 68 | ctx, cancel := context.WithTimeout(ctx, 10*time.Second) |
| 69 | defer cancel() | ||
| 70 | |||
| 71 | // Fetch recent text notes (closes on EOSE) | ||
| 70 | since := time.Now().Add(-1 * time.Hour).Unix() | 72 | since := time.Now().Add(-1 * time.Hour).Unix() |
| 71 | sub, err := relay.Subscribe(ctx, "my-sub", nostr.Filter{ | 73 | sub := relay.Fetch(ctx, nostr.Filter{ |
| 72 | Kinds: []int{nostr.KindTextNote}, | 74 | Kinds: []int{nostr.KindTextNote}, |
| 73 | Since: &since, | 75 | Since: &since, |
| 74 | Limit: 5, | 76 | Limit: 5, |
| 75 | }) | 77 | }) |
| 76 | if err != nil { | ||
| 77 | fmt.Printf("Failed to subscribe: %v\n", err) | ||
| 78 | return | ||
| 79 | } | ||
| 80 | |||
| 81 | // Listen for events in the background | ||
| 82 | go relay.Listen(ctx) | ||
| 83 | 78 | ||
| 84 | // Collect events until EOSE | ||
| 85 | eventCount := 0 | 79 | eventCount := 0 |
| 86 | for { | 80 | for event := range sub.Events { |
| 87 | select { | 81 | eventCount++ |
| 88 | case event := <-sub.Events: | 82 | fmt.Printf("Received event from %s...\n", event.PubKey[:8]) |
| 89 | eventCount++ | ||
| 90 | fmt.Printf("Received event from %s...\n", event.PubKey[:8]) | ||
| 91 | case <-sub.EOSE: | ||
| 92 | fmt.Printf("Received %d events before EOSE\n", eventCount) | ||
| 93 | sub.Close(ctx) | ||
| 94 | return | ||
| 95 | case <-ctx.Done(): | ||
| 96 | fmt.Println("Timeout") | ||
| 97 | return | ||
| 98 | } | ||
| 99 | } | 83 | } |
| 84 | fmt.Printf("Received %d events\n", eventCount) | ||
| 100 | } | 85 | } |
diff --git a/examples/basic/main.go b/examples/basic/main.go index 0c99dd9..1a4061a 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go | |||
| @@ -53,11 +53,8 @@ func main() { | |||
| 53 | ExampleRelay() | 53 | ExampleRelay() |
| 54 | } | 54 | } |
| 55 | 55 | ||
| 56 | // ExampleRelay demonstrates connecting to a relay (requires network). | ||
| 57 | // This is a documentation example - run with: go test -v -run ExampleRelay | ||
| 58 | func ExampleRelay() { | 56 | func ExampleRelay() { |
| 59 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | 57 | ctx := context.Background() |
| 60 | defer cancel() | ||
| 61 | 58 | ||
| 62 | // Connect to a public relay | 59 | // Connect to a public relay |
| 63 | relay, err := nostr.Connect(ctx, "wss://relay.damus.io") | 60 | relay, err := nostr.Connect(ctx, "wss://relay.damus.io") |
| @@ -66,38 +63,16 @@ func ExampleRelay() { | |||
| 66 | return | 63 | return |
| 67 | } | 64 | } |
| 68 | defer relay.Close() | 65 | defer relay.Close() |
| 69 | |||
| 70 | fmt.Println("Connected to relay!") | 66 | fmt.Println("Connected to relay!") |
| 71 | 67 | ||
| 72 | // Subscribe to recent text notes | 68 | ctx, cancel := context.WithTimeout(ctx, 25*time.Second) |
| 73 | since := time.Now().Add(-1 * time.Hour).Unix() | 69 | defer cancel() |
| 74 | sub, err := relay.Subscribe(ctx, "my-sub", nostr.Filter{ | 70 | |
| 71 | filter := nostr.Filter{ | ||
| 75 | Kinds: []int{nostr.KindTextNote}, | 72 | Kinds: []int{nostr.KindTextNote}, |
| 76 | Since: &since, | ||
| 77 | Limit: 5, | 73 | Limit: 5, |
| 78 | }) | ||
| 79 | if err != nil { | ||
| 80 | fmt.Printf("Failed to subscribe: %v\n", err) | ||
| 81 | os.Exit(1) | ||
| 82 | } | 74 | } |
| 83 | 75 | for event := range relay.Fetch(ctx, filter).Events { | |
| 84 | // Listen for events in the background | 76 | fmt.Printf("Received event from %s...\n", event) |
| 85 | go relay.Listen(ctx) | ||
| 86 | |||
| 87 | // Collect events until EOSE | ||
| 88 | eventCount := 0 | ||
| 89 | for { | ||
| 90 | select { | ||
| 91 | case event := <-sub.Events: | ||
| 92 | eventCount++ | ||
| 93 | fmt.Printf("Received event from %s...\n", event) | ||
| 94 | case <-sub.EOSE: | ||
| 95 | fmt.Printf("Received %d events before EOSE\n", eventCount) | ||
| 96 | sub.Close(ctx) | ||
| 97 | return | ||
| 98 | case <-ctx.Done(): | ||
| 99 | fmt.Println("Timeout") | ||
| 100 | return | ||
| 101 | } | ||
| 102 | } | 77 | } |
| 103 | } | 78 | } |
| @@ -2,6 +2,7 @@ package nostr | |||
| 2 | 2 | ||
| 3 | import ( | 3 | import ( |
| 4 | "context" | 4 | "context" |
| 5 | "crypto/rand" | ||
| 5 | "fmt" | 6 | "fmt" |
| 6 | "sync" | 7 | "sync" |
| 7 | 8 | ||
| @@ -16,6 +17,9 @@ type Relay struct { | |||
| 16 | 17 | ||
| 17 | subscriptions map[string]*Subscription | 18 | subscriptions map[string]*Subscription |
| 18 | subscriptionsMu sync.RWMutex | 19 | subscriptionsMu sync.RWMutex |
| 20 | |||
| 21 | okChannels map[string]chan *OKEnvelope | ||
| 22 | okChannelsMu sync.Mutex | ||
| 19 | } | 23 | } |
| 20 | 24 | ||
| 21 | // Connect establishes a WebSocket connection to the relay. | 25 | // Connect establishes a WebSocket connection to the relay. |
| @@ -25,11 +29,16 @@ func Connect(ctx context.Context, url string) (*Relay, error) { | |||
| 25 | return nil, fmt.Errorf("failed to connect to relay: %w", err) | 29 | return nil, fmt.Errorf("failed to connect to relay: %w", err) |
| 26 | } | 30 | } |
| 27 | 31 | ||
| 28 | return &Relay{ | 32 | r := &Relay{ |
| 29 | URL: url, | 33 | URL: url, |
| 30 | conn: conn, | 34 | conn: conn, |
| 31 | subscriptions: make(map[string]*Subscription), | 35 | subscriptions: make(map[string]*Subscription), |
| 32 | }, nil | 36 | okChannels: make(map[string]chan *OKEnvelope), |
| 37 | } | ||
| 38 | |||
| 39 | go r.Listen(ctx) | ||
| 40 | |||
| 41 | return r, nil | ||
| 33 | } | 42 | } |
| 34 | 43 | ||
| 35 | // Close closes the WebSocket connection. | 44 | // Close closes the WebSocket connection. |
| @@ -83,47 +92,64 @@ func (r *Relay) Receive(ctx context.Context) (Envelope, error) { | |||
| 83 | 92 | ||
| 84 | // Publish sends an event to the relay and waits for the OK response. | 93 | // Publish sends an event to the relay and waits for the OK response. |
| 85 | func (r *Relay) Publish(ctx context.Context, event *Event) error { | 94 | func (r *Relay) Publish(ctx context.Context, event *Event) error { |
| 95 | ch := make(chan *OKEnvelope, 1) | ||
| 96 | |||
| 97 | r.okChannelsMu.Lock() | ||
| 98 | r.okChannels[event.ID] = ch | ||
| 99 | r.okChannelsMu.Unlock() | ||
| 100 | |||
| 101 | defer func() { | ||
| 102 | r.okChannelsMu.Lock() | ||
| 103 | delete(r.okChannels, event.ID) | ||
| 104 | r.okChannelsMu.Unlock() | ||
| 105 | }() | ||
| 106 | |||
| 86 | env := EventEnvelope{Event: event} | 107 | env := EventEnvelope{Event: event} |
| 87 | if err := r.Send(ctx, env); err != nil { | 108 | if err := r.Send(ctx, env); err != nil { |
| 88 | return fmt.Errorf("failed to send event: %w", err) | 109 | return fmt.Errorf("failed to send event: %w", err) |
| 89 | } | 110 | } |
| 90 | 111 | ||
| 91 | // Wait for OK response | 112 | select { |
| 92 | for { | 113 | case ok := <-ch: |
| 93 | resp, err := r.Receive(ctx) | 114 | if !ok.OK { |
| 94 | if err != nil { | 115 | return fmt.Errorf("event rejected: %s", ok.Message) |
| 95 | return fmt.Errorf("failed to receive response: %w", err) | ||
| 96 | } | ||
| 97 | |||
| 98 | if ok, isOK := resp.(*OKEnvelope); isOK { | ||
| 99 | if ok.EventID == event.ID { | ||
| 100 | if !ok.OK { | ||
| 101 | return fmt.Errorf("event rejected: %s", ok.Message) | ||
| 102 | } | ||
| 103 | return nil | ||
| 104 | } | ||
| 105 | } | 116 | } |
| 106 | 117 | return nil | |
| 107 | // Dispatch other messages to subscriptions | 118 | case <-ctx.Done(): |
| 108 | r.dispatchEnvelope(resp) | 119 | return ctx.Err() |
| 109 | } | 120 | } |
| 110 | } | 121 | } |
| 111 | 122 | ||
| 112 | // Subscribe creates a subscription with the given filters. | 123 | func genID() string { |
| 113 | func (r *Relay) Subscribe(ctx context.Context, id string, filters ...Filter) (*Subscription, error) { | 124 | buf := make([]byte, 5) |
| 125 | rand.Read(buf) | ||
| 126 | return fmt.Sprintf("%x", buf) | ||
| 127 | } | ||
| 128 | |||
| 129 | // subscribe is the internal implementation for Subscribe and Fetch. | ||
| 130 | func (r *Relay) subscribe(ctx context.Context, closeOnEOSE bool, filters ...Filter) *Subscription { | ||
| 131 | id := genID() | ||
| 132 | |||
| 114 | sub := &Subscription{ | 133 | sub := &Subscription{ |
| 115 | ID: id, | 134 | ID: id, |
| 116 | relay: r, | 135 | relay: r, |
| 117 | Filters: filters, | 136 | Filters: filters, |
| 118 | Events: make(chan *Event, 100), | 137 | Events: make(chan *Event, 100), |
| 119 | EOSE: make(chan struct{}, 1), | 138 | closeOnEOSE: closeOnEOSE, |
| 120 | closed: make(chan struct{}), | ||
| 121 | } | 139 | } |
| 122 | 140 | ||
| 123 | r.subscriptionsMu.Lock() | 141 | r.subscriptionsMu.Lock() |
| 124 | r.subscriptions[id] = sub | 142 | r.subscriptions[id] = sub |
| 125 | r.subscriptionsMu.Unlock() | 143 | r.subscriptionsMu.Unlock() |
| 126 | 144 | ||
| 145 | go func() { | ||
| 146 | <-ctx.Done() | ||
| 147 | sub.stop(ctx.Err()) | ||
| 148 | r.subscriptionsMu.Lock() | ||
| 149 | delete(r.subscriptions, id) | ||
| 150 | r.subscriptionsMu.Unlock() | ||
| 151 | }() | ||
| 152 | |||
| 127 | env := ReqEnvelope{ | 153 | env := ReqEnvelope{ |
| 128 | SubscriptionID: id, | 154 | SubscriptionID: id, |
| 129 | Filters: filters, | 155 | Filters: filters, |
| @@ -132,10 +158,24 @@ func (r *Relay) Subscribe(ctx context.Context, id string, filters ...Filter) (*S | |||
| 132 | r.subscriptionsMu.Lock() | 158 | r.subscriptionsMu.Lock() |
| 133 | delete(r.subscriptions, id) | 159 | delete(r.subscriptions, id) |
| 134 | r.subscriptionsMu.Unlock() | 160 | r.subscriptionsMu.Unlock() |
| 135 | return nil, fmt.Errorf("failed to send subscription request: %w", err) | 161 | sub.stop(fmt.Errorf("failed to send subscription request: %w", err)) |
| 136 | } | 162 | } |
| 137 | 163 | ||
| 138 | return sub, nil | 164 | return sub |
| 165 | } | ||
| 166 | |||
| 167 | // Subscribe creates a subscription with the given filters. | ||
| 168 | // Events are received on the Events channel until the context is cancelled. | ||
| 169 | // After EOSE (end of stored events), the subscription continues to receive | ||
| 170 | // real-time events per the Nostr protocol. | ||
| 171 | func (r *Relay) Subscribe(ctx context.Context, filters ...Filter) *Subscription { | ||
| 172 | return r.subscribe(ctx, false, filters...) | ||
| 173 | } | ||
| 174 | |||
| 175 | // Fetch creates a subscription that closes automatically when EOSE is received. | ||
| 176 | // Use this for one-shot queries where you only want stored events. | ||
| 177 | func (r *Relay) Fetch(ctx context.Context, filters ...Filter) *Subscription { | ||
| 178 | return r.subscribe(ctx, true, filters...) | ||
| 139 | } | 179 | } |
| 140 | 180 | ||
| 141 | // dispatchEnvelope routes incoming messages to the appropriate subscription. | 181 | // dispatchEnvelope routes incoming messages to the appropriate subscription. |
| @@ -146,35 +186,57 @@ func (r *Relay) dispatchEnvelope(env Envelope) { | |||
| 146 | sub, ok := r.subscriptions[e.SubscriptionID] | 186 | sub, ok := r.subscriptions[e.SubscriptionID] |
| 147 | r.subscriptionsMu.RUnlock() | 187 | r.subscriptionsMu.RUnlock() |
| 148 | if ok { | 188 | if ok { |
| 149 | select { | 189 | sub.send(e.Event) |
| 150 | case sub.Events <- e.Event: | ||
| 151 | default: | ||
| 152 | // Channel full, drop event | ||
| 153 | } | ||
| 154 | } | 190 | } |
| 155 | case *EOSEEnvelope: | 191 | case *EOSEEnvelope: |
| 156 | r.subscriptionsMu.RLock() | 192 | r.subscriptionsMu.RLock() |
| 157 | sub, ok := r.subscriptions[e.SubscriptionID] | 193 | sub, ok := r.subscriptions[e.SubscriptionID] |
| 158 | r.subscriptionsMu.RUnlock() | 194 | r.subscriptionsMu.RUnlock() |
| 159 | if ok { | 195 | if ok && sub.closeOnEOSE { |
| 160 | select { | 196 | r.subscriptionsMu.Lock() |
| 161 | case sub.EOSE <- struct{}{}: | 197 | delete(r.subscriptions, e.SubscriptionID) |
| 162 | default: | 198 | r.subscriptionsMu.Unlock() |
| 163 | } | 199 | sub.stop(nil) |
| 164 | } | 200 | } |
| 165 | case *ClosedEnvelope: | 201 | case *ClosedEnvelope: |
| 166 | r.subscriptionsMu.Lock() | 202 | r.subscriptionsMu.Lock() |
| 167 | if sub, ok := r.subscriptions[e.SubscriptionID]; ok { | 203 | sub, ok := r.subscriptions[e.SubscriptionID] |
| 168 | close(sub.closed) | 204 | if ok { |
| 169 | delete(r.subscriptions, e.SubscriptionID) | 205 | delete(r.subscriptions, e.SubscriptionID) |
| 170 | } | 206 | } |
| 171 | r.subscriptionsMu.Unlock() | 207 | r.subscriptionsMu.Unlock() |
| 208 | if ok { | ||
| 209 | sub.stop(fmt.Errorf("subscription closed by relay: %s", e.Message)) | ||
| 210 | } | ||
| 211 | case *OKEnvelope: | ||
| 212 | r.okChannelsMu.Lock() | ||
| 213 | ch, ok := r.okChannels[e.EventID] | ||
| 214 | r.okChannelsMu.Unlock() | ||
| 215 | if ok { | ||
| 216 | select { | ||
| 217 | case ch <- e: | ||
| 218 | default: | ||
| 219 | } | ||
| 220 | } | ||
| 172 | } | 221 | } |
| 173 | } | 222 | } |
| 174 | 223 | ||
| 175 | // Listen reads messages from the relay and dispatches them to subscriptions. | 224 | // Listen reads messages from the relay and dispatches them to subscriptions. |
| 176 | // This should be called in a goroutine when using multiple subscriptions. | ||
| 177 | func (r *Relay) Listen(ctx context.Context) error { | 225 | func (r *Relay) Listen(ctx context.Context) error { |
| 226 | defer func() { | ||
| 227 | r.subscriptionsMu.Lock() | ||
| 228 | subs := make([]*Subscription, 0, len(r.subscriptions)) | ||
| 229 | for id, sub := range r.subscriptions { | ||
| 230 | subs = append(subs, sub) | ||
| 231 | delete(r.subscriptions, id) | ||
| 232 | } | ||
| 233 | r.subscriptionsMu.Unlock() | ||
| 234 | |||
| 235 | for _, sub := range subs { | ||
| 236 | sub.stop(fmt.Errorf("connection closed")) | ||
| 237 | } | ||
| 238 | }() | ||
| 239 | |||
| 178 | for { | 240 | for { |
| 179 | select { | 241 | select { |
| 180 | case <-ctx.Done(): | 242 | case <-ctx.Done(): |
| @@ -197,12 +259,43 @@ type Subscription struct { | |||
| 197 | relay *Relay | 259 | relay *Relay |
| 198 | Filters []Filter | 260 | Filters []Filter |
| 199 | Events chan *Event | 261 | Events chan *Event |
| 200 | EOSE chan struct{} | 262 | Err error |
| 201 | closed chan struct{} | 263 | |
| 264 | closeOnEOSE bool | ||
| 265 | mu sync.Mutex | ||
| 266 | done bool | ||
| 267 | } | ||
| 268 | |||
| 269 | // send delivers an event to the subscription's Events channel. | ||
| 270 | func (s *Subscription) send(ev *Event) { | ||
| 271 | s.mu.Lock() | ||
| 272 | defer s.mu.Unlock() | ||
| 273 | if s.done { | ||
| 274 | return | ||
| 275 | } | ||
| 276 | select { | ||
| 277 | case s.Events <- ev: | ||
| 278 | default: | ||
| 279 | } | ||
| 280 | } | ||
| 281 | |||
| 282 | // stop closes the subscription's Events channel and sets the error. | ||
| 283 | // It is idempotent — only the first call has any effect. | ||
| 284 | func (s *Subscription) stop(err error) { | ||
| 285 | s.mu.Lock() | ||
| 286 | defer s.mu.Unlock() | ||
| 287 | if s.done { | ||
| 288 | return | ||
| 289 | } | ||
| 290 | s.done = true | ||
| 291 | s.Err = err | ||
| 292 | close(s.Events) | ||
| 202 | } | 293 | } |
| 203 | 294 | ||
| 204 | // Close unsubscribes from the relay. | 295 | // Close unsubscribes from the relay. |
| 205 | func (s *Subscription) Close(ctx context.Context) error { | 296 | func (s *Subscription) Close(ctx context.Context) error { |
| 297 | s.stop(nil) | ||
| 298 | |||
| 206 | s.relay.subscriptionsMu.Lock() | 299 | s.relay.subscriptionsMu.Lock() |
| 207 | delete(s.relay.subscriptions, s.ID) | 300 | delete(s.relay.subscriptions, s.ID) |
| 208 | s.relay.subscriptionsMu.Unlock() | 301 | s.relay.subscriptionsMu.Unlock() |
| @@ -210,8 +303,3 @@ func (s *Subscription) Close(ctx context.Context) error { | |||
| 210 | env := CloseEnvelope{SubscriptionID: s.ID} | 303 | env := CloseEnvelope{SubscriptionID: s.ID} |
| 211 | return s.relay.Send(ctx, env) | 304 | return s.relay.Send(ctx, env) |
| 212 | } | 305 | } |
| 213 | |||
| 214 | // Closed returns a channel that's closed when the subscription is terminated. | ||
| 215 | func (s *Subscription) Closed() <-chan struct{} { | ||
| 216 | return s.closed | ||
| 217 | } | ||
diff --git a/relay_test.go b/relay_test.go index 4ace956..b39aa06 100644 --- a/relay_test.go +++ b/relay_test.go | |||
| @@ -76,9 +76,16 @@ func TestRelaySendReceive(t *testing.T) { | |||
| 76 | url := "ws" + strings.TrimPrefix(server.URL, "http") | 76 | url := "ws" + strings.TrimPrefix(server.URL, "http") |
| 77 | ctx := context.Background() | 77 | ctx := context.Background() |
| 78 | 78 | ||
| 79 | relay, err := Connect(ctx, url) | 79 | // Create relay without auto-Listen to test Send/Receive directly |
| 80 | conn, _, err := websocket.Dial(ctx, url, nil) | ||
| 80 | if err != nil { | 81 | if err != nil { |
| 81 | t.Fatalf("Connect() error = %v", err) | 82 | t.Fatalf("Dial() error = %v", err) |
| 83 | } | ||
| 84 | relay := &Relay{ | ||
| 85 | URL: url, | ||
| 86 | conn: conn, | ||
| 87 | subscriptions: make(map[string]*Subscription), | ||
| 88 | okChannels: make(map[string]chan *OKEnvelope), | ||
| 82 | } | 89 | } |
| 83 | defer relay.Close() | 90 | defer relay.Close() |
| 84 | 91 | ||
| @@ -234,7 +241,8 @@ func TestRelaySubscribe(t *testing.T) { | |||
| 234 | defer server.Close() | 241 | defer server.Close() |
| 235 | 242 | ||
| 236 | url := "ws" + strings.TrimPrefix(server.URL, "http") | 243 | url := "ws" + strings.TrimPrefix(server.URL, "http") |
| 237 | ctx := context.Background() | 244 | ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) |
| 245 | defer cancel() | ||
| 238 | 246 | ||
| 239 | relay, err := Connect(ctx, url) | 247 | relay, err := Connect(ctx, url) |
| 240 | if err != nil { | 248 | if err != nil { |
| @@ -242,30 +250,18 @@ func TestRelaySubscribe(t *testing.T) { | |||
| 242 | } | 250 | } |
| 243 | defer relay.Close() | 251 | defer relay.Close() |
| 244 | 252 | ||
| 245 | sub, err := relay.Subscribe(ctx, "sub1", Filter{Kinds: []int{1}}) | 253 | sub := relay.Fetch(ctx, Filter{Kinds: []int{1}}) |
| 246 | if err != nil { | ||
| 247 | t.Fatalf("Subscribe() error = %v", err) | ||
| 248 | } | ||
| 249 | 254 | ||
| 250 | // Start listening in background | ||
| 251 | go relay.Listen(ctx) | ||
| 252 | |||
| 253 | // Collect events | ||
| 254 | eventCount := 0 | 255 | eventCount := 0 |
| 255 | timeout := time.After(2 * time.Second) | 256 | for range sub.Events { |
| 256 | 257 | eventCount++ | |
| 257 | for { | 258 | } |
| 258 | select { | 259 | |
| 259 | case <-sub.Events: | 260 | if eventCount != 3 { |
| 260 | eventCount++ | 261 | t.Errorf("Received %d events, want 3", eventCount) |
| 261 | case <-sub.EOSE: | 262 | } |
| 262 | if eventCount != 3 { | 263 | if sub.Err != nil { |
| 263 | t.Errorf("Received %d events, want 3", eventCount) | 264 | t.Errorf("Subscription.Err = %v, want nil", sub.Err) |
| 264 | } | ||
| 265 | return | ||
| 266 | case <-timeout: | ||
| 267 | t.Fatal("Timeout waiting for events") | ||
| 268 | } | ||
| 269 | } | 265 | } |
| 270 | } | 266 | } |
| 271 | 267 | ||
| @@ -322,10 +318,7 @@ func TestSubscriptionClose(t *testing.T) { | |||
| 322 | } | 318 | } |
| 323 | defer relay.Close() | 319 | defer relay.Close() |
| 324 | 320 | ||
| 325 | sub, err := relay.Subscribe(ctx, "sub1", Filter{Kinds: []int{1}}) | 321 | sub := relay.Subscribe(ctx, Filter{Kinds: []int{1}}) |
| 326 | if err != nil { | ||
| 327 | t.Fatalf("Subscribe() error = %v", err) | ||
| 328 | } | ||
| 329 | 322 | ||
| 330 | if err := sub.Close(ctx); err != nil { | 323 | if err := sub.Close(ctx); err != nil { |
| 331 | t.Errorf("Subscription.Close() error = %v", err) | 324 | t.Errorf("Subscription.Close() error = %v", err) |
