aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_pub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
-rw-r--r--weed/mq/broker/broker_grpc_pub.go20
1 files changed, 10 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index f31dc7eff..64841a1b9 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer"
@@ -46,7 +46,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
initMessage := req.GetInit()
if initMessage == nil {
response.Error = fmt.Sprintf("missing init message")
- glog.Errorf("missing init message")
+ log.Errorf("missing init message")
return stream.Send(response)
}
@@ -55,14 +55,14 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
if getOrGenErr != nil {
response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
- glog.Errorf("topic %v not found: %v", t, getOrGenErr)
+ log.Errorf("topic %v not found: %v", t, getOrGenErr)
return stream.Send(response)
}
// connect to follower brokers
if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
response.Error = followerErr.Error()
- glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
+ log.Errorf("MaybeConnectToFollowers: %v", followerErr)
return stream.Send(response)
}
@@ -88,7 +88,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
AckSequence: acknowledgedSequence,
}
if err := stream.Send(response); err != nil {
- glog.Errorf("Error sending response %v: %v", response, err)
+ log.Errorf("Error sending response %v: %v", response, err)
}
// println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
lastAckTime = time.Now()
@@ -107,7 +107,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
localTopicPartition.Publishers.RemovePublisher(clientName)
if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveLocalPartition(t, p)
- glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
+ log.V(3).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
}
}()
@@ -126,7 +126,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
if err == io.EOF {
break
}
- glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
+ log.V(3).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
break
}
@@ -145,7 +145,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
}
- glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
+ log.V(3).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
return nil
}
@@ -155,11 +155,11 @@ func findClientAddress(ctx context.Context) string {
// fmt.Printf("FromContext %+v\n", ctx)
pr, ok := peer.FromContext(ctx)
if !ok {
- glog.Error("failed to get peer from ctx")
+ log.Error("failed to get peer from ctx")
return ""
}
if pr.Addr == net.Addr(nil) {
- glog.Error("failed to get peer address")
+ log.Error("failed to get peer address")
return ""
}
return pr.Addr.String()