aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-06 23:16:41 -0700
committerchrislu <chris.lu@gmail.com>2023-09-06 23:16:41 -0700
commit875f562779f239a140d1008732b5375c0e511e61 (patch)
tree955215ac9eb019d9b9c7f0d0a563c2ed2bd86403
parent984b6c54cf6b0defaa6e727ab5e36809411fe92c (diff)
downloadseaweedfs-875f562779f239a140d1008732b5375c0e511e61.tar.xz
seaweedfs-875f562779f239a140d1008732b5375c0e511e61.zip
server side send response at least once per second
-rw-r--r--weed/mq/broker/broker_grpc_pub.go32
1 files changed, 25 insertions, 7 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 854093f8a..20a31f09c 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -7,6 +7,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "sync/atomic"
"time"
)
@@ -99,20 +100,37 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
ackCounter := 0
var ackSequence int64
+ var isStopping int32
respChan := make(chan *mq_pb.PublishResponse, 128)
- defer close(respChan)
+ defer func() {
+ atomic.StoreInt32(&isStopping, 1)
+ response := &mq_pb.PublishResponse{
+ Error: "end of stream",
+ }
+ respChan <- response
+ close(respChan)
+ }()
go func() {
+ ticker := time.NewTicker(1 * time.Second)
for {
select {
case resp := <-respChan:
- if err := stream.Send(resp); err != nil {
- glog.Errorf("Error sending setup response: %v", err)
+ if resp != nil {
+ if err := stream.Send(resp); err != nil {
+ glog.Errorf("Error sending setup response: %v", err)
+ }
+ } else {
+ return
}
- case <-time.After(1 * time.Second):
- response := &mq_pb.PublishResponse{
- AckSequence: ackSequence,
+ case <-ticker.C:
+ if atomic.LoadInt32(&isStopping) == 0 {
+ response := &mq_pb.PublishResponse{
+ AckSequence: ackSequence,
+ }
+ respChan <- response
+ } else {
+ return
}
- respChan <- response
}
}
}()