aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-10-13 21:07:48 -0700
committerchrislu <chris.lu@gmail.com>2025-10-13 21:07:48 -0700
commitf15eaaf8b9e9bfe92c392d6ba17f41140ea283f3 (patch)
tree6bfb4850d0b5ed906a426b6440f3ed70e9e7085d
parentfba4fc3a7dc54576b735110739093a37f184b0f9 (diff)
downloadseaweedfs-f15eaaf8b9e9bfe92c392d6ba17f41140ea283f3.tar.xz
seaweedfs-f15eaaf8b9e9bfe92c392d6ba17f41140ea283f3.zip
nil checking
-rw-r--r--weed/mq/kafka/integration/broker_client_subscribe.go18
1 files changed, 18 insertions, 0 deletions
diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go
index a0b8504bf..e3ee6954b 100644
--- a/weed/mq/kafka/integration/broker_client_subscribe.go
+++ b/weed/mq/kafka/integration/broker_client_subscribe.go
@@ -442,6 +442,15 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
// Try to receive first record
go func() {
+ // Check if stream is nil (can happen during session recreation race condition)
+ if session.Stream == nil {
+ select {
+ case recvChan <- recvResult{resp: nil, err: fmt.Errorf("stream is nil")}:
+ case <-ctx.Done():
+ // Context cancelled, don't send (avoid blocking)
+ }
+ return
+ }
resp, err := session.Stream.Recv()
select {
case recvChan <- recvResult{resp: resp, err: err}:
@@ -501,6 +510,15 @@ func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscrib
recvChan2 := make(chan recvResult, 1)
go func() {
+ // Check if stream is nil (can happen during session recreation race condition)
+ if session.Stream == nil {
+ select {
+ case recvChan2 <- recvResult{resp: nil, err: fmt.Errorf("stream is nil")}:
+ case <-ctx2.Done():
+ // Context cancelled
+ }
+ return
+ }
resp, err := session.Stream.Recv()
select {
case recvChan2 <- recvResult{resp: resp, err: err}: