diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-01 19:09:17 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-01 19:09:17 -0800 |
| commit | 8b4cd50aac51134b54d365b6bc370ba81b491d76 (patch) | |
| tree | 4236ca2cca4b39612cd9335e8aad259648b131a7 | |
| parent | bf9ae74384c0dc351849a63c31b779f8481dadc7 (diff) | |
| download | seaweedfs-8b4cd50aac51134b54d365b6bc370ba81b491d76.tar.xz seaweedfs-8b4cd50aac51134b54d365b6bc370ba81b491d76.zip | |
log errors
| -rw-r--r-- | weed/mq/client/sub_client/connect_to_sub_coordinator.go | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index fa58fbdff..e60243083 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -86,16 +86,19 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo defer wg.Done() defer func() { <-semaphore }() glog.V(1).Infof("subscriber %s/%s/%s assigned partition %+v at %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker) - sub.onEachPartition(partition, broker) + err := sub.onEachPartition(partition, broker) + if err != nil { + glog.V(0).Infof("subscriber %s/%s/%s partition %+v at %v: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker, err) + } }(assigned.Partition, assigned.Broker) } wg.Wait() } -func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) { +func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error { // connect to the partition broker - pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { subscribeClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ Message: &mq_pb.SubscribeRequest_Init{ Init: &mq_pb.SubscribeRequest_InitMessage{ |
