aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_sub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
-rw-r--r--weed/mq/broker/broker_grpc_sub.go32
1 files changed, 16 insertions, 16 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 9cdbe8325..ba8efb5e4 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -23,7 +23,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
return err
}
if req.GetInit() == nil {
- glog.Errorf("missing init message")
+ log.Errorf("missing init message")
return fmt.Errorf("missing init message")
}
@@ -33,7 +33,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
- glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
+ log.V(3).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
if getOrGenErr != nil {
@@ -41,7 +41,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
- glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
+ log.V(3).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
isConnected := true
sleepIntervalCount := 0
@@ -49,7 +49,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
defer func() {
isConnected = false
localTopicPartition.Subscribers.RemoveSubscriber(clientName)
- glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
+ log.V(3).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveLocalPartition(t, partition)
}
@@ -60,7 +60,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// connect to the follower
var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
- glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker)
+ log.V(3).Infof("follower broker: %v", req.GetInit().FollowerBroker)
if req.GetInit().FollowerBroker != "" {
follower := req.GetInit().FollowerBroker
if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil {
@@ -90,7 +90,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
}
}
- glog.V(0).Infof("follower %s connected", follower)
+ log.V(3).Infof("follower %s connected", follower)
}
go func() {
@@ -107,7 +107,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}})
break
}
- glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
+ log.V(3).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
break
}
if ack.GetAck().Key == nil {
@@ -125,7 +125,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
},
},
}); err != nil {
- glog.Errorf("Error sending ack to follower: %v", err)
+ log.Errorf("Error sending ack to follower: %v", err)
break
}
lastOffset = currentLastOffset
@@ -133,9 +133,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
}
if lastOffset > 0 {
- glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
+ log.V(3).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
- glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
+ log.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
}
}
if subscribeFollowMeStream != nil {
@@ -145,7 +145,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
},
}); err != nil {
if err != io.EOF {
- glog.Errorf("Error sending close to follower: %v", err)
+ log.Errorf("Error sending close to follower: %v", err)
}
}
}
@@ -169,7 +169,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// Client disconnected
return false
}
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err)
return false
default:
// Continue processing the request
@@ -190,7 +190,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// Client disconnected
return false, nil
}
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err)
return false, nil
default:
// Continue processing the request
@@ -207,7 +207,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
TsNs: logEntry.TsNs,
},
}}); err != nil {
- glog.Errorf("Error sending data: %v", err)
+ log.Errorf("Error sending data: %v", err)
return false, err
}
@@ -241,7 +241,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
// try to resume
if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
- glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
+ log.V(3).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
return
}