aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer_backup.go52
-rw-r--r--weed/command/filer_sync.go8
2 files changed, 44 insertions, 16 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 1344dfd2c..380540fd9 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -1,12 +1,15 @@
package command
import (
+ "errors"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/http"
"google.golang.org/grpc"
"regexp"
"strings"
@@ -14,16 +17,18 @@ import (
)
type FilerBackupOptions struct {
- isActivePassive *bool
- filer *string
- path *string
- excludePaths *string
- excludeFileName *string
- debug *bool
- proxyByFiler *bool
- doDeleteFiles *bool
- timeAgo *time.Duration
- retentionDays *int
+ isActivePassive *bool
+ filer *string
+ path *string
+ excludePaths *string
+ excludeFileName *string
+ debug *bool
+ proxyByFiler *bool
+ doDeleteFiles *bool
+ disableErrorRetry *bool
+ ignore404Error *bool
+ timeAgo *time.Duration
+ retentionDays *int
}
var (
@@ -41,6 +46,8 @@ func init() {
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days")
+ filerBackupOptions.disableErrorRetry = cmdFilerBackup.Flag.Bool("disableErrorRetry", false, "disables errors retry, only logs will print")
+ filerBackupOptions.ignore404Error = cmdFilerBackup.Flag.Bool("ignore404Error", true, "ignore 404 errors from filer")
}
var cmdFilerBackup = &Command{
@@ -130,7 +137,23 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
*backupOption.proxyByFiler)
dataSink.SetSourceFiler(filerSource)
- processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
+ var processEventFn func(*filer_pb.SubscribeMetadataResponse) error
+ if *backupOption.ignore404Error {
+ processEventFnGenerated := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
+ processEventFn = func(resp *filer_pb.SubscribeMetadataResponse) error {
+ err := processEventFnGenerated(resp)
+ if err == nil {
+ return nil
+ }
+ if errors.Is(err, http.ErrNotFound) {
+ glog.V(0).Infof("got 404 error, ignore it: %s", err.Error())
+ return nil
+ }
+ return err
+ }
+ } else {
+ processEventFn = genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
+ }
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
@@ -154,6 +177,11 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
prefix = prefix + "/"
}
+ eventErrorType := pb.RetryForeverOnError
+ if *backupOption.disableErrorRetry {
+ eventErrorType = pb.TrivialOnError
+ }
+
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: "backup_" + dataSink.GetName(),
ClientId: clientId,
@@ -164,7 +192,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
DirectoriesToWatch: nil,
StartTsNs: startFrom.UnixNano(),
StopTsNs: 0,
- EventErrorType: pb.RetryForeverOnError,
+ EventErrorType: eventErrorType,
}
return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 90204af4a..8201aa712 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -436,7 +436,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
}
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
- return fmt.Errorf("create entry1 : %v", err)
+ return fmt.Errorf("create entry1 : %w", err)
} else {
return nil
}
@@ -462,13 +462,13 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
// not able to find old entry
if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
- return fmt.Errorf("delete old entry %v: %v", oldKey, err)
+ return fmt.Errorf("delete old entry %v: %w", oldKey, err)
}
}
// create the new entry
newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
- return fmt.Errorf("create entry2 : %v", err)
+ return fmt.Errorf("create entry2 : %w", err)
} else {
return nil
}
@@ -486,7 +486,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
// new key is in the watched directory
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
- return fmt.Errorf("create entry3 : %v", err)
+ return fmt.Errorf("create entry3 : %w", err)
} else {
return nil
}