aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/filer.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-11-06 14:25:06 -0700
committerChris Lu <chris.lu@gmail.com>2021-11-06 14:26:26 -0700
commit751a7073e3bf7e45f75a7638da256720c2d902a7 (patch)
tree67998d568bc5aa69ab0c6aac5be7f109e447beeb /weed/filer/filer.go
parent04663c3611326a8bac87d47fc4a2c546c6d9acdd (diff)
parente0fc2898e9f9ec24ac2a94f7fba4b440058e2d25 (diff)
downloadseaweedfs-751a7073e3bf7e45f75a7638da256720c2d902a7.tar.xz
seaweedfs-751a7073e3bf7e45f75a7638da256720c2d902a7.zip
Merge branch 'flexible_filer_addition'
Diffstat (limited to 'weed/filer/filer.go')
-rw-r--r--weed/filer/filer.go43
1 files changed, 30 insertions, 13 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 76d2f3f47..6dca3321f 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"os"
"strings"
"time"
@@ -66,22 +67,38 @@ func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
return f
}
-func (f *Filer) AggregateFromPeers(self pb.ServerAddress, filers []pb.ServerAddress) {
+func (f *Filer) AggregateFromPeers(self pb.ServerAddress) {
- // set peers
- found := false
- for _, peer := range filers {
- if peer == self {
- found = true
- }
- }
- if !found {
- filers = append(filers, self)
+ f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
+ f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate
+
+ for _, peerUpdate := range f.ListExistingPeerUpdates() {
+ f.MetaAggregator.OnPeerUpdate(peerUpdate)
}
- f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
- f.MetaAggregator.StartLoopSubscribe(f, self)
+}
+func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate){
+
+ if grpcErr := pb.WithMasterClient(f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
+ ClientType: "filer",
+ })
+
+ glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes))
+ for _, node := range resp.ClusterNodes {
+ existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
+ NodeType: "filer",
+ Address: node.Address,
+ IsLeader: node.IsLeader,
+ IsAdd: true,
+ })
+ }
+ return err
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr)
+ }
+ return
}
func (f *Filer) SetStore(store FilerStore) {
@@ -117,7 +134,7 @@ func (fs *Filer) GetMaster() pb.ServerAddress {
return fs.MasterClient.GetMaster()
}
-func (fs *Filer) KeepConnectedToMaster() {
+func (fs *Filer) KeepMasterClientConnected() {
fs.MasterClient.KeepConnectedToMaster()
}