diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index f31dc7eff..64841a1b9 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -3,7 +3,7 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/peer" @@ -46,7 +46,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis initMessage := req.GetInit() if initMessage == nil { response.Error = fmt.Sprintf("missing init message") - glog.Errorf("missing init message") + log.Errorf("missing init message") return stream.Send(response) } @@ -55,14 +55,14 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p) if getOrGenErr != nil { response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr) - glog.Errorf("topic %v not found: %v", t, getOrGenErr) + log.Errorf("topic %v not found: %v", t, getOrGenErr) return stream.Send(response) } // connect to follower brokers if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil { response.Error = followerErr.Error() - glog.Errorf("MaybeConnectToFollowers: %v", followerErr) + log.Errorf("MaybeConnectToFollowers: %v", followerErr) return stream.Send(response) } @@ -88,7 +88,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis AckSequence: acknowledgedSequence, } if err := stream.Send(response); err != nil { - glog.Errorf("Error sending response %v: %v", response, err) + log.Errorf("Error sending response %v: %v", response, err) } // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName) lastAckTime = time.Now() @@ -107,7 +107,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis localTopicPartition.Publishers.RemovePublisher(clientName) if localTopicPartition.MaybeShutdownLocalPartition() { b.localTopicManager.RemoveLocalPartition(t, p) - glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) + log.V(3).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) } }() @@ -126,7 +126,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis if err == io.EOF { break } - glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) + log.V(3).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) break } @@ -145,7 +145,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } } - glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) + log.V(3).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) return nil } @@ -155,11 +155,11 @@ func findClientAddress(ctx context.Context) string { // fmt.Printf("FromContext %+v\n", ctx) pr, ok := peer.FromContext(ctx) if !ok { - glog.Error("failed to get peer from ctx") + log.Error("failed to get peer from ctx") return "" } if pr.Addr == net.Addr(nil) { - glog.Error("failed to get peer address") + log.Error("failed to get peer address") return "" } return pr.Addr.String() |
