diff options
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/filer.go | 43 | ||||
| -rw-r--r-- | weed/filer/meta_aggregator.go | 52 |
2 files changed, 74 insertions, 21 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 76d2f3f47..6dca3321f 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "os" "strings" "time" @@ -66,22 +67,38 @@ func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, return f } -func (f *Filer) AggregateFromPeers(self pb.ServerAddress, filers []pb.ServerAddress) { +func (f *Filer) AggregateFromPeers(self pb.ServerAddress) { - // set peers - found := false - for _, peer := range filers { - if peer == self { - found = true - } - } - if !found { - filers = append(filers, self) + f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) + f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate + + for _, peerUpdate := range f.ListExistingPeerUpdates() { + f.MetaAggregator.OnPeerUpdate(peerUpdate) } - f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) - f.MetaAggregator.StartLoopSubscribe(f, self) +} +func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate){ + + if grpcErr := pb.WithMasterClient(f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: "filer", + }) + + glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes)) + for _, node := range resp.ClusterNodes { + existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{ + NodeType: "filer", + Address: node.Address, + IsLeader: node.IsLeader, + IsAdd: true, + }) + } + return err + }); grpcErr != nil { + glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr) + } + return } func (f *Filer) SetStore(store FilerStore) { @@ -117,7 +134,7 @@ func (fs *Filer) GetMaster() pb.ServerAddress { return fs.MasterClient.GetMaster() } -func (fs *Filer) KeepConnectedToMaster() { +func (fs *Filer) KeepMasterClientConnected() { fs.MasterClient.KeepConnectedToMaster() } diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index eba2a044a..6e42b1902 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" "sync" @@ -18,9 +19,13 @@ import ( ) type MetaAggregator struct { - filers []pb.ServerAddress - grpcDialOption grpc.DialOption - MetaLogBuffer *log_buffer.LogBuffer + filer *Filer + self pb.ServerAddress + isLeader bool + grpcDialOption grpc.DialOption + MetaLogBuffer *log_buffer.LogBuffer + peerStatues map[pb.ServerAddress]struct{} + peerStatuesLock sync.Mutex // notifying clients ListenersLock sync.Mutex ListenersCond *sync.Cond @@ -28,10 +33,12 @@ type MetaAggregator struct { // MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. // The old data comes from what each LocalMetadata persisted on disk. -func NewMetaAggregator(filers []pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator { +func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator { t := &MetaAggregator{ - filers: filers, + filer: filer, + self: self, grpcDialOption: grpcDialOption, + peerStatues: make(map[pb.ServerAddress]struct{}), } t.ListenersCond = sync.NewCond(&t.ListenersLock) t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { @@ -40,10 +47,35 @@ func NewMetaAggregator(filers []pb.ServerAddress, grpcDialOption grpc.DialOption return t } -func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self pb.ServerAddress) { - for _, filer := range ma.filers { - go ma.subscribeToOneFiler(f, self, filer) +func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { + if update.NodeType != "filer" { + return } + + address := pb.ServerAddress(update.Address) + if update.IsAdd { + // every filer should subscribe to a new filer + ma.setActive(address, true) + go ma.subscribeToOneFiler(ma.filer, ma.self, address) + } else { + ma.setActive(address, false) + } +} + +func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) { + ma.peerStatuesLock.Lock() + defer ma.peerStatuesLock.Unlock() + if isActive { + ma.peerStatues[address] = struct{}{} + } else { + delete(ma.peerStatues, address) + } +} +func (ma *MetaAggregator) isActive(address pb.ServerAddress)(isActive bool) { + ma.peerStatuesLock.Lock() + defer ma.peerStatuesLock.Unlock() + _, isActive = ma.peerStatues[address] + return } func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) { @@ -149,6 +181,10 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p } }) + if !ma.isActive(peer) { + glog.V(0).Infof("stop subscribing remote %s meta change", peer) + return + } if err != nil { glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) time.Sleep(1733 * time.Millisecond) |
