aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-17 08:57:47 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-17 08:57:47 -0700
commitf11233cd494b3092753b302166badbefe6bf401a (patch)
treeffa3ab99c19e194f32b94a2fb5f3be1fb80cc0c0
parent1ea9bc66d96030720e465bd283ad1eeff16724da (diff)
downloadseaweedfs-f11233cd494b3092753b302166badbefe6bf401a.tar.xz
seaweedfs-f11233cd494b3092753b302166badbefe6bf401a.zip
simplify disconnected stream detection
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go17
1 files changed, 2 insertions, 15 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index d8e8faa31..1065309d2 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -4,7 +4,6 @@ import (
"fmt"
"io"
"strings"
- "sync"
"time"
"github.com/golang/protobuf/proto"
@@ -50,23 +49,13 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
defer broker.topicManager.ReleaseLock(tp, false)
isConnected := true
- var streamLock sync.Mutex // https://github.com/grpc/grpc-go/issues/948
go func() {
- lastActiveTime := time.Now().UnixNano()
- sleepTime := 1737 * time.Millisecond
for isConnected {
- time.Sleep(sleepTime)
- if lastActiveTime != processedTsNs {
- lastActiveTime = processedTsNs
- continue
- }
- streamLock.Lock()
- // println("checking connection health to", subscriberId, tp.String())
- if err = stream.Send(&messaging_pb.BrokerMessage{}); err != nil {
+ if _, err := stream.Recv(); err != nil {
+ println("disconnecting connection to", subscriberId, tp.String())
isConnected = false
lock.cond.Signal()
}
- streamLock.Unlock()
}
}()
@@ -82,11 +71,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
// how to process each message
// an error returned will end the subscription
eachMessageFn := func(m *messaging_pb.Message) error {
- streamLock.Lock()
err := stream.Send(&messaging_pb.BrokerMessage{
Data: m,
})
- streamLock.Unlock()
if err != nil {
glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err)
}