aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filer.go43
-rw-r--r--weed/filer/meta_aggregator.go52
2 files changed, 74 insertions, 21 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 76d2f3f47..6dca3321f 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"os"
"strings"
"time"
@@ -66,22 +67,38 @@ func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
return f
}
-func (f *Filer) AggregateFromPeers(self pb.ServerAddress, filers []pb.ServerAddress) {
+func (f *Filer) AggregateFromPeers(self pb.ServerAddress) {
- // set peers
- found := false
- for _, peer := range filers {
- if peer == self {
- found = true
- }
- }
- if !found {
- filers = append(filers, self)
+ f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
+ f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate
+
+ for _, peerUpdate := range f.ListExistingPeerUpdates() {
+ f.MetaAggregator.OnPeerUpdate(peerUpdate)
}
- f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
- f.MetaAggregator.StartLoopSubscribe(f, self)
+}
+func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate){
+
+ if grpcErr := pb.WithMasterClient(f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
+ ClientType: "filer",
+ })
+
+ glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes))
+ for _, node := range resp.ClusterNodes {
+ existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
+ NodeType: "filer",
+ Address: node.Address,
+ IsLeader: node.IsLeader,
+ IsAdd: true,
+ })
+ }
+ return err
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr)
+ }
+ return
}
func (f *Filer) SetStore(store FilerStore) {
@@ -117,7 +134,7 @@ func (fs *Filer) GetMaster() pb.ServerAddress {
return fs.MasterClient.GetMaster()
}
-func (fs *Filer) KeepConnectedToMaster() {
+func (fs *Filer) KeepMasterClientConnected() {
fs.MasterClient.KeepConnectedToMaster()
}
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index eba2a044a..6e42b1902 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"sync"
@@ -18,9 +19,13 @@ import (
)
type MetaAggregator struct {
- filers []pb.ServerAddress
- grpcDialOption grpc.DialOption
- MetaLogBuffer *log_buffer.LogBuffer
+ filer *Filer
+ self pb.ServerAddress
+ isLeader bool
+ grpcDialOption grpc.DialOption
+ MetaLogBuffer *log_buffer.LogBuffer
+ peerStatues map[pb.ServerAddress]struct{}
+ peerStatuesLock sync.Mutex
// notifying clients
ListenersLock sync.Mutex
ListenersCond *sync.Cond
@@ -28,10 +33,12 @@ type MetaAggregator struct {
// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
// The old data comes from what each LocalMetadata persisted on disk.
-func NewMetaAggregator(filers []pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
+func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
t := &MetaAggregator{
- filers: filers,
+ filer: filer,
+ self: self,
grpcDialOption: grpcDialOption,
+ peerStatues: make(map[pb.ServerAddress]struct{}),
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
@@ -40,10 +47,35 @@ func NewMetaAggregator(filers []pb.ServerAddress, grpcDialOption grpc.DialOption
return t
}
-func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self pb.ServerAddress) {
- for _, filer := range ma.filers {
- go ma.subscribeToOneFiler(f, self, filer)
+func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
+ if update.NodeType != "filer" {
+ return
}
+
+ address := pb.ServerAddress(update.Address)
+ if update.IsAdd {
+ // every filer should subscribe to a new filer
+ ma.setActive(address, true)
+ go ma.subscribeToOneFiler(ma.filer, ma.self, address)
+ } else {
+ ma.setActive(address, false)
+ }
+}
+
+func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) {
+ ma.peerStatuesLock.Lock()
+ defer ma.peerStatuesLock.Unlock()
+ if isActive {
+ ma.peerStatues[address] = struct{}{}
+ } else {
+ delete(ma.peerStatues, address)
+ }
+}
+func (ma *MetaAggregator) isActive(address pb.ServerAddress)(isActive bool) {
+ ma.peerStatuesLock.Lock()
+ defer ma.peerStatuesLock.Unlock()
+ _, isActive = ma.peerStatues[address]
+ return
}
func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {
@@ -149,6 +181,10 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p
}
})
+ if !ma.isActive(peer) {
+ glog.V(0).Infof("stop subscribing remote %s meta change", peer)
+ return
+ }
if err != nil {
glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
time.Sleep(1733 * time.Millisecond)