aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/cluster/lock_client.go6
-rw-r--r--weed/command/filer_remote_gateway.go2
-rw-r--r--weed/command/filer_remote_sync.go2
-rw-r--r--weed/mount/meta_cache/meta_cache_subscribe.go2
-rw-r--r--weed/operation/upload_content.go2
-rw-r--r--weed/pb/filer_pb_tail.go2
-rw-r--r--weed/s3api/auth_credentials_subscribe.go2
-rw-r--r--weed/util/retry.go5
8 files changed, 13 insertions, 10 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go
index 57198c865..bb1052656 100644
--- a/weed/cluster/lock_client.go
+++ b/weed/cluster/lock_client.go
@@ -57,7 +57,7 @@ func (lc *LockClient) StartLock(key string, owner string) (lock *LiveLock) {
lc: lc,
}
go func() {
- util.RetryForever("create lock:"+key, func() error {
+ util.RetryUntil("create lock:"+key, func() error {
errorMessage, err := lock.doLock(lock_manager.MaxDuration)
if err != nil {
glog.Infof("create lock %s: %s", key, err)
@@ -98,7 +98,7 @@ func (lc *LockClient) doNewLock(key string, lockDuration time.Duration, owner st
lockDuration = lc.maxLockDuration
needRenewal = true
}
- util.RetryForever("create lock:"+key, func() error {
+ util.RetryUntil("create lock:"+key, func() error {
errorMessage, err := lock.doLock(lockDuration)
if err != nil {
time.Sleep(time.Second)
@@ -148,7 +148,7 @@ func (lc *LockClient) keepLock(lock *LiveLock) {
select {
case <-ticker:
// renew the lock if lock.expireAtNs is still greater than now
- util.RetryForever("keep lock:"+lock.key, func() error {
+ util.RetryUntil("keep lock:"+lock.key, func() error {
lockDuration := time.Duration(lock.expireAtNs-time.Now().UnixNano()) * time.Nanosecond
if lockDuration > lc.maxLockDuration {
lockDuration = lc.maxLockDuration
diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go
index f7b1f3146..61a5d26a2 100644
--- a/weed/command/filer_remote_gateway.go
+++ b/weed/command/filer_remote_gateway.go
@@ -111,7 +111,7 @@ func runFilerRemoteGateway(cmd *Command, args []string) bool {
// synchronize /buckets folder
fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir)
- util.RetryForever("filer.remote.sync buckets", func() error {
+ util.RetryUntil("filer.remote.sync buckets", func() error {
return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource)
}, func(err error) bool {
if err != nil {
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index 261e024a6..2d6133367 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -90,7 +90,7 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
if dir != "" {
fmt.Printf("synchronize %s to remote storage...\n", dir)
- util.RetryForever("filer.remote.sync "+dir, func() error {
+ util.RetryUntil("filer.remote.sync "+dir, func() error {
return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
}, func(err error) bool {
if err != nil {
diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go
index d5d8150bc..9500bd9db 100644
--- a/weed/mount/meta_cache/meta_cache_subscribe.go
+++ b/weed/mount/meta_cache/meta_cache_subscribe.go
@@ -70,7 +70,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
StopTsNs: 0,
EventErrorType: pb.FatalOnError,
}
- util.RetryForever("followMetaUpdates", func() error {
+ util.RetryUntil("followMetaUpdates", func() error {
clientEpoch++
return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn)
}, func(err error) bool {
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 17e767472..108ec3df2 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -116,7 +116,7 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A
return uploadErr
}
if uploadOption.RetryForever {
- util.RetryForever("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
+ util.RetryUntil("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
glog.V(0).Infof("upload content: %v", err)
return true
})
diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go
index 674d55ad8..b54dad871 100644
--- a/weed/pb/filer_pb_tail.go
+++ b/weed/pb/filer_pb_tail.go
@@ -90,7 +90,7 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc
case FatalOnError:
glog.Fatalf("process %v: %v", resp, err)
case RetryForeverOnError:
- util.RetryForever("followMetaUpdates", func() error {
+ util.RetryUntil("followMetaUpdates", func() error {
return processEventFn(resp)
}, func(err error) bool {
glog.Errorf("process %v: %v", resp, err)
diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go
index 8006ff326..766408909 100644
--- a/weed/s3api/auth_credentials_subscribe.go
+++ b/weed/s3api/auth_credentials_subscribe.go
@@ -46,7 +46,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, p
StopTsNs: 0,
EventErrorType: pb.FatalOnError,
}
- util.RetryForever("followIamChanges", func() error {
+ util.RetryUntil("followIamChanges", func() error {
clientEpoch++
return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn)
}, func(err error) bool {
diff --git a/weed/util/retry.go b/weed/util/retry.go
index 997fa5d13..cdc020b7a 100644
--- a/weed/util/retry.go
+++ b/weed/util/retry.go
@@ -57,7 +57,8 @@ func MultiRetry(name string, errList []string, job func() error) (err error) {
return err
}
-func RetryForever(name string, job func() error, onErrFn func(err error) (shouldContinue bool)) {
+// RetryUntil retries until the job returns no error or onErrFn returns false
+func RetryUntil(name string, job func() error, onErrFn func(err error) (shouldContinue bool)) {
waitTime := time.Second
for {
err := job()
@@ -74,6 +75,8 @@ func RetryForever(name string, job func() error, onErrFn func(err error) (should
waitTime += waitTime / 2
}
continue
+ } else {
+ break
}
}
}