aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go12
-rw-r--r--weed/server/filer_server.go8
2 files changed, 11 insertions, 9 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index e7e5b0d48..8ef75cf02 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -37,10 +37,10 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime = time.Unix(0, processedTsNs)
}
- err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
- fs.listenersLock.Lock()
- fs.listenersCond.Wait()
- fs.listenersLock.Unlock()
+ err = fs.metaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.metaAggregator.ListenersLock.Lock()
+ fs.metaAggregator.ListenersCond.Wait()
+ fs.metaAggregator.ListenersLock.Unlock()
return true
}, eachLogEntryFn)
@@ -134,7 +134,3 @@ func (fs *FilerServer) addClient(clientType string, clientAddress string) (clien
func (fs *FilerServer) deleteClient(clientName string) {
glog.V(0).Infof("- listener %v", clientName)
}
-
-func (fs *FilerServer) notifyMetaListeners() {
- fs.listenersCond.Broadcast()
-}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 10b607dfe..6baf8f1b8 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -52,12 +52,14 @@ type FilerOption struct {
Port uint32
recursiveDelete bool
Cipher bool
+ Filers []string
}
type FilerServer struct {
option *FilerOption
secret security.SigningKey
filer *filer2.Filer
+ metaAggregator *filer2.MetaAggregator
grpcDialOption grpc.DialOption
// notifying clients
@@ -81,12 +83,16 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, fs.notifyMetaListeners)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
+ fs.listenersCond.Broadcast()
+ })
+ fs.metaAggregator = filer2.NewMetaAggregator(append(option.Filers, fmt.Sprintf("%s:%d", option.Host, option.Port)), fs.grpcDialOption)
fs.filer.Cipher = option.Cipher
maybeStartMetrics(fs, option)
go fs.filer.KeepConnectedToMaster()
+ fs.metaAggregator.StartLoopSubscribe(time.Now().UnixNano())
v := util.GetViper()
if !util.LoadConfiguration("filer", false) {