aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/sub_client
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/sub_client')
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go20
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go14
-rw-r--r--weed/mq/client/sub_client/subscribe.go10
3 files changed, 22 insertions, 22 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index e88aaca2f..feccca7a4 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -1,7 +1,7 @@
package sub_client
import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"time"
@@ -29,17 +29,17 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
return nil
})
if err != nil {
- glog.V(0).Infof("broker coordinator on %s: %v", broker, err)
+ log.V(3).Infof("broker coordinator on %s: %v", broker, err)
continue
}
- glog.V(0).Infof("found broker coordinator: %v", brokerLeader)
+ log.V(3).Infof("found broker coordinator: %v", brokerLeader)
// connect to the balancer
pb.WithBrokerGrpcClient(true, brokerLeader, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
stream, err := client.SubscriberToSubCoordinator(sub.ctx)
if err != nil {
- glog.V(0).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err)
return err
}
waitTime = 1 * time.Second
@@ -56,7 +56,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
},
},
}); err != nil {
- glog.V(0).Infof("subscriber %s send init: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s send init: %v", sub.ContentConfig.Topic, err)
return err
}
@@ -69,9 +69,9 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
default:
}
- glog.V(0).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply)
+ log.V(3).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply)
if err := stream.Send(reply); err != nil {
- glog.V(0).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err)
return
}
}
@@ -81,7 +81,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
for {
resp, err := stream.Recv()
if err != nil {
- glog.V(0).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err)
return err
}
@@ -92,13 +92,13 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
}
sub.brokerPartitionAssignmentChan <- resp
- glog.V(0).Infof("Received assignment: %+v", resp)
+ log.V(3).Infof("Received assignment: %+v", resp)
}
return nil
})
}
- glog.V(0).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ log.V(3).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
if waitTime < 10*time.Second {
waitTime += 1 * time.Second
}
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index 14a38cfa8..a931eb71f 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
@@ -52,10 +52,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
},
},
}); err != nil {
- glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
+ log.V(3).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
}
- glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
+ log.V(3).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
@@ -88,7 +88,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}()
for {
- // glog.V(0).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ // log.V(3).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
@@ -97,7 +97,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
return fmt.Errorf("subscribe recv: %v", err)
}
if resp.Message == nil {
- glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ log.V(3).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
continue
}
@@ -112,7 +112,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
switch m := resp.Message.(type) {
case *mq_pb.SubscribeMessageResponse_Data:
if m.Data.Ctrl != nil {
- glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
+ log.V(1).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
continue
}
if len(m.Data.Key) == 0 {
@@ -121,7 +121,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}
onDataMessageFn(m)
case *mq_pb.SubscribeMessageResponse_Ctrl:
- // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
+ // log.V(3).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return io.EOF
}
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index d4dea3852..1d06e0601 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -1,7 +1,7 @@
package sub_client
import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -60,7 +60,7 @@ func (sub *TopicSubscriber) startProcessors() {
<-semaphore
wg.Done()
}()
- glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ log.V(3).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{
AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{
@@ -84,9 +84,9 @@ func (sub *TopicSubscriber) startProcessors() {
err := sub.onEachPartition(assigned, stopChan, onDataMessageFn)
if err != nil {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
+ log.V(3).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
} else {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ log.V(3).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
}
sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignment{
@@ -130,7 +130,7 @@ func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartiti
}
sub.activeProcessorsLock.Unlock()
if foundOverlapping {
- glog.V(0).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
+ log.V(3).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
time.Sleep(1 * time.Second)
}
}