diff options
Diffstat (limited to 'weed/wdclient')
| -rw-r--r-- | weed/wdclient/exclusive_locks/exclusive_locker.go | 6 | ||||
| -rw-r--r-- | weed/wdclient/masterclient.go | 8 |
2 files changed, 7 insertions, 7 deletions
diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go index 725fa307d..1767ee4a4 100644 --- a/weed/wdclient/exclusive_locks/exclusive_locker.go +++ b/weed/wdclient/exclusive_locks/exclusive_locker.go @@ -54,7 +54,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) { // retry to get the lease for { - if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), @@ -82,7 +82,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) { defer cancel2() for l.isLocking { - if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), @@ -114,7 +114,7 @@ func (l *ExclusiveLocker) ReleaseLock() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - l.masterClient.WithClient(func(client master_pb.SeaweedClient) error { + l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{ PreviousToken: atomic.LoadInt64(&l.token), PreviousLockTime: atomic.LoadInt64(&l.lockTsNs), diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 727d9cd34..672b3ac49 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -59,7 +59,7 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres if master == myMasterAddress { continue } - if grpcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond) defer cancel() resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{}) @@ -96,7 +96,7 @@ func (mc *MasterClient) tryAllMasters() { func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master) - gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -172,12 +172,12 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL return } -func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error { +func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { return util.Retry("master grpc", func() error { for mc.currentMaster == "" { time.Sleep(3 * time.Second) } - return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { return fn(client) }) }) |
