aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Denushev <mdenushev@ya.ru>2024-11-14 19:40:55 +0300
committerGitHub <noreply@github.com>2024-11-14 08:40:55 -0800
commita5fe6e21bce464017fde036160e986d4c3bb81db (patch)
tree4eee03e2651ab5cea1ad42d159e5220a43b6da2a
parent3003c9e17e06db92f5a426c191ebaab15a1764a1 (diff)
downloadseaweedfs-a5fe6e21bce464017fde036160e986d4c3bb81db.tar.xz
seaweedfs-a5fe6e21bce464017fde036160e986d4c3bb81db.zip
feat(filer.backup): add ignore errors option (#6235)
* feat(filer.backup): add ignore errors option * feat(filer.backup): fix 404 error wrap * feat(filer.backup): fix wrapping function * feat(filer.backup): fix wrapping errors in genProcessFunction * Update weed/command/filer_backup.go * Update weed/command/filer_backup.go * Update weed/command/filer_backup.go --------- Co-authored-by: Max Denushev <denushev@tochka.com> Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
-rw-r--r--weed/command/filer_backup.go52
-rw-r--r--weed/command/filer_sync.go8
-rw-r--r--weed/util/http/http_global_client_util.go11
3 files changed, 52 insertions, 19 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 1344dfd2c..380540fd9 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -1,12 +1,15 @@
package command
import (
+ "errors"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/http"
"google.golang.org/grpc"
"regexp"
"strings"
@@ -14,16 +17,18 @@ import (
)
type FilerBackupOptions struct {
- isActivePassive *bool
- filer *string
- path *string
- excludePaths *string
- excludeFileName *string
- debug *bool
- proxyByFiler *bool
- doDeleteFiles *bool
- timeAgo *time.Duration
- retentionDays *int
+ isActivePassive *bool
+ filer *string
+ path *string
+ excludePaths *string
+ excludeFileName *string
+ debug *bool
+ proxyByFiler *bool
+ doDeleteFiles *bool
+ disableErrorRetry *bool
+ ignore404Error *bool
+ timeAgo *time.Duration
+ retentionDays *int
}
var (
@@ -41,6 +46,8 @@ func init() {
filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days")
+ filerBackupOptions.disableErrorRetry = cmdFilerBackup.Flag.Bool("disableErrorRetry", false, "disables errors retry, only logs will print")
+ filerBackupOptions.ignore404Error = cmdFilerBackup.Flag.Bool("ignore404Error", true, "ignore 404 errors from filer")
}
var cmdFilerBackup = &Command{
@@ -130,7 +137,23 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
*backupOption.proxyByFiler)
dataSink.SetSourceFiler(filerSource)
- processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
+ var processEventFn func(*filer_pb.SubscribeMetadataResponse) error
+ if *backupOption.ignore404Error {
+ processEventFnGenerated := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
+ processEventFn = func(resp *filer_pb.SubscribeMetadataResponse) error {
+ err := processEventFnGenerated(resp)
+ if err == nil {
+ return nil
+ }
+ if errors.Is(err, http.ErrNotFound) {
+ glog.V(0).Infof("got 404 error, ignore it: %s", err.Error())
+ return nil
+ }
+ return err
+ }
+ } else {
+ processEventFn = genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
+ }
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
@@ -154,6 +177,11 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
prefix = prefix + "/"
}
+ eventErrorType := pb.RetryForeverOnError
+ if *backupOption.disableErrorRetry {
+ eventErrorType = pb.TrivialOnError
+ }
+
metadataFollowOption := &pb.MetadataFollowOption{
ClientName: "backup_" + dataSink.GetName(),
ClientId: clientId,
@@ -164,7 +192,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
DirectoriesToWatch: nil,
StartTsNs: startFrom.UnixNano(),
StopTsNs: 0,
- EventErrorType: pb.RetryForeverOnError,
+ EventErrorType: eventErrorType,
}
return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 90204af4a..8201aa712 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -436,7 +436,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
}
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
- return fmt.Errorf("create entry1 : %v", err)
+ return fmt.Errorf("create entry1 : %w", err)
} else {
return nil
}
@@ -462,13 +462,13 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
// not able to find old entry
if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
- return fmt.Errorf("delete old entry %v: %v", oldKey, err)
+ return fmt.Errorf("delete old entry %v: %w", oldKey, err)
}
}
// create the new entry
newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
- return fmt.Errorf("create entry2 : %v", err)
+ return fmt.Errorf("create entry2 : %w", err)
} else {
return nil
}
@@ -486,7 +486,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
// new key is in the watched directory
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
- return fmt.Errorf("create entry3 : %v", err)
+ return fmt.Errorf("create entry3 : %w", err)
} else {
return nil
}
diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go
index c3931a790..33d978d9e 100644
--- a/weed/util/http/http_global_client_util.go
+++ b/weed/util/http/http_global_client_util.go
@@ -5,8 +5,8 @@ import (
"encoding/json"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
"net/http"
"net/url"
@@ -16,6 +16,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
)
+var ErrNotFound = fmt.Errorf("not found")
+
func Post(url string, values url.Values) ([]byte, error) {
r, err := GetGlobalHttpClient().PostForm(url, values)
if err != nil {
@@ -311,7 +313,10 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
}
defer CloseResponse(r)
if r.StatusCode >= 400 {
- retryable = r.StatusCode == http.StatusNotFound || r.StatusCode >= 499
+ if r.StatusCode == http.StatusNotFound {
+ return true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound)
+ }
+ retryable = r.StatusCode >= 499
return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
@@ -477,4 +482,4 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte,
return n, err
-} \ No newline at end of file
+}