aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_meta_backup.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_meta_backup.go')
-rw-r--r--weed/command/filer_meta_backup.go48
1 files changed, 7 insertions, 41 deletions
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index 28bd367e7..108e76566 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -7,7 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
"google.golang.org/grpc"
- "io"
"reflect"
"time"
@@ -190,48 +189,15 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return nil
}
- tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "meta_backup",
- PathPrefix: *metaBackup.filerDirectory,
- SinceNs: startTime.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if err = eachEntryFunc(resp); err != nil {
- return err
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
- return err2
- }
- }
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3 * time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
+ return metaBackup.setOffset(lastTime)
+ })
- }
+ return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup",
+ *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false)
- })
- return tailErr
}
func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {