aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-03-26 12:33:45 -0700
committerchrislu <chris.lu@gmail.com>2022-03-26 12:33:45 -0700
commit34b743c481e65b25c052512603d1794eddfe839c (patch)
tree454523f3ac306ab883f69f92ef894ee0f459a08c /weed
parentfba1cfc2d6ef741164047e3c1af73843ee6bc2b7 (diff)
downloadseaweedfs-34b743c481e65b25c052512603d1794eddfe839c.tar.xz
seaweedfs-34b743c481e65b25c052512603d1794eddfe839c.zip
Revert "remove duplicated metadata subscription in filer"
This reverts commit 34742be0295998c2105a5ee50e3e77ef2397c403. Related to https://github.com/chrislusf/seaweedfs/issues/2545
Diffstat (limited to 'weed')
-rw-r--r--weed/filer/meta_aggregator.go31
1 files changed, 9 insertions, 22 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 1e8b89ad5..13c2239f0 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -25,7 +25,7 @@ type MetaAggregator struct {
isLeader bool
grpcDialOption grpc.DialOption
MetaLogBuffer *log_buffer.LogBuffer
- peerStatues map[pb.ServerAddress]int
+ peerStatues map[pb.ServerAddress]struct{}
peerStatuesLock sync.Mutex
// notifying clients
ListenersLock sync.Mutex
@@ -39,7 +39,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
filer: filer,
self: self,
grpcDialOption: grpcDialOption,
- peerStatues: make(map[pb.ServerAddress]int),
+ peerStatues: make(map[pb.ServerAddress]struct{}),
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
@@ -56,40 +56,27 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
address := pb.ServerAddress(update.Address)
if update.IsAdd {
// every filer should subscribe to a new filer
- if ma.setActive(address, true) {
- go ma.subscribeToOneFiler(ma.filer, ma.self, address)
- }
+ ma.setActive(address, true)
+ go ma.subscribeToOneFiler(ma.filer, ma.self, address)
} else {
ma.setActive(address, false)
}
}
-func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) {
+func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) {
ma.peerStatuesLock.Lock()
defer ma.peerStatuesLock.Unlock()
if isActive {
- if _, found := ma.peerStatues[address]; found {
- ma.peerStatues[address] += 1
- } else {
- ma.peerStatues[address] = 1
- notDuplicated = true
- }
+ ma.peerStatues[address] = struct{}{}
} else {
- if _, found := ma.peerStatues[address]; found {
- ma.peerStatues[address] -= 1
- }
- if ma.peerStatues[address] <= 0 {
- delete(ma.peerStatues, address)
- }
+ delete(ma.peerStatues, address)
}
- return
}
func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
ma.peerStatuesLock.Lock()
defer ma.peerStatuesLock.Unlock()
- var count int
- count, isActive = ma.peerStatues[address]
- return count > 0 && isActive
+ _, isActive = ma.peerStatues[address]
+ return
}
func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {