aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient
diff options
context:
space:
mode:
Diffstat (limited to 'weed/wdclient')
-rw-r--r--weed/wdclient/exclusive_locks/exclusive_locker.go6
-rw-r--r--weed/wdclient/masterclient.go8
2 files changed, 7 insertions, 7 deletions
diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go
index 725fa307d..1767ee4a4 100644
--- a/weed/wdclient/exclusive_locks/exclusive_locker.go
+++ b/weed/wdclient/exclusive_locks/exclusive_locker.go
@@ -54,7 +54,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
// retry to get the lease
for {
- if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
@@ -82,7 +82,7 @@ func (l *ExclusiveLocker) RequestLock(clientName string) {
defer cancel2()
for l.isLocking {
- if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ if err := l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
@@ -114,7 +114,7 @@ func (l *ExclusiveLocker) ReleaseLock() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ l.masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 727d9cd34..672b3ac49 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -59,7 +59,7 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres
if master == myMasterAddress {
continue
}
- if grpcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
defer cancel()
resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
@@ -96,7 +96,7 @@ func (mc *MasterClient) tryAllMasters() {
func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master)
- gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -172,12 +172,12 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
return
}
-func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error {
+func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
for mc.currentMaster == "" {
time.Sleep(3 * time.Second)
}
- return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})