diff options
| author | chrislu <chris.lu@gmail.com> | 2022-05-30 15:25:21 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-05-30 15:25:21 -0700 |
| commit | f214dfb1f5ccfecfedd8a5d3b513508e27388467 (patch) | |
| tree | 968640cacdcbb43118f7faebcdcfc6482440dad4 /weed/server | |
| parent | aece35a64fc779be03e43cd7bef79a99bd011634 (diff) | |
| download | seaweedfs-f214dfb1f5ccfecfedd8a5d3b513508e27388467.tar.xz seaweedfs-f214dfb1f5ccfecfedd8a5d3b513508e27388467.zip | |
stop when in memory log is done
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 3688ae047..0540400a3 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -59,7 +59,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool { + lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() @@ -74,6 +74,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, break } } + if isDone { + return nil + } time.Sleep(1127 * time.Millisecond) } @@ -127,7 +130,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool { + lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() @@ -142,6 +145,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq break } } + if isDone { + return nil + } } return readInMemoryLogErr |
