aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-08-28 23:48:48 -0700
committerChris Lu <chris.lu@gmail.com>2020-08-28 23:48:48 -0700
commitca658a97c5248ba099356b006f0b341af53b0816 (patch)
tree8b59defed9a417c4fa2e9346a23cd8a64e851852 /weed/replication
parent63ad1abccec691d2204b8dc63109ffeead0b0eed (diff)
downloadseaweedfs-ca658a97c5248ba099356b006f0b341af53b0816.tar.xz
seaweedfs-ca658a97c5248ba099356b006f0b341af53b0816.zip
add signatures to messages to avoid double processing
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/sink/filersink/filer_sink.go6
1 files changed, 5 insertions, 1 deletions
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 6429859b4..b90a642c9 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -25,6 +25,7 @@ type FilerSink struct {
ttlSec int32
dataCenter string
grpcDialOption grpc.DialOption
+ signature int32
}
func init() {
@@ -61,6 +62,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string,
fs.collection = collection
fs.ttlSec = int32(ttlSec)
fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+ fs.signature = util.RandomInt32()
return nil
}
@@ -69,7 +71,7 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo
dir, name := util.FullPath(key).DirAndName()
glog.V(1).Infof("delete entry: %v", key)
- err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, false, false, true)
+ err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, false, false, true, fs.signature)
if err != nil {
glog.V(0).Infof("delete entry %s: %v", key, err)
return fmt.Errorf("delete entry %s: %v", key, err)
@@ -114,6 +116,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error {
Chunks: replicatedChunks,
},
IsFromOtherCluster: true,
+ Signatures: []int32{fs.signature},
}
glog.V(1).Infof("create: %v", request)
@@ -193,6 +196,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
Directory: newParentPath,
Entry: existingEntry,
IsFromOtherCluster: true,
+ Signatures: []int32{fs.signature},
}
if _, err := client.UpdateEntry(context.Background(), request); err != nil {