aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker')
-rw-r--r--weed/mq/broker/broker_grpc_pub.go8
1 files changed, 4 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 5019f4f72..60cc1c727 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -79,7 +79,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
go func() {
defer func() {
- println("stop sending ack to publisher")
+ println("stop sending ack to publisher", initMessage.PublisherName)
}()
lastAckTime := time.Now()
@@ -93,7 +93,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
if err := stream.Send(response); err != nil {
glog.Errorf("Error sending response %v: %v", response, err)
}
- println("sent ack", acknowledgedSequence)
+ println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
lastAckTime = time.Now()
} else {
time.Sleep(1 * time.Second)
@@ -130,7 +130,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
if err == io.EOF {
break
}
- glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
+ glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
break
}
@@ -146,7 +146,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
}
- glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
+ glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
return nil
}