aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_backup.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_backup.go')
-rw-r--r--weed/command/filer_backup.go55
1 files changed, 8 insertions, 47 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 888b46fe7..0c450181b 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -1,16 +1,13 @@
package command
import (
- "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
- "io"
"time"
)
@@ -52,11 +49,11 @@ var cmdFilerBackup = &Command{
func runFilerBackup(cmd *Command, args []string) bool {
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
-
util.LoadConfiguration("security", false)
util.LoadConfiguration("replication", true)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
for {
err := doFilerBackup(grpcDialOption, &filerBackupOptions)
if err != nil {
@@ -110,48 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "backup_" + dataSink.GetName(),
- PathPrefix: sourcePath,
- SinceNs: startFrom.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
-
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return fmt.Errorf("processEventFn: %v", err)
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
- return fmt.Errorf("setOffset: %v", err)
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(),
+ sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
+
}