diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-27 23:49:46 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-28 00:05:57 -0700 |
| commit | 5c57297bd1baf6ddfe94db87a426c2bd9e88ab4b (patch) | |
| tree | cce07973a7d99c77bb336f20df7bf66364f69b30 /weed/server | |
| parent | 47c4a62c5db8564d036cc96c2f4a097494a2567a (diff) | |
| download | seaweedfs-5c57297bd1baf6ddfe94db87a426c2bd9e88ab4b.tar.xz seaweedfs-5c57297bd1baf6ddfe94db87a426c2bd9e88ab4b.zip | |
metadata log: read from any timestamp
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_listen.go | 25 |
1 files changed, 14 insertions, 11 deletions
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go index 1da257202..2a18f1950 100644 --- a/weed/server/filer_grpc_server_listen.go +++ b/weed/server/filer_grpc_server_listen.go @@ -21,10 +21,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, defer fs.deleteClient(clientName) - lastReadTime := time.Now() - if req.SinceNs > 0 { - lastReadTime = time.Unix(0, req.SinceNs) - } + lastReadTime := time.Unix(0, req.SinceNs) eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { @@ -59,12 +56,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return nil } - _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.listenersLock.Lock() - fs.listenersCond.Wait() - fs.listenersLock.Unlock() - return true - }, func(logEntry *filer_pb.LogEntry) error { + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { event := &filer_pb.SubscribeMetadataResponse{} if err := proto.Unmarshal(logEntry.Data, event); err != nil { glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) @@ -76,7 +68,18 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } return nil - }) + } + + if err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn); err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.listenersLock.Lock() + fs.listenersCond.Wait() + fs.listenersLock.Unlock() + return true + }, eachLogEntryFn) return err |
