diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-05-17 11:10:45 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-05-17 11:10:45 -0700 |
| commit | 95ca9dd8a2de6af3eb030880123dded9ed6de602 (patch) | |
| tree | bdbec2e5158e1c5f66a7b05356e4fb69ecbdc819 /weed/messaging/msgclient/sub_chan.go | |
| parent | f11233cd494b3092753b302166badbefe6bf401a (diff) | |
| download | seaweedfs-95ca9dd8a2de6af3eb030880123dded9ed6de602.tar.xz seaweedfs-95ca9dd8a2de6af3eb030880123dded9ed6de602.zip | |
subscribe support cancel
Diffstat (limited to 'weed/messaging/msgclient/sub_chan.go')
| -rw-r--r-- | weed/messaging/msgclient/sub_chan.go | 11 |
1 files changed, 10 insertions, 1 deletions
diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go index 3eabc6210..213ff4666 100644 --- a/weed/messaging/msgclient/sub_chan.go +++ b/weed/messaging/msgclient/sub_chan.go @@ -1,6 +1,7 @@ package msgclient import ( + "context" "crypto/md5" "hash" "io" @@ -15,6 +16,7 @@ type SubChannel struct { ch chan []byte stream messaging_pb.SeaweedMessaging_SubscribeClient md5hash hash.Hash + cancel context.CancelFunc } func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) { @@ -27,7 +29,8 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha if err != nil { return nil, err } - sc, err := setupSubscriberClient(grpcConnection, tp, subscriberId, time.Unix(0, 0)) + ctx, cancel := context.WithCancel(context.Background()) + sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0)) if err != nil { return nil, err } @@ -36,6 +39,7 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha ch: make(chan []byte), stream: sc, md5hash: md5.New(), + cancel: cancel, } go func() { @@ -57,6 +61,7 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha IsClose: true, }) close(t.ch) + cancel() return } t.ch <- resp.Data.Value @@ -74,3 +79,7 @@ func (sc *SubChannel) Channel() chan []byte { func (sc *SubChannel) Md5() []byte { return sc.md5hash.Sum(nil) } + +func (sc *SubChannel) Cancel() { + sc.cancel() +} |
