diff options
Diffstat (limited to 'weed/filer/meta_aggregator.go')
| -rw-r--r-- | weed/filer/meta_aggregator.go | 52 |
1 files changed, 44 insertions, 8 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index eba2a044a..6e42b1902 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" "sync" @@ -18,9 +19,13 @@ import ( ) type MetaAggregator struct { - filers []pb.ServerAddress - grpcDialOption grpc.DialOption - MetaLogBuffer *log_buffer.LogBuffer + filer *Filer + self pb.ServerAddress + isLeader bool + grpcDialOption grpc.DialOption + MetaLogBuffer *log_buffer.LogBuffer + peerStatues map[pb.ServerAddress]struct{} + peerStatuesLock sync.Mutex // notifying clients ListenersLock sync.Mutex ListenersCond *sync.Cond @@ -28,10 +33,12 @@ type MetaAggregator struct { // MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. // The old data comes from what each LocalMetadata persisted on disk. -func NewMetaAggregator(filers []pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator { +func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator { t := &MetaAggregator{ - filers: filers, + filer: filer, + self: self, grpcDialOption: grpcDialOption, + peerStatues: make(map[pb.ServerAddress]struct{}), } t.ListenersCond = sync.NewCond(&t.ListenersLock) t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { @@ -40,10 +47,35 @@ func NewMetaAggregator(filers []pb.ServerAddress, grpcDialOption grpc.DialOption return t } -func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self pb.ServerAddress) { - for _, filer := range ma.filers { - go ma.subscribeToOneFiler(f, self, filer) +func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { + if update.NodeType != "filer" { + return } + + address := pb.ServerAddress(update.Address) + if update.IsAdd { + // every filer should subscribe to a new filer + ma.setActive(address, true) + go ma.subscribeToOneFiler(ma.filer, ma.self, address) + } else { + ma.setActive(address, false) + } +} + +func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) { + ma.peerStatuesLock.Lock() + defer ma.peerStatuesLock.Unlock() + if isActive { + ma.peerStatues[address] = struct{}{} + } else { + delete(ma.peerStatues, address) + } +} +func (ma *MetaAggregator) isActive(address pb.ServerAddress)(isActive bool) { + ma.peerStatuesLock.Lock() + defer ma.peerStatuesLock.Unlock() + _, isActive = ma.peerStatues[address] + return } func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) { @@ -149,6 +181,10 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p } }) + if !ma.isActive(peer) { + glog.V(0).Infof("stop subscribing remote %s meta change", peer) + return + } if err != nil { glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) time.Sleep(1733 * time.Millisecond) |
