aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_backup.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-08-04 16:25:46 -0700
committerChris Lu <chris.lu@gmail.com>2021-08-04 16:25:46 -0700
commit6b743dbbf96f863e70ee80e4b32c0928f594891a (patch)
tree0e3551b5c411ad2131a1b5467318fd973bee71b7 /weed/command/filer_backup.go
parentb9ecf1e3a8685c62ccac80ed0fbc180ed34b48e2 (diff)
downloadseaweedfs-6b743dbbf96f863e70ee80e4b32c0928f594891a.tar.xz
seaweedfs-6b743dbbf96f863e70ee80e4b32c0928f594891a.zip
refactor client subscribe metadata
Diffstat (limited to 'weed/command/filer_backup.go')
-rw-r--r--weed/command/filer_backup.go51
1 files changed, 6 insertions, 45 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index fc4dd8298..2828ccb39 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -1,16 +1,13 @@
package command
import (
- "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
- "io"
"time"
)
@@ -110,48 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "backup_" + dataSink.GetName(),
- PathPrefix: sourcePath,
- SinceNs: startFrom.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
-
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return fmt.Errorf("processEventFn: %v", err)
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
- return fmt.Errorf("setOffset: %v", err)
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_" + dataSink.GetName(),
+ sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
+
}