aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-11 23:08:53 -0800
committerchrislu <chris.lu@gmail.com>2024-01-11 23:08:53 -0800
commit3aa3991f0fee90659872cf0e34a2a3b0304295e4 (patch)
tree121b92f095a8e65187e3acc41e1237a57c73cd4e
parent7afaad31a45bd8733e34ab286b30fb428180a23f (diff)
downloadseaweedfs-3aa3991f0fee90659872cf0e34a2a3b0304295e4.tar.xz
seaweedfs-3aa3991f0fee90659872cf0e34a2a3b0304295e4.zip
adjust client side logs
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go17
1 files changed, 7 insertions, 10 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 0172352b0..6d0eb0d43 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -23,7 +23,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
stream, err := client.SubscriberToSubCoordinator(ctx)
if err != nil {
- glog.V(1).Infof("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
+ glog.V(0).Infof("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
return err
}
waitTime = 1 * time.Second
@@ -42,7 +42,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
},
},
}); err != nil {
- glog.V(1).Infof("subscriber %s/%s send init: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
+ glog.V(0).Infof("subscriber %s/%s send init: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
return err
}
@@ -50,7 +50,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
for {
resp, err := stream.Recv()
if err != nil {
- glog.V(1).Infof("subscriber %s/%s receive: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
+ glog.V(0).Infof("subscriber %s/%s receive: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
return err
}
assignment := resp.GetAssignment()
@@ -63,7 +63,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
return nil
})
}
- glog.V(4).Infof("subscriber %s/%s/%s waiting for more assignments", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ glog.V(0).Infof("subscriber %s/%s/%s waiting for more assignments", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
if waitTime < 10*time.Second {
waitTime += 1 * time.Second
}
@@ -109,11 +109,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
Name: sub.ContentConfig.Topic,
},
PartitionOffset: &mq_pb.PartitionOffset{
- Partition: &mq_pb.Partition{
- RingSize: partition.RingSize,
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- },
+ Partition: partition,
StartTsNs: sub.alreadyProcessedTsNs,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
},
@@ -143,7 +139,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
}()
for {
- glog.V(3).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv()
if err != nil {
return fmt.Errorf("subscribe recv: %v", err)
@@ -163,6 +159,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return nil
}
case *mq_pb.SubscribeMessageResponse_Ctrl:
+ // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return io.EOF
}