aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-01 19:09:17 -0800
committerchrislu <chris.lu@gmail.com>2024-01-01 19:09:17 -0800
commit8b4cd50aac51134b54d365b6bc370ba81b491d76 (patch)
tree4236ca2cca4b39612cd9335e8aad259648b131a7
parentbf9ae74384c0dc351849a63c31b779f8481dadc7 (diff)
downloadseaweedfs-8b4cd50aac51134b54d365b6bc370ba81b491d76.tar.xz
seaweedfs-8b4cd50aac51134b54d365b6bc370ba81b491d76.zip
log errors
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go9
1 files changed, 6 insertions, 3 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index fa58fbdff..e60243083 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -86,16 +86,19 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
defer wg.Done()
defer func() { <-semaphore }()
glog.V(1).Infof("subscriber %s/%s/%s assigned partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
- sub.onEachPartition(partition, broker)
+ err := sub.onEachPartition(partition, broker)
+ if err != nil {
+ glog.V(0).Infof("subscriber %s/%s/%s partition %+v at %v: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker, err)
+ }
}(assigned.Partition, assigned.Broker)
}
wg.Wait()
}
-func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) {
+func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error {
// connect to the partition broker
- pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
subscribeClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
Message: &mq_pb.SubscribeRequest_Init{
Init: &mq_pb.SubscribeRequest_InitMessage{