aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/integration/broker_client.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/integration/broker_client.go')
-rw-r--r--weed/mq/kafka/integration/broker_client.go29
1 files changed, 21 insertions, 8 deletions
diff --git a/weed/mq/kafka/integration/broker_client.go b/weed/mq/kafka/integration/broker_client.go
index f4db2a7c6..c1f743f0b 100644
--- a/weed/mq/kafka/integration/broker_client.go
+++ b/weed/mq/kafka/integration/broker_client.go
@@ -11,6 +11,7 @@ import (
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/filer_client"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -29,6 +30,12 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor
// operating even during client shutdown, which is important for testing scenarios.
dialCtx := context.Background()
+ // CRITICAL FIX: Add timeout to dial context
+ // gRPC dial will retry with exponential backoff. Without a timeout, it hangs indefinitely
+ // if the broker is unreachable. Set a reasonable timeout for initial connection attempt.
+ dialCtx, dialCancel := context.WithTimeout(dialCtx, 30*time.Second)
+ defer dialCancel()
+
// Connect to broker
// Load security configuration for broker connection
util.LoadSecurityConfiguration()
@@ -45,14 +52,17 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor
client := mq_pb.NewSeaweedMessagingClient(conn)
return &BrokerClient{
- filerClientAccessor: filerClientAccessor,
- brokerAddress: brokerAddress,
- conn: conn,
- client: client,
- publishers: make(map[string]*BrokerPublisherSession),
- subscribers: make(map[string]*BrokerSubscriberSession),
- ctx: ctx,
- cancel: cancel,
+ filerClientAccessor: filerClientAccessor,
+ brokerAddress: brokerAddress,
+ conn: conn,
+ client: client,
+ publishers: make(map[string]*BrokerPublisherSession),
+ subscribers: make(map[string]*BrokerSubscriberSession),
+ fetchRequests: make(map[string]*FetchRequest),
+ partitionAssignmentCache: make(map[string]*partitionAssignmentCacheEntry),
+ partitionAssignmentCacheTTL: 30 * time.Second, // Same as broker's cache TTL
+ ctx: ctx,
+ cancel: cancel,
}, nil
}
@@ -425,6 +435,7 @@ func (bc *BrokerClient) TopicExists(topicName string) (bool, error) {
ctx, cancel := context.WithTimeout(bc.ctx, 5*time.Second)
defer cancel()
+ glog.V(2).Infof("[BrokerClient] TopicExists: Querying broker for topic %s", topicName)
resp, err := bc.client.TopicExists(ctx, &mq_pb.TopicExistsRequest{
Topic: &schema_pb.Topic{
Namespace: "kafka",
@@ -432,8 +443,10 @@ func (bc *BrokerClient) TopicExists(topicName string) (bool, error) {
},
})
if err != nil {
+ glog.V(1).Infof("[BrokerClient] TopicExists: ERROR for topic %s: %v", topicName, err)
return false, fmt.Errorf("failed to check topic existence: %v", err)
}
+ glog.V(2).Infof("[BrokerClient] TopicExists: Topic %s exists=%v", topicName, resp.Exists)
return resp.Exists, nil
}