aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-27 10:27:08 -0700
committerchrislu <chris.lu@gmail.com>2024-03-27 10:27:08 -0700
commit5cc94a05b93c5f13d02bd091ff450f9e8274c5ea (patch)
treecd6b0673ebff44a3a86549e8c5239efa7de69e6e
parent17806cde2af54a0aa08642f6e5f381e703b84788 (diff)
downloadseaweedfs-5cc94a05b93c5f13d02bd091ff450f9e8274c5ea.tar.xz
seaweedfs-5cc94a05b93c5f13d02bd091ff450f9e8274c5ea.zip
separate goroutine to send ack to publisher
-rw-r--r--weed/mq/broker/broker_grpc_pub.go94
-rw-r--r--weed/mq/topic/local_partition.go1
2 files changed, 40 insertions, 55 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index ab4320a9d..f8554ea5b 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -11,6 +11,8 @@ import (
"io"
"math/rand"
"net"
+ "sync/atomic"
+ "time"
)
// PUB
@@ -45,7 +47,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
response := &mq_pb.PublishMessageResponse{}
// TODO check whether current broker should be the leader for the topic partition
- ackInterval := 1
initMessage := req.GetInit()
if initMessage == nil {
response.Error = fmt.Sprintf("missing init message")
@@ -62,7 +63,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return stream.Send(response)
}
- ackInterval = int(initMessage.AckInterval)
// connect to follower brokers
if localTopicPartition.FollowerStream == nil && len(initMessage.FollowerBrokers) > 0 {
@@ -104,22 +104,49 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
glog.Errorf("Error receiving follower ack: %v", err)
return
}
+ atomic.StoreInt64(&localTopicPartition.AckTsNs, ack.AckTsNs)
println("recv ack", ack.AckTsNs)
- if err := stream.Send(&mq_pb.PublishMessageResponse{
- AckSequence: ack.AckTsNs,
- }); err != nil {
- glog.Errorf("Error sending publisher ack %v: %v", ack, err)
- return
- }
}
}()
}
+ var receivedSequence, acknowledgedSequence int64
+ var isClosed bool
+
+ // start sending ack to publisher
+ ackInterval := int64(1)
+ if initMessage.AckInterval > 0 {
+ ackInterval = int64(initMessage.AckInterval)
+ }
+ go func() {
+ defer func() {
+ println("stop sending ack to publisher")
+ }()
+
+ lastAckTime := time.Now()
+ for !isClosed {
+ receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
+ if acknowledgedSequence < receivedSequence && (receivedSequence - acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second){
+ acknowledgedSequence = receivedSequence
+ response := &mq_pb.PublishMessageResponse{
+ AckSequence: acknowledgedSequence,
+ }
+ if err := stream.Send(response); err != nil {
+ glog.Errorf("Error sending response %v: %v", response, err)
+ }
+ println("sent ack", acknowledgedSequence)
+ lastAckTime = time.Now()
+ } else {
+ time.Sleep(1 * time.Second)
+ }
+ }
+ }()
+
+
// process each published messages
clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
- ackCounter := 0
var ackSequence int64
defer func() {
// remove the publisher
@@ -146,24 +173,8 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// send a hello message
stream.Send(&mq_pb.PublishMessageResponse{})
- var receivedSequence, acknowledgedSequence int64
-
defer func() {
- if localTopicPartition.FollowerStream != nil {
- //if err := followerStream.CloseSend(); err != nil {
- // glog.Errorf("Error closing follower stream: %v", err)
- //}
- } else {
- if acknowledgedSequence < receivedSequence {
- acknowledgedSequence = receivedSequence
- response := &mq_pb.PublishMessageResponse{
- AckSequence: acknowledgedSequence,
- }
- if err := stream.Send(response); err != nil {
- glog.Errorf("Error sending response %v: %v", response, err)
- }
- }
- }
+ isClosed = true
}()
// process each published messages
@@ -175,7 +186,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
break
}
glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
- return err
+ break
}
// Process the received message
@@ -199,37 +210,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return followErr
}
} else {
- ackCounter++
- if ackCounter >= ackInterval {
- ackCounter = 0
- // send back the ack directly
- acknowledgedSequence = receivedSequence
- response := &mq_pb.PublishMessageResponse{
- AckSequence: acknowledgedSequence,
- }
- if err := stream.Send(response); err != nil {
- glog.Errorf("Error sending response %v: %v", response, err)
- }
- }
+ atomic.StoreInt64(&localTopicPartition.AckTsNs, receivedSequence)
}
}
- if localTopicPartition.FollowerStream != nil {
- // send close to the follower
- if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
- Message: &mq_pb.PublishFollowMeRequest_Close{
- Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
- },
- }); followErr != nil {
- return followErr
- }
- println("closing follower stream")
-
- //if err := followerStream.CloseSend(); err != nil {
- // glog.Errorf("Error closing follower stream: %v", err)
- //}
- }
-
glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
return nil
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index dbef9da89..34c2903f4 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -14,6 +14,7 @@ import (
type LocalPartition struct {
ListenersWaits int64
+ AckTsNs int64
// notifying clients
ListenersLock sync.Mutex