diff options
Diffstat (limited to 'weed/command/filer_meta_backup.go')
| -rw-r--r-- | weed/command/filer_meta_backup.go | 48 |
1 files changed, 7 insertions, 41 deletions
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 28bd367e7..108e76566 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -7,7 +7,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/spf13/viper" "google.golang.org/grpc" - "io" "reflect" "time" @@ -190,48 +189,15 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { return nil } - tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "meta_backup", - PathPrefix: *metaBackup.filerDirectory, - SinceNs: startTime.UnixNano(), - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - var counter int64 - var lastWriteTime time.Time - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - if err = eachEntryFunc(resp); err != nil { - return err - } - - counter++ - if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { - glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) - counter = 0 - lastWriteTime = time.Now() - if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil { - return err2 - } - } + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3 * time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3)) + return metaBackup.setOffset(lastTime) + }) - } + return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup", + *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false) - }) - return tailErr } func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) { |
