diff options
| author | chrislu <chris.lu@gmail.com> | 2025-10-13 21:07:48 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-10-13 21:07:48 -0700 |
| commit | f15eaaf8b9e9bfe92c392d6ba17f41140ea283f3 (patch) | |
| tree | 6bfb4850d0b5ed906a426b6440f3ed70e9e7085d | |
| parent | fba4fc3a7dc54576b735110739093a37f184b0f9 (diff) | |
| download | seaweedfs-f15eaaf8b9e9bfe92c392d6ba17f41140ea283f3.tar.xz seaweedfs-f15eaaf8b9e9bfe92c392d6ba17f41140ea283f3.zip | |
nil checking
| -rw-r--r-- | weed/mq/kafka/integration/broker_client_subscribe.go | 18 |
1 files changed, 18 insertions, 0 deletions
diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index a0b8504bf..e3ee6954b 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -442,6 +442,15 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib // Try to receive first record go func() { + // Check if stream is nil (can happen during session recreation race condition) + if session.Stream == nil { + select { + case recvChan <- recvResult{resp: nil, err: fmt.Errorf("stream is nil")}: + case <-ctx.Done(): + // Context cancelled, don't send (avoid blocking) + } + return + } resp, err := session.Stream.Recv() select { case recvChan <- recvResult{resp: resp, err: err}: @@ -501,6 +510,15 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib recvChan2 := make(chan recvResult, 1) go func() { + // Check if stream is nil (can happen during session recreation race condition) + if session.Stream == nil { + select { + case recvChan2 <- recvResult{resp: nil, err: fmt.Errorf("stream is nil")}: + case <-ctx2.Done(): + // Context cancelled + } + return + } resp, err := session.Stream.Recv() select { case recvChan2 <- recvResult{resp: resp, err: err}: |
