diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 32 |
1 files changed, 16 insertions, 16 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 9cdbe8325..ba8efb5e4 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -23,7 +23,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs return err } if req.GetInit() == nil { - glog.Errorf("missing init message") + log.Errorf("missing init message") return fmt.Errorf("missing init message") } @@ -33,7 +33,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs t := topic.FromPbTopic(req.GetInit().Topic) partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition()) - glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) + log.V(3).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition) if getOrGenErr != nil { @@ -41,7 +41,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber()) - glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition) + log.V(3).Infof("Subscriber %s connected on %v %v", clientName, t, partition) isConnected := true sleepIntervalCount := 0 @@ -49,7 +49,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs defer func() { isConnected = false localTopicPartition.Subscribers.RemoveSubscriber(clientName) - glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) + log.V(3).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) if localTopicPartition.MaybeShutdownLocalPartition() { b.localTopicManager.RemoveLocalPartition(t, partition) } @@ -60,7 +60,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // connect to the follower var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient - glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker) + log.V(3).Infof("follower broker: %v", req.GetInit().FollowerBroker) if req.GetInit().FollowerBroker != "" { follower := req.GetInit().FollowerBroker if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil { @@ -90,7 +90,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } } } - glog.V(0).Infof("follower %s connected", follower) + log.V(3).Infof("follower %s connected", follower) } go func() { @@ -107,7 +107,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs }}) break } - glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err) + log.V(3).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err) break } if ack.GetAck().Key == nil { @@ -125,7 +125,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs }, }, }); err != nil { - glog.Errorf("Error sending ack to follower: %v", err) + log.Errorf("Error sending ack to follower: %v", err) break } lastOffset = currentLastOffset @@ -133,9 +133,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } } if lastOffset > 0 { - glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset) + log.V(3).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset) if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil { - glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err) + log.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err) } } if subscribeFollowMeStream != nil { @@ -145,7 +145,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs }, }); err != nil { if err != io.EOF { - glog.Errorf("Error sending close to follower: %v", err) + log.Errorf("Error sending close to follower: %v", err) } } } @@ -169,7 +169,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // Client disconnected return false } - glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err) return false default: // Continue processing the request @@ -190,7 +190,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // Client disconnected return false, nil } - glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err) return false, nil default: // Continue processing the request @@ -207,7 +207,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs TsNs: logEntry.TsNs, }, }}); err != nil { - glog.Errorf("Error sending data: %v", err) + log.Errorf("Error sending data: %v", err) return false, err } @@ -241,7 +241,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess // try to resume if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil { - glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset) + log.V(3).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset) startPosition = log_buffer.NewMessagePosition(storedOffset, -2) return } |
