aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/msgclient/sub_chan.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-17 11:10:45 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-17 11:10:45 -0700
commit95ca9dd8a2de6af3eb030880123dded9ed6de602 (patch)
treebdbec2e5158e1c5f66a7b05356e4fb69ecbdc819 /weed/messaging/msgclient/sub_chan.go
parentf11233cd494b3092753b302166badbefe6bf401a (diff)
downloadseaweedfs-95ca9dd8a2de6af3eb030880123dded9ed6de602.tar.xz
seaweedfs-95ca9dd8a2de6af3eb030880123dded9ed6de602.zip
subscribe support cancel
Diffstat (limited to 'weed/messaging/msgclient/sub_chan.go')
-rw-r--r--weed/messaging/msgclient/sub_chan.go11
1 files changed, 10 insertions, 1 deletions
diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go
index 3eabc6210..213ff4666 100644
--- a/weed/messaging/msgclient/sub_chan.go
+++ b/weed/messaging/msgclient/sub_chan.go
@@ -1,6 +1,7 @@
package msgclient
import (
+ "context"
"crypto/md5"
"hash"
"io"
@@ -15,6 +16,7 @@ type SubChannel struct {
ch chan []byte
stream messaging_pb.SeaweedMessaging_SubscribeClient
md5hash hash.Hash
+ cancel context.CancelFunc
}
func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
@@ -27,7 +29,8 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha
if err != nil {
return nil, err
}
- sc, err := setupSubscriberClient(grpcConnection, tp, subscriberId, time.Unix(0, 0))
+ ctx, cancel := context.WithCancel(context.Background())
+ sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
if err != nil {
return nil, err
}
@@ -36,6 +39,7 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha
ch: make(chan []byte),
stream: sc,
md5hash: md5.New(),
+ cancel: cancel,
}
go func() {
@@ -57,6 +61,7 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha
IsClose: true,
})
close(t.ch)
+ cancel()
return
}
t.ch <- resp.Data.Value
@@ -74,3 +79,7 @@ func (sc *SubChannel) Channel() chan []byte {
func (sc *SubChannel) Md5() []byte {
return sc.md5hash.Sum(nil)
}
+
+func (sc *SubChannel) Cancel() {
+ sc.cancel()
+}