aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-27 23:49:46 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-28 00:05:57 -0700
commit5c57297bd1baf6ddfe94db87a426c2bd9e88ab4b (patch)
treecce07973a7d99c77bb336f20df7bf66364f69b30 /weed/server
parent47c4a62c5db8564d036cc96c2f4a097494a2567a (diff)
downloadseaweedfs-5c57297bd1baf6ddfe94db87a426c2bd9e88ab4b.tar.xz
seaweedfs-5c57297bd1baf6ddfe94db87a426c2bd9e88ab4b.zip
metadata log: read from any timestamp
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_listen.go25
1 files changed, 14 insertions, 11 deletions
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go
index 1da257202..2a18f1950 100644
--- a/weed/server/filer_grpc_server_listen.go
+++ b/weed/server/filer_grpc_server_listen.go
@@ -21,10 +21,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
defer fs.deleteClient(clientName)
- lastReadTime := time.Now()
- if req.SinceNs > 0 {
- lastReadTime = time.Unix(0, req.SinceNs)
- }
+ lastReadTime := time.Unix(0, req.SinceNs)
eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
@@ -59,12 +56,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return nil
}
- _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
- fs.listenersLock.Lock()
- fs.listenersCond.Wait()
- fs.listenersLock.Unlock()
- return true
- }, func(logEntry *filer_pb.LogEntry) error {
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
event := &filer_pb.SubscribeMetadataResponse{}
if err := proto.Unmarshal(logEntry.Data, event); err != nil {
glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
@@ -76,7 +68,18 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
return nil
- })
+ }
+
+ if err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn); err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.listenersLock.Lock()
+ fs.listenersCond.Wait()
+ fs.listenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
return err