aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-12 09:18:54 -0700
committerchrislu <chris.lu@gmail.com>2024-03-12 09:18:54 -0700
commit604091a4807a56b7a56caeeb316c0e5a7642e57d (patch)
tree63538639f8e605cd854650508025e5db1eb516f5
parent1efa502dbbb24f54921b38c23b6bc9359c017977 (diff)
downloadseaweedfs-604091a4807a56b7a56caeeb316c0e5a7642e57d.tar.xz
seaweedfs-604091a4807a56b7a56caeeb316c0e5a7642e57d.zip
use stopChan to close previous filer peer meta subscription instances
-rw-r--r--weed/filer/meta_aggregator.go67
1 files changed, 28 insertions, 39 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 0433a63a0..e18e69216 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -21,13 +21,13 @@ import (
)
type MetaAggregator struct {
- filer *Filer
- self pb.ServerAddress
- isLeader bool
- grpcDialOption grpc.DialOption
- MetaLogBuffer *log_buffer.LogBuffer
- peerStatues map[pb.ServerAddress]int
- peerStatuesLock sync.Mutex
+ filer *Filer
+ self pb.ServerAddress
+ isLeader bool
+ grpcDialOption grpc.DialOption
+ MetaLogBuffer *log_buffer.LogBuffer
+ peerChans map[pb.ServerAddress]chan struct{}
+ peerChansLock sync.Mutex
// notifying clients
ListenersLock sync.Mutex
ListenersCond *sync.Cond
@@ -40,7 +40,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
filer: filer,
self: self,
grpcDialOption: grpcDialOption,
- peerStatues: make(map[pb.ServerAddress]int),
+ peerChans: make(map[pb.ServerAddress]chan struct{}),
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
@@ -50,51 +50,40 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
}
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+ ma.peerChansLock.Lock()
+ defer ma.peerChansLock.Unlock()
+
address := pb.ServerAddress(update.Address)
if update.IsAdd {
- // every filer should subscribe to a new filer
- if ma.setActive(address, true) {
- go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom)
- }
- } else {
- ma.setActive(address, false)
- }
-}
-
-func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) {
- ma.peerStatuesLock.Lock()
- defer ma.peerStatuesLock.Unlock()
- if isActive {
- if _, found := ma.peerStatues[address]; found {
- ma.peerStatues[address] += 1
- } else {
- ma.peerStatues[address] = 1
- notDuplicated = true
+ // cancel previous subscription if any
+ if prevChan, found := ma.peerChans[address]; found {
+ close(prevChan)
}
+ stopChan := make(chan struct{})
+ ma.peerChans[address] = stopChan
+ go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom, stopChan)
} else {
- if _, found := ma.peerStatues[address]; found {
- delete(ma.peerStatues, address)
+ if prevChan, found := ma.peerChans[address]; found {
+ close(prevChan)
+ delete(ma.peerChans, address)
}
}
- return
-}
-func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
- ma.peerStatuesLock.Lock()
- defer ma.peerStatuesLock.Unlock()
- var count int
- count, isActive = ma.peerStatues[address]
- return count > 0 && isActive
}
-func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) {
+func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) {
lastTsNs := startFrom.UnixNano()
for {
glog.V(0).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs)
nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs)
- if !ma.isActive(peer) {
- glog.V(0).Infof("stop subscribing remote %s meta change", peer)
+
+ // check stopChan to see if we should stop
+ select {
+ case <-stopChan:
+ glog.V(0).Infof("stop subscribing peer %s meta change", peer)
return
+ default:
}
+
if err != nil {
errLvl := glog.Level(0)
if strings.Contains(err.Error(), "duplicated local subscription detected") {