diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_listen.go | 25 |
1 files changed, 14 insertions, 11 deletions
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go index 1da257202..2a18f1950 100644 --- a/weed/server/filer_grpc_server_listen.go +++ b/weed/server/filer_grpc_server_listen.go @@ -21,10 +21,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, defer fs.deleteClient(clientName) - lastReadTime := time.Now() - if req.SinceNs > 0 { - lastReadTime = time.Unix(0, req.SinceNs) - } + lastReadTime := time.Unix(0, req.SinceNs) eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { @@ -59,12 +56,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return nil } - _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.listenersLock.Lock() - fs.listenersCond.Wait() - fs.listenersLock.Unlock() - return true - }, func(logEntry *filer_pb.LogEntry) error { + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { event := &filer_pb.SubscribeMetadataResponse{} if err := proto.Unmarshal(logEntry.Data, event); err != nil { glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) @@ -76,7 +68,18 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } return nil - }) + } + + if err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn); err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.listenersLock.Lock() + fs.listenersCond.Wait() + fs.listenersLock.Unlock() + return true + }, eachLogEntryFn) return err |
