diff options
Diffstat (limited to 'weed/filer/filer.go')
| -rw-r--r-- | weed/filer/filer.go | 43 |
1 files changed, 30 insertions, 13 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 76d2f3f47..6dca3321f 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "os" "strings" "time" @@ -66,22 +67,38 @@ func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, return f } -func (f *Filer) AggregateFromPeers(self pb.ServerAddress, filers []pb.ServerAddress) { +func (f *Filer) AggregateFromPeers(self pb.ServerAddress) { - // set peers - found := false - for _, peer := range filers { - if peer == self { - found = true - } - } - if !found { - filers = append(filers, self) + f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) + f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate + + for _, peerUpdate := range f.ListExistingPeerUpdates() { + f.MetaAggregator.OnPeerUpdate(peerUpdate) } - f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) - f.MetaAggregator.StartLoopSubscribe(f, self) +} +func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate){ + + if grpcErr := pb.WithMasterClient(f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: "filer", + }) + + glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes)) + for _, node := range resp.ClusterNodes { + existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{ + NodeType: "filer", + Address: node.Address, + IsLeader: node.IsLeader, + IsAdd: true, + }) + } + return err + }); grpcErr != nil { + glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr) + } + return } func (f *Filer) SetStore(store FilerStore) { @@ -117,7 +134,7 @@ func (fs *Filer) GetMaster() pb.ServerAddress { return fs.MasterClient.GetMaster() } -func (fs *Filer) KeepConnectedToMaster() { +func (fs *Filer) KeepMasterClientConnected() { fs.MasterClient.KeepConnectedToMaster() } |
