diff options
| author | chrislu <chris.lu@gmail.com> | 2021-12-30 01:51:52 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2021-12-30 01:51:52 -0800 |
| commit | 34742be0295998c2105a5ee50e3e77ef2397c403 (patch) | |
| tree | c50c5a9e201c7f4df3359ce37b9f385f07dc14fa | |
| parent | fb434318e36ac8e78ab304bfd5421f110c10bdf1 (diff) | |
| download | seaweedfs-34742be0295998c2105a5ee50e3e77ef2397c403.tar.xz seaweedfs-34742be0295998c2105a5ee50e3e77ef2397c403.zip | |
remove duplicated metadata subscription in filer
https://github.com/chrislusf/seaweedfs/issues/2545
| -rw-r--r-- | weed/filer/meta_aggregator.go | 31 |
1 files changed, 22 insertions, 9 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 282668146..f419d514d 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -25,7 +25,7 @@ type MetaAggregator struct { isLeader bool grpcDialOption grpc.DialOption MetaLogBuffer *log_buffer.LogBuffer - peerStatues map[pb.ServerAddress]struct{} + peerStatues map[pb.ServerAddress]int peerStatuesLock sync.Mutex // notifying clients ListenersLock sync.Mutex @@ -39,7 +39,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. filer: filer, self: self, grpcDialOption: grpcDialOption, - peerStatues: make(map[pb.ServerAddress]struct{}), + peerStatues: make(map[pb.ServerAddress]int), } t.ListenersCond = sync.NewCond(&t.ListenersLock) t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { @@ -56,27 +56,40 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { address := pb.ServerAddress(update.Address) if update.IsAdd { // every filer should subscribe to a new filer - ma.setActive(address, true) - go ma.subscribeToOneFiler(ma.filer, ma.self, address) + if ma.setActive(address, true) { + go ma.subscribeToOneFiler(ma.filer, ma.self, address) + } } else { ma.setActive(address, false) } } -func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) { +func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) { ma.peerStatuesLock.Lock() defer ma.peerStatuesLock.Unlock() if isActive { - ma.peerStatues[address] = struct{}{} + if _, found := ma.peerStatues[address]; found { + ma.peerStatues[address] += 1 + } else { + ma.peerStatues[address] = 1 + notDuplicated = true + } } else { - delete(ma.peerStatues, address) + if _, found := ma.peerStatues[address]; found { + ma.peerStatues[address] -= 1 + } + if ma.peerStatues[address] <= 0 { + delete(ma.peerStatues, address) + } } + return } func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { ma.peerStatuesLock.Lock() defer ma.peerStatuesLock.Unlock() - _, isActive = ma.peerStatues[address] - return + var count int + count, isActive = ma.peerStatues[address] + return count > 0 && isActive } func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) { |
