aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_sync.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2021-08-08 18:55:53 -0700
committerGitHub <noreply@github.com>2021-08-08 18:55:53 -0700
commit52fcce81c64348dccf706e52161cf3fcf71e1df8 (patch)
treeeff9e3a111e227de7803c4ab5bd03ea79897bb57 /weed/command/filer_sync.go
parent4370a4db634f2268526911842a804d9dee97aadc (diff)
parent882a93dacd2ca8549af50b80a8a98a861fdee0ed (diff)
downloadseaweedfs-52fcce81c64348dccf706e52161cf3fcf71e1df8.tar.xz
seaweedfs-52fcce81c64348dccf706e52161cf3fcf71e1df8.zip
Merge pull request #2241 from chrislusf/add_remote_storage
WIP: remote storage
Diffstat (limited to 'weed/command/filer_sync.go')
-rw-r--r--weed/command/filer_sync.go49
1 files changed, 6 insertions, 43 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 7cfc8a7fe..a20f17201 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -15,7 +15,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
- "io"
"strings"
"time"
)
@@ -166,50 +165,14 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
return persistEventFn(resp)
}
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "syncTo_" + targetFiler,
- PathPrefix: sourcePath,
- SinceNs: sourceFilerOffsetTsNs,
- Signature: targetFilerSignature,
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return err
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
- return err
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_" + targetFiler,
+ sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
+
}
const (