diff options
| author | chrislu <chris.lu@gmail.com> | 2022-03-26 12:33:45 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-03-26 12:33:45 -0700 |
| commit | 34b743c481e65b25c052512603d1794eddfe839c (patch) | |
| tree | 454523f3ac306ab883f69f92ef894ee0f459a08c /weed | |
| parent | fba1cfc2d6ef741164047e3c1af73843ee6bc2b7 (diff) | |
| download | seaweedfs-34b743c481e65b25c052512603d1794eddfe839c.tar.xz seaweedfs-34b743c481e65b25c052512603d1794eddfe839c.zip | |
Revert "remove duplicated metadata subscription in filer"
This reverts commit 34742be0295998c2105a5ee50e3e77ef2397c403.
Related to https://github.com/chrislusf/seaweedfs/issues/2545
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/filer/meta_aggregator.go | 31 |
1 files changed, 9 insertions, 22 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 1e8b89ad5..13c2239f0 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -25,7 +25,7 @@ type MetaAggregator struct { isLeader bool grpcDialOption grpc.DialOption MetaLogBuffer *log_buffer.LogBuffer - peerStatues map[pb.ServerAddress]int + peerStatues map[pb.ServerAddress]struct{} peerStatuesLock sync.Mutex // notifying clients ListenersLock sync.Mutex @@ -39,7 +39,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. filer: filer, self: self, grpcDialOption: grpcDialOption, - peerStatues: make(map[pb.ServerAddress]int), + peerStatues: make(map[pb.ServerAddress]struct{}), } t.ListenersCond = sync.NewCond(&t.ListenersLock) t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { @@ -56,40 +56,27 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { address := pb.ServerAddress(update.Address) if update.IsAdd { // every filer should subscribe to a new filer - if ma.setActive(address, true) { - go ma.subscribeToOneFiler(ma.filer, ma.self, address) - } + ma.setActive(address, true) + go ma.subscribeToOneFiler(ma.filer, ma.self, address) } else { ma.setActive(address, false) } } -func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) { +func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) { ma.peerStatuesLock.Lock() defer ma.peerStatuesLock.Unlock() if isActive { - if _, found := ma.peerStatues[address]; found { - ma.peerStatues[address] += 1 - } else { - ma.peerStatues[address] = 1 - notDuplicated = true - } + ma.peerStatues[address] = struct{}{} } else { - if _, found := ma.peerStatues[address]; found { - ma.peerStatues[address] -= 1 - } - if ma.peerStatues[address] <= 0 { - delete(ma.peerStatues, address) - } + delete(ma.peerStatues, address) } - return } func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { ma.peerStatuesLock.Lock() defer ma.peerStatuesLock.Unlock() - var count int - count, isActive = ma.peerStatues[address] - return count > 0 && isActive + _, isActive = ma.peerStatues[address] + return } func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) { |
