diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 12 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 8 |
2 files changed, 11 insertions, 9 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index e7e5b0d48..8ef75cf02 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -37,10 +37,10 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime = time.Unix(0, processedTsNs) } - err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.listenersLock.Lock() - fs.listenersCond.Wait() - fs.listenersLock.Unlock() + err = fs.metaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.metaAggregator.ListenersLock.Lock() + fs.metaAggregator.ListenersCond.Wait() + fs.metaAggregator.ListenersLock.Unlock() return true }, eachLogEntryFn) @@ -134,7 +134,3 @@ func (fs *FilerServer) addClient(clientType string, clientAddress string) (clien func (fs *FilerServer) deleteClient(clientName string) { glog.V(0).Infof("- listener %v", clientName) } - -func (fs *FilerServer) notifyMetaListeners() { - fs.listenersCond.Broadcast() -} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 10b607dfe..6baf8f1b8 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -52,12 +52,14 @@ type FilerOption struct { Port uint32 recursiveDelete bool Cipher bool + Filers []string } type FilerServer struct { option *FilerOption secret security.SigningKey filer *filer2.Filer + metaAggregator *filer2.MetaAggregator grpcDialOption grpc.DialOption // notifying clients @@ -81,12 +83,16 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, fs.notifyMetaListeners) + fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() { + fs.listenersCond.Broadcast() + }) + fs.metaAggregator = filer2.NewMetaAggregator(append(option.Filers, fmt.Sprintf("%s:%d", option.Host, option.Port)), fs.grpcDialOption) fs.filer.Cipher = option.Cipher maybeStartMetrics(fs, option) go fs.filer.KeepConnectedToMaster() + fs.metaAggregator.StartLoopSubscribe(time.Now().UnixNano()) v := util.GetViper() if !util.LoadConfiguration("filer", false) { |
