aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go2
-rw-r--r--weed/command/filer_backup.go2
-rw-r--r--weed/command/filer_meta_backup.go2
-rw-r--r--weed/command/filer_meta_tail.go2
-rw-r--r--weed/command/filer_remote_gateway_buckets.go2
-rw-r--r--weed/command/filer_remote_sync_dir.go2
-rw-r--r--weed/command/filer_sync.go2
-rw-r--r--weed/filer/filer.go2
-rw-r--r--weed/mount/meta_cache/meta_cache_subscribe.go2
-rw-r--r--weed/pb/filer_pb_tail.go33
-rw-r--r--weed/s3api/auth_credentials_subscribe.go2
11 files changed, 36 insertions, 17 deletions
diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
index aa9840b7f..d4b9d63b1 100644
--- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
+++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
@@ -77,7 +77,7 @@ func startGenerateMetadata() {
func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) {
- tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpc.WithInsecure(), "tail", 0, *dir, nil, 0, 0, 0, eachEntryFunc, false)
+ tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpc.WithInsecure(), "tail", 0, *dir, nil, 0, 0, 0, eachEntryFunc, pb.TrivialOnError)
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index f029e4ef2..d191c693b 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -114,6 +114,6 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, false)
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
}
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index cced19faa..cf679885d 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -195,7 +195,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
})
return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId,
- *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, false)
+ *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
}
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index cc3e014c6..66a87c3d9 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -119,7 +119,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
return err
}
return nil
- }, false)
+ }, pb.TrivialOnError)
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go
index 38d936826..c3ff756db 100644
--- a/weed/command/filer_remote_gateway_buckets.go
+++ b/weed/command/filer_remote_gateway_buckets.go
@@ -39,7 +39,7 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId,
- option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, false)
+ option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
}
func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
index 02e7218e9..5fc20be9a 100644
--- a/weed/command/filer_remote_sync_dir.go
+++ b/weed/command/filer_remote_sync_dir.go
@@ -41,7 +41,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId,
- mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, false)
+ mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
}
func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 2dd780a75..deb458525 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -178,7 +178,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
})
return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId,
- sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, false)
+ sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
}
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 81d2aa158..42c25838e 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -85,7 +85,7 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*
err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil,
0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error {
return Replay(f.Store, resp)
- }, true)
+ }, pb.FatalOnError)
return
}
diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go
index c30ec3699..c8ccdd375 100644
--- a/weed/mount/meta_cache/meta_cache_subscribe.go
+++ b/weed/mount/meta_cache/meta_cache_subscribe.go
@@ -58,7 +58,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
}
util.RetryForever("followMetaUpdates", func() error {
- return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, 0, selfSignature, processEventFn, true)
+ return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, 0, selfSignature, processEventFn, pb.FatalOnError)
}, func(err error) bool {
glog.Errorf("follow metadata updates: %v", err)
return true
diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go
index 0fba972d1..2d6a7898b 100644
--- a/weed/pb/filer_pb_tail.go
+++ b/weed/pb/filer_pb_tail.go
@@ -5,19 +5,28 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"io"
"time"
)
+type EventErrorType int
+
+const (
+ TrivialOnError EventErrorType = iota
+ FatalOnError
+ RetryForeverOnError
+)
+
type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption, clientName string, clientId int32,
pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, untilTsNs int64, selfSignature int32,
- processEventFn ProcessMetadataFunc, fatalOnError bool) error {
+ processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error {
err := WithFilerClient(true, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId,
- pathPrefix, additionalPathPrefixes, &lastTsNs, untilTsNs, selfSignature, processEventFn, fatalOnError))
+ pathPrefix, additionalPathPrefixes, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
}
@@ -26,10 +35,10 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption,
func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
clientName string, clientId int32, pathPrefix string, lastTsNs *int64, untilTsNs int64, selfSignature int32,
- processEventFn ProcessMetadataFunc, fatalOnError bool) error {
+ processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error {
err := filerClient.WithFilerClient(true, makeSubscribeMetadataFunc(clientName, clientId,
- pathPrefix, nil, lastTsNs, untilTsNs, selfSignature, processEventFn, fatalOnError))
+ pathPrefix, nil, lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
}
@@ -38,7 +47,7 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
}
func makeSubscribeMetadataFunc(clientName string, clientId int32, pathPrefix string, additionalPathPrefixes []string, lastTsNs *int64, untilTsNs int64, selfSignature int32,
- processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
+ processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) func(client filer_pb.SeaweedFilerClient) error {
return func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -65,9 +74,19 @@ func makeSubscribeMetadataFunc(clientName string, clientId int32, pathPrefix str
}
if err := processEventFn(resp); err != nil {
- if fatalOnError {
+ switch eventErrorType {
+ case TrivialOnError:
+ glog.Errorf("process %v: %v", resp, err)
+ case FatalOnError:
glog.Fatalf("process %v: %v", resp, err)
- } else {
+ case RetryForeverOnError:
+ util.RetryForever("followMetaUpdates", func() error {
+ return processEventFn(resp)
+ }, func(err error) bool {
+ glog.Errorf("process %v: %v", resp, err)
+ return true
+ })
+ default:
glog.Errorf("process %v: %v", resp, err)
}
}
diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go
index 48c4e35c8..91fd5d830 100644
--- a/weed/s3api/auth_credentials_subscribe.go
+++ b/weed/s3api/auth_credentials_subscribe.go
@@ -33,7 +33,7 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la
}
util.RetryForever("followIamChanges", func() error {
- return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, prefix, &lastTsNs, 0, 0, processEventFn, true)
+ return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, prefix, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError)
}, func(err error) bool {
glog.V(0).Infof("iam follow metadata changes: %v", err)
return true