aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_sync.go')
-rw-r--r--weed/command/filer_sync.go19
1 files changed, 11 insertions, 8 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 38d29cdc6..33dcf4073 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -251,7 +251,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
- persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, debug)
+ persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, true, debug)
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
@@ -369,7 +369,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
}
-func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
+func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, dataSink sink.ReplicationSink, doDeleteFiles bool, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
// process function
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
@@ -397,16 +397,19 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
if reExcludeFileName != nil && reExcludeFileName.MatchString(message.NewEntry.Name) {
return nil
}
+ if dataSink.IsIncremental() {
+ doDeleteFiles = false
+ }
// handle deletions
if filer_pb.IsDelete(resp) {
+ if doDeleteFiles {
+ return nil
+ }
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
return nil
}
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
- if !dataSink.IsIncremental() {
- return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
- }
- return nil
+ return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
// handle new entries
@@ -432,7 +435,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
// old key is in the watched directory
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
// new key is also in the watched directory
- if !dataSink.IsIncremental() {
+ if doDeleteFiles {
oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
@@ -455,7 +458,7 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
} else {
// new key is outside of the watched directory
- if !dataSink.IsIncremental() {
+ if doDeleteFiles {
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}