diff options
Diffstat (limited to 'weed/filer/filer.go')
| -rw-r--r-- | weed/filer/filer.go | 17 |
1 files changed, 16 insertions, 1 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 8ba8f9cfa..a4bd1ed50 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "os" "sort" "strings" @@ -48,6 +49,7 @@ type Filer struct { Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage + LockRing *lock_manager.LockRing } func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, @@ -59,6 +61,7 @@ func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOptio FilerConf: NewFilerConf(), RemoteStorage: NewFilerRemoteStorage(), UniqueFilerId: util.RandomInt32(), + LockRing: lock_manager.NewLockRing(time.Second * 5), } if f.UniqueFilerId < 0 { f.UniqueFilerId = -f.UniqueFilerId @@ -110,7 +113,19 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []* func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) { f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) - f.MasterClient.SetOnPeerUpdateFn(f.MetaAggregator.OnPeerUpdate) + f.MasterClient.SetOnPeerUpdateFn(func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + if update.NodeType != cluster.FilerType { + return + } + address := pb.ServerAddress(update.Address) + + if update.IsAdd { + f.LockRing.AddServer(address) + } else { + f.LockRing.RemoveServer(address) + } + f.MetaAggregator.OnPeerUpdate(update, startFrom) + }) for _, peerUpdate := range existingNodes { f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom) |
