diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-08-28 23:48:48 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-08-28 23:48:48 -0700 |
| commit | ca658a97c5248ba099356b006f0b341af53b0816 (patch) | |
| tree | 8b59defed9a417c4fa2e9346a23cd8a64e851852 /weed/filer2/filer.go | |
| parent | 63ad1abccec691d2204b8dc63109ffeead0b0eed (diff) | |
| download | seaweedfs-ca658a97c5248ba099356b006f0b341af53b0816.tar.xz seaweedfs-ca658a97c5248ba099356b006f0b341af53b0816.zip | |
add signatures to messages to avoid double processing
Diffstat (limited to 'weed/filer2/filer.go')
| -rw-r--r-- | weed/filer2/filer.go | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index d3dfa5a6f..d8929f88f 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -36,6 +36,7 @@ type Filer struct { metaLogCollection string metaLogReplication string MetaAggregator *MetaAggregator + Signature int32 } func NewFiler(masters []string, grpcDialOption grpc.DialOption, @@ -44,6 +45,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, + Signature: util.RandomInt32(), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn) f.metaLogCollection = collection @@ -93,7 +95,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error { return f.Store.RollbackTransaction(ctx) } -func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool) error { +func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error { if string(entry.FullPath) == "/" { return nil @@ -143,7 +145,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr } } else { f.maybeAddBucket(dirEntry) - f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster) + f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) } } else if !dirEntry.IsDirectory() { @@ -191,7 +193,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr } f.maybeAddBucket(entry) - f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster) + f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures) f.deleteChunksIfNotNew(oldEntry, entry) |
