aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_listen.go25
1 files changed, 14 insertions, 11 deletions
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go
index 1da257202..2a18f1950 100644
--- a/weed/server/filer_grpc_server_listen.go
+++ b/weed/server/filer_grpc_server_listen.go
@@ -21,10 +21,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
defer fs.deleteClient(clientName)
- lastReadTime := time.Now()
- if req.SinceNs > 0 {
- lastReadTime = time.Unix(0, req.SinceNs)
- }
+ lastReadTime := time.Unix(0, req.SinceNs)
eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
@@ -59,12 +56,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return nil
}
- _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
- fs.listenersLock.Lock()
- fs.listenersCond.Wait()
- fs.listenersLock.Unlock()
- return true
- }, func(logEntry *filer_pb.LogEntry) error {
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
event := &filer_pb.SubscribeMetadataResponse{}
if err := proto.Unmarshal(logEntry.Data, event); err != nil {
glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
@@ -76,7 +68,18 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
return nil
- })
+ }
+
+ if err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn); err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.listenersLock.Lock()
+ fs.listenersCond.Wait()
+ fs.listenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
return err