aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filer.go17
-rw-r--r--weed/filer/meta_aggregator.go5
2 files changed, 16 insertions, 6 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 8ba8f9cfa..a4bd1ed50 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"os"
"sort"
"strings"
@@ -48,6 +49,7 @@ type Filer struct {
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
+ LockRing *lock_manager.LockRing
}
func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress,
@@ -59,6 +61,7 @@ func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOptio
FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(),
UniqueFilerId: util.RandomInt32(),
+ LockRing: lock_manager.NewLockRing(time.Second * 5),
}
if f.UniqueFilerId < 0 {
f.UniqueFilerId = -f.UniqueFilerId
@@ -110,7 +113,19 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*
func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) {
f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
- f.MasterClient.SetOnPeerUpdateFn(f.MetaAggregator.OnPeerUpdate)
+ f.MasterClient.SetOnPeerUpdateFn(func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+ if update.NodeType != cluster.FilerType {
+ return
+ }
+ address := pb.ServerAddress(update.Address)
+
+ if update.IsAdd {
+ f.LockRing.AddServer(address)
+ } else {
+ f.LockRing.RemoveServer(address)
+ }
+ f.MetaAggregator.OnPeerUpdate(update, startFrom)
+ })
for _, peerUpdate := range existingNodes {
f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom)
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 50cd75994..0433a63a0 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -3,7 +3,6 @@ package filer
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
@@ -51,10 +50,6 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
}
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
- if update.NodeType != cluster.FilerType {
- return
- }
-
address := pb.ServerAddress(update.Address)
if update.IsAdd {
// every filer should subscribe to a new filer