aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-09-09 12:07:15 -0700
committerChris Lu <chris.lu@gmail.com>2020-09-09 12:07:15 -0700
commitdaf0a449f7424d4a8252673509af5afd0b9bd8ec (patch)
treee9b9ba152b2fade26b71e7710ae6730dafe9d39e /weed/wdclient
parent7f69acd1f21b4e42afff155b633419eda17af331 (diff)
downloadseaweedfs-daf0a449f7424d4a8252673509af5afd0b9bd8ec.tar.xz
seaweedfs-daf0a449f7424d4a8252673509af5afd0b9bd8ec.zip
properly cancel context for streaming grpc
Diffstat (limited to 'weed/wdclient')
-rw-r--r--weed/wdclient/exclusive_locks/exclusive_locker.go13
-rw-r--r--weed/wdclient/masterclient.go5
2 files changed, 14 insertions, 4 deletions
diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go
index 1ecfe6ce2..801de14ce 100644
--- a/weed/wdclient/exclusive_locks/exclusive_locker.go
+++ b/weed/wdclient/exclusive_locks/exclusive_locker.go
@@ -46,10 +46,13 @@ func (l *ExclusiveLocker) RequestLock() {
return
}
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
// retry to get the lease
for {
if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
@@ -73,7 +76,7 @@ func (l *ExclusiveLocker) RequestLock() {
go func() {
for l.isLocking {
if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
@@ -98,8 +101,12 @@ func (l *ExclusiveLocker) RequestLock() {
func (l *ExclusiveLocker) ReleaseLock() {
l.isLocking = false
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
+ client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 4c066d535..3d23d8f13 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -70,7 +70,10 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master)
gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
- stream, err := client.KeepConnected(context.Background())
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.KeepConnected(ctx)
if err != nil {
glog.V(0).Infof("%s masterClient failed to keep connected to %s: %v", mc.clientType, master, err)
return err