aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_sub_meta.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-07-13 22:55:28 -0700
committerChris Lu <chris.lu@gmail.com>2020-07-13 22:55:28 -0700
commit4f6096c7f08d1a2468451684d44675310512524d (patch)
tree43c0a583ce929bd07411ea44d7349883beab65c4 /weed/server/filer_grpc_server_sub_meta.go
parent87b503171445fed690634769d1d1b0ac77dffd9f (diff)
downloadseaweedfs-4f6096c7f08d1a2468451684d44675310512524d.tar.xz
seaweedfs-4f6096c7f08d1a2468451684d44675310512524d.zip
add reading from persisted logs for local filer store
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go15
1 files changed, 15 insertions, 0 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 4fd38abe5..4341f2091 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -63,6 +63,20 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
+ if _, ok := fs.filer.Store.ActualStore.(filer2.FilerLocalStore); ok {
+ // println("reading from persisted logs ...")
+ processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+ glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ }
+
+ // println("reading from in memory logs ...")
err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
@@ -117,6 +131,7 @@ func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream file
EventNotification: eventNotification,
TsNs: tsNs,
}
+ // println("sending", dirPath, entryName)
if err := stream.Send(message); err != nil {
glog.V(0).Infof("=> client %v: %+v", clientName, err)
return err