diff options
Diffstat (limited to 'weed/mq/kafka/integration/broker_client.go')
| -rw-r--r-- | weed/mq/kafka/integration/broker_client.go | 29 |
1 files changed, 21 insertions, 8 deletions
diff --git a/weed/mq/kafka/integration/broker_client.go b/weed/mq/kafka/integration/broker_client.go index f4db2a7c6..c1f743f0b 100644 --- a/weed/mq/kafka/integration/broker_client.go +++ b/weed/mq/kafka/integration/broker_client.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/filer_client" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -29,6 +30,12 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor // operating even during client shutdown, which is important for testing scenarios. dialCtx := context.Background() + // CRITICAL FIX: Add timeout to dial context + // gRPC dial will retry with exponential backoff. Without a timeout, it hangs indefinitely + // if the broker is unreachable. Set a reasonable timeout for initial connection attempt. + dialCtx, dialCancel := context.WithTimeout(dialCtx, 30*time.Second) + defer dialCancel() + // Connect to broker // Load security configuration for broker connection util.LoadSecurityConfiguration() @@ -45,14 +52,17 @@ func NewBrokerClientWithFilerAccessor(brokerAddress string, filerClientAccessor client := mq_pb.NewSeaweedMessagingClient(conn) return &BrokerClient{ - filerClientAccessor: filerClientAccessor, - brokerAddress: brokerAddress, - conn: conn, - client: client, - publishers: make(map[string]*BrokerPublisherSession), - subscribers: make(map[string]*BrokerSubscriberSession), - ctx: ctx, - cancel: cancel, + filerClientAccessor: filerClientAccessor, + brokerAddress: brokerAddress, + conn: conn, + client: client, + publishers: make(map[string]*BrokerPublisherSession), + subscribers: make(map[string]*BrokerSubscriberSession), + fetchRequests: make(map[string]*FetchRequest), + partitionAssignmentCache: make(map[string]*partitionAssignmentCacheEntry), + partitionAssignmentCacheTTL: 30 * time.Second, // Same as broker's cache TTL + ctx: ctx, + cancel: cancel, }, nil } @@ -425,6 +435,7 @@ func (bc *BrokerClient) TopicExists(topicName string) (bool, error) { ctx, cancel := context.WithTimeout(bc.ctx, 5*time.Second) defer cancel() + glog.V(2).Infof("[BrokerClient] TopicExists: Querying broker for topic %s", topicName) resp, err := bc.client.TopicExists(ctx, &mq_pb.TopicExistsRequest{ Topic: &schema_pb.Topic{ Namespace: "kafka", @@ -432,8 +443,10 @@ func (bc *BrokerClient) TopicExists(topicName string) (bool, error) { }, }) if err != nil { + glog.V(1).Infof("[BrokerClient] TopicExists: ERROR for topic %s: %v", topicName, err) return false, fmt.Errorf("failed to check topic existence: %v", err) } + glog.V(2).Infof("[BrokerClient] TopicExists: Topic %s exists=%v", topicName, resp.Exists) return resp.Exists, nil } |
