diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 5 |
1 files changed, 2 insertions, 3 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index a217489de..d633a3efa 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -69,7 +69,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis return stream.Send(response) } - var receivedSequence, acknowledgedSequence int64 + var receivedSequence, acknowledgedSequence int64 var isClosed bool // start sending ack to publisher @@ -85,7 +85,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis lastAckTime := time.Now() for !isClosed { receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs) - if acknowledgedSequence < receivedSequence && (receivedSequence - acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second){ + if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second) { acknowledgedSequence = receivedSequence response := &mq_pb.PublishMessageResponse{ AckSequence: acknowledgedSequence, @@ -101,7 +101,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } }() - // process each published messages clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition) localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher()) |
