aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_pub.go32
1 files changed, 25 insertions, 7 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 854093f8a..20a31f09c 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -7,6 +7,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "sync/atomic"
"time"
)
@@ -99,20 +100,37 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
ackCounter := 0
var ackSequence int64
+ var isStopping int32
respChan := make(chan *mq_pb.PublishResponse, 128)
- defer close(respChan)
+ defer func() {
+ atomic.StoreInt32(&isStopping, 1)
+ response := &mq_pb.PublishResponse{
+ Error: "end of stream",
+ }
+ respChan <- response
+ close(respChan)
+ }()
go func() {
+ ticker := time.NewTicker(1 * time.Second)
for {
select {
case resp := <-respChan:
- if err := stream.Send(resp); err != nil {
- glog.Errorf("Error sending setup response: %v", err)
+ if resp != nil {
+ if err := stream.Send(resp); err != nil {
+ glog.Errorf("Error sending setup response: %v", err)
+ }
+ } else {
+ return
}
- case <-time.After(1 * time.Second):
- response := &mq_pb.PublishResponse{
- AckSequence: ackSequence,
+ case <-ticker.C:
+ if atomic.LoadInt32(&isStopping) == 0 {
+ response := &mq_pb.PublishResponse{
+ AckSequence: ackSequence,
+ }
+ respChan <- response
+ } else {
+ return
}
- respChan <- response
}
}
}()