diff options
| author | chrislu <chris.lu@gmail.com> | 2023-06-25 00:58:21 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-06-25 00:58:21 -0700 |
| commit | 3fd659df2a35c42d6a30ce251413f219ea24d1c2 (patch) | |
| tree | 4fe33062833c3340d618e5c0ea2e7968f446baf5 /weed/filer/filer.go | |
| parent | 051501414928baeef4a7bba7f62b08f96bcb75f7 (diff) | |
| download | seaweedfs-3fd659df2a35c42d6a30ce251413f219ea24d1c2.tar.xz seaweedfs-3fd659df2a35c42d6a30ce251413f219ea24d1c2.zip | |
add distributed lock manager
Diffstat (limited to 'weed/filer/filer.go')
| -rw-r--r-- | weed/filer/filer.go | 17 |
1 files changed, 16 insertions, 1 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 8ba8f9cfa..a4bd1ed50 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "os" "sort" "strings" @@ -48,6 +49,7 @@ type Filer struct { Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage + LockRing *lock_manager.LockRing } func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, @@ -59,6 +61,7 @@ func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOptio FilerConf: NewFilerConf(), RemoteStorage: NewFilerRemoteStorage(), UniqueFilerId: util.RandomInt32(), + LockRing: lock_manager.NewLockRing(time.Second * 5), } if f.UniqueFilerId < 0 { f.UniqueFilerId = -f.UniqueFilerId @@ -110,7 +113,19 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []* func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) { f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) - f.MasterClient.SetOnPeerUpdateFn(f.MetaAggregator.OnPeerUpdate) + f.MasterClient.SetOnPeerUpdateFn(func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + if update.NodeType != cluster.FilerType { + return + } + address := pb.ServerAddress(update.Address) + + if update.IsAdd { + f.LockRing.AddServer(address) + } else { + f.LockRing.RemoveServer(address) + } + f.MetaAggregator.OnPeerUpdate(update, startFrom) + }) for _, peerUpdate := range existingNodes { f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom) |
