aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-09-09 11:21:23 -0700
committerChris Lu <chris.lu@gmail.com>2020-09-09 11:21:23 -0700
commit387ab6796f274151f802ccdab8756b959b5fb1cb (patch)
treea3b95f5bdba66f12c609b5e53b262b011a47a450 /weed/filer
parent4fc0bd1a8173e284ff919edb5214f5adf7a90f06 (diff)
downloadseaweedfs-387ab6796f274151f802ccdab8756b959b5fb1cb.tar.xz
seaweedfs-387ab6796f274151f802ccdab8756b959b5fb1cb.zip
filer: cross cluster synchronization
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filechunks.go5
-rw-r--r--weed/filer/filer_delete_entry.go8
-rw-r--r--weed/filer/filer_notify.go11
3 files changed, 19 insertions, 5 deletions
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index c45963193..db55eec00 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -226,6 +226,11 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chu
sort.Slice(chunks, func(i, j int) bool {
if chunks[i].Mtime == chunks[j].Mtime {
+ filer_pb.EnsureFid(chunks[i])
+ filer_pb.EnsureFid(chunks[j])
+ if chunks[i].Fid == nil || chunks[j].Fid == nil {
+ return true
+ }
return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
}
return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index e2198bd21..379156321 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -27,7 +27,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
if entry.IsDirectory() {
// delete the folder children, not including the folder itself
var dirChunks []*filer_pb.FileChunk
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster)
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster, signatures)
if err != nil {
glog.V(0).Infof("delete directory %s: %v", p, err)
return fmt.Errorf("delete directory %s: %v", p, err)
@@ -53,7 +53,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
return nil
}
-func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (chunks []*filer_pb.FileChunk, err error) {
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, err error) {
lastFileName := ""
includeLastFile := false
@@ -73,7 +73,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
lastFileName = sub.Name()
var dirChunks []*filer_pb.FileChunk
if sub.IsDirectory() {
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false)
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil)
chunks = append(chunks, dirChunks...)
} else {
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
@@ -95,7 +95,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
- f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, nil)
+ f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
return chunks, nil
}
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index e00117382..8719cf5b5 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -30,6 +30,15 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
if strings.HasPrefix(fullpath, SystemLogDir) {
return
}
+ foundSelf := false
+ for _, sig := range signatures {
+ if sig == f.Signature {
+ foundSelf = true
+ }
+ }
+ if !foundSelf {
+ signatures = append(signatures, f.Signature)
+ }
newParentPath := ""
if newEntry != nil {
@@ -41,7 +50,7 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
DeleteChunks: deleteChunks,
NewParentPath: newParentPath,
IsFromOtherCluster: isFromOtherCluster,
- Signatures: append(signatures, f.Signature),
+ Signatures: signatures,
}
if notification.Queue != nil {