aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-05-30 15:25:21 -0700
committerchrislu <chris.lu@gmail.com>2022-05-30 15:25:21 -0700
commitf214dfb1f5ccfecfedd8a5d3b513508e27388467 (patch)
tree968640cacdcbb43118f7faebcdcfc6482440dad4 /weed/server
parentaece35a64fc779be03e43cd7bef79a99bd011634 (diff)
downloadseaweedfs-f214dfb1f5ccfecfedd8a5d3b513508e27388467.tar.xz
seaweedfs-f214dfb1f5ccfecfedd8a5d3b513508e27388467.zip
stop when in memory log is done
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go10
1 files changed, 8 insertions, 2 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 3688ae047..0540400a3 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -59,7 +59,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
+ lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
@@ -74,6 +74,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
break
}
}
+ if isDone {
+ return nil
+ }
time.Sleep(1127 * time.Millisecond)
}
@@ -127,7 +130,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
+ lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
@@ -142,6 +145,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
break
}
}
+ if isDone {
+ return nil
+ }
}
return readInMemoryLogErr