diff options
| author | chrislu <chris.lu@gmail.com> | 2022-05-30 15:20:51 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-05-30 15:20:51 -0700 |
| commit | aece35a64fc779be03e43cd7bef79a99bd011634 (patch) | |
| tree | e12248af36ca4f484198ebb962dc2af87a79a10e /weed/server/filer_grpc_server_sub_meta.go | |
| parent | a2b101a737de0a4085f560971f6f25cb8f4e6050 (diff) | |
| download | seaweedfs-aece35a64fc779be03e43cd7bef79a99bd011634.tar.xz seaweedfs-aece35a64fc779be03e43cd7bef79a99bd011634.zip | |
stop when on disk log is done
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index bed8580cc..3688ae047 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -39,15 +39,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, var processedTsNs int64 var readPersistedLogErr error var readInMemoryLogErr error + var isDone bool for { glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) + processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) if readPersistedLogErr != nil { return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } + if isDone { + return nil + } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) @@ -98,15 +102,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq var processedTsNs int64 var readPersistedLogErr error var readInMemoryLogErr error + var isDone bool for { // println("reading from persisted logs ...") glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) + processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) if readPersistedLogErr != nil { glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr) return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } + if isDone { + return nil + } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) |
