diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-08 00:03:08 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-08 00:03:08 -0800 |
| commit | 49428a303b1fe791954a97df0f5d911c4b7c0f3f (patch) | |
| tree | 2c87d112c4f399f62fa336d1a99da8d0ecf3d1f6 /weed/server/filer_grpc_server_sub_meta.go | |
| parent | d0d24f1e40d6630124d82c6c7f17fc42049c58aa (diff) | |
| download | seaweedfs-49428a303b1fe791954a97df0f5d911c4b7c0f3f.tar.xz seaweedfs-49428a303b1fe791954a97df0f5d911c4b7c0f3f.zip | |
add batch index for each memory buffer
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index eb69a6aeb..466851614 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -32,7 +32,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } defer fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch) - lastReadTime := time.Unix(0, req.SinceNs) + lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) @@ -57,12 +57,12 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) + lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { + lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, false, req.UntilNs, func() bool { fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() @@ -113,7 +113,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch) }() - lastReadTime := time.Unix(0, req.SinceNs) + lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) glog.V(0).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId) eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) @@ -138,7 +138,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq } if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) + lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } else { if readInMemoryLogErr == log_buffer.ResumeFromDiskError { time.Sleep(1127 * time.Millisecond) @@ -148,7 +148,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { + lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, false, req.UntilNs, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() |
