aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_sub_meta.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-05-30 15:20:51 -0700
committerchrislu <chris.lu@gmail.com>2022-05-30 15:20:51 -0700
commitaece35a64fc779be03e43cd7bef79a99bd011634 (patch)
treee12248af36ca4f484198ebb962dc2af87a79a10e /weed/server/filer_grpc_server_sub_meta.go
parenta2b101a737de0a4085f560971f6f25cb8f4e6050 (diff)
downloadseaweedfs-aece35a64fc779be03e43cd7bef79a99bd011634.tar.xz
seaweedfs-aece35a64fc779be03e43cd7bef79a99bd011634.zip
stop when on disk log is done
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go12
1 files changed, 10 insertions, 2 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index bed8580cc..3688ae047 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -39,15 +39,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
var processedTsNs int64
var readPersistedLogErr error
var readInMemoryLogErr error
+ var isDone bool
for {
glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
+ processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
+ if isDone {
+ return nil
+ }
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
@@ -98,15 +102,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
var processedTsNs int64
var readPersistedLogErr error
var readInMemoryLogErr error
+ var isDone bool
for {
// println("reading from persisted logs ...")
glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
+ processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
+ if isDone {
+ return nil
+ }
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)