diff options
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/filer.go | 17 | ||||
| -rw-r--r-- | weed/filer/meta_aggregator.go | 5 |
2 files changed, 16 insertions, 6 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 8ba8f9cfa..a4bd1ed50 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "os" "sort" "strings" @@ -48,6 +49,7 @@ type Filer struct { Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage + LockRing *lock_manager.LockRing } func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, @@ -59,6 +61,7 @@ func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOptio FilerConf: NewFilerConf(), RemoteStorage: NewFilerRemoteStorage(), UniqueFilerId: util.RandomInt32(), + LockRing: lock_manager.NewLockRing(time.Second * 5), } if f.UniqueFilerId < 0 { f.UniqueFilerId = -f.UniqueFilerId @@ -110,7 +113,19 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []* func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) { f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) - f.MasterClient.SetOnPeerUpdateFn(f.MetaAggregator.OnPeerUpdate) + f.MasterClient.SetOnPeerUpdateFn(func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + if update.NodeType != cluster.FilerType { + return + } + address := pb.ServerAddress(update.Address) + + if update.IsAdd { + f.LockRing.AddServer(address) + } else { + f.LockRing.RemoveServer(address) + } + f.MetaAggregator.OnPeerUpdate(update, startFrom) + }) for _, peerUpdate := range existingNodes { f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 50cd75994..0433a63a0 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -3,7 +3,6 @@ package filer import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/util" "io" @@ -51,10 +50,6 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. } func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { - if update.NodeType != cluster.FilerType { - return - } - address := pb.ServerAddress(update.Address) if update.IsAdd { // every filer should subscribe to a new filer |
