aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/filer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/filer.go')
-rw-r--r--weed/filer/filer.go17
1 files changed, 16 insertions, 1 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 8ba8f9cfa..a4bd1ed50 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"os"
"sort"
"strings"
@@ -48,6 +49,7 @@ type Filer struct {
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
+ LockRing *lock_manager.LockRing
}
func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress,
@@ -59,6 +61,7 @@ func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOptio
FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(),
UniqueFilerId: util.RandomInt32(),
+ LockRing: lock_manager.NewLockRing(time.Second * 5),
}
if f.UniqueFilerId < 0 {
f.UniqueFilerId = -f.UniqueFilerId
@@ -110,7 +113,19 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*
func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) {
f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
- f.MasterClient.SetOnPeerUpdateFn(f.MetaAggregator.OnPeerUpdate)
+ f.MasterClient.SetOnPeerUpdateFn(func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+ if update.NodeType != cluster.FilerType {
+ return
+ }
+ address := pb.ServerAddress(update.Address)
+
+ if update.IsAdd {
+ f.LockRing.AddServer(address)
+ } else {
+ f.LockRing.RemoveServer(address)
+ }
+ f.MetaAggregator.OnPeerUpdate(update, startFrom)
+ })
for _, peerUpdate := range existingNodes {
f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom)