diff options
Diffstat (limited to 'weed/mq/broker')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 5019f4f72..60cc1c727 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -79,7 +79,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } go func() { defer func() { - println("stop sending ack to publisher") + println("stop sending ack to publisher", initMessage.PublisherName) }() lastAckTime := time.Now() @@ -93,7 +93,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis if err := stream.Send(response); err != nil { glog.Errorf("Error sending response %v: %v", response, err) } - println("sent ack", acknowledgedSequence) + println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName) lastAckTime = time.Now() } else { time.Sleep(1 * time.Second) @@ -130,7 +130,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis if err == io.EOF { break } - glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) + glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) break } @@ -146,7 +146,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } } - glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition) + glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) return nil } |
