aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_sub_meta.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-10-24 01:26:42 -0700
committerGitHub <noreply@github.com>2025-10-24 01:26:42 -0700
commit7d147f238c51f3df77da8b13d48b86ede83497ce (patch)
tree63b7dce990fe3e31c5f320c2ee86ebbcf2d3faba /weed/server/filer_grpc_server_sub_meta.go
parentd220875ef4601e74b6ab49e47f8e2dd36510482c (diff)
downloadseaweedfs-7d147f238c51f3df77da8b13d48b86ede83497ce.tar.xz
seaweedfs-7d147f238c51f3df77da8b13d48b86ede83497ce.zip
avoid repeated reading disk (#7369)
* avoid repeated reading disk * checks both flush time AND read position advancement * wait on cond * fix reading Gap detection and skipping to earliest memory time Time-based reads that include events at boundary times for first reads (offset ≤ 0) Aggregated subscriber wake-up via ListenersWaits signaling * address comments
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go137
1 files changed, 103 insertions, 34 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index f4df550e6..29f71edc7 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -69,14 +69,30 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
if processedTsNs != 0 {
lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
} else {
- nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano())
- position := log_buffer.NewMessagePosition(nextDayTs, -2)
- found, err := fs.filer.HasPersistedLogFiles(position)
- if err != nil {
- return fmt.Errorf("checking persisted log files: %w", err)
- }
- if found {
- lastReadTime = position
+ // No data found on disk
+ // Check if we previously got ResumeFromDiskError from memory, meaning we're in a gap
+ if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) {
+ // We have a gap: requested time < earliest memory time, but no data on disk
+ // Skip forward to earliest memory time to avoid infinite loop
+ earliestTime := fs.filer.MetaAggregator.MetaLogBuffer.GetEarliestTime()
+ if !earliestTime.IsZero() && earliestTime.After(lastReadTime.Time) {
+ glog.V(3).Infof("gap detected: skipping from %v to earliest memory time %v for %v",
+ lastReadTime.Time, earliestTime, clientName)
+ // Position at earliest time; time-based reader will include it
+ lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2)
+ readInMemoryLogErr = nil // Clear the error since we're skipping forward
+ }
+ } else {
+ // First pass or no ResumeFromDiskError yet - check the next day for logs
+ nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano())
+ position := log_buffer.NewMessagePosition(nextDayTs, -2)
+ found, err := fs.filer.HasPersistedLogFiles(position)
+ if err != nil {
+ return fmt.Errorf("checking persisted log files: %w", err)
+ }
+ if found {
+ lastReadTime = position
+ }
}
}
@@ -91,12 +107,16 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
fs.filer.MetaAggregator.ListenersLock.Lock()
+ atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, 1)
fs.filer.MetaAggregator.ListenersCond.Wait()
+ atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, -1)
fs.filer.MetaAggregator.ListenersLock.Unlock()
return fs.hasClient(req.ClientId, req.ClientEpoch)
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) {
+ // Memory says data is too old - will read from disk on next iteration
+ // But if disk also has no data (gap in history), we'll skip forward
continue
}
glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
@@ -150,39 +170,71 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
var readPersistedLogErr error
var readInMemoryLogErr error
var isDone bool
+ var lastCheckedFlushTsNs int64 = -1 // Track the last flushed time we checked
+ var lastDiskReadTsNs int64 = -1 // Track the last read position we used for disk read
for {
- // println("reading from persisted logs ...")
- glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
- if readPersistedLogErr != nil {
- glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
- return fmt.Errorf("reading from persisted logs: %w", readPersistedLogErr)
- }
- if isDone {
- return nil
- }
-
- if processedTsNs != 0 {
- lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
- } else {
- if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
- time.Sleep(1127 * time.Millisecond)
- continue
+ // Check if new data has been flushed to disk since last check, or if read position advanced
+ currentFlushTsNs := fs.filer.LocalMetaLogBuffer.GetLastFlushTsNs()
+ currentReadTsNs := lastReadTime.Time.UnixNano()
+ // Read from disk if: first time, new flush observed, or read position advanced (draining backlog)
+ shouldReadFromDisk := lastCheckedFlushTsNs == -1 ||
+ currentFlushTsNs > lastCheckedFlushTsNs ||
+ currentReadTsNs > lastDiskReadTsNs
+
+ if shouldReadFromDisk {
+ // Record the position we are about to read from
+ lastDiskReadTsNs = currentReadTsNs
+ glog.V(4).Infof("read on disk %v local subscribe %s from %+v (lastFlushed: %v)", clientName, req.PathPrefix, lastReadTime, time.Unix(0, currentFlushTsNs))
+ processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
+ if readPersistedLogErr != nil {
+ glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
+ return fmt.Errorf("reading from persisted logs: %w", readPersistedLogErr)
}
- // If no persisted entries were read for this day, check the next day for logs
- nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano())
- position := log_buffer.NewMessagePosition(nextDayTs, -2)
- found, err := fs.filer.HasPersistedLogFiles(position)
- if err != nil {
- return fmt.Errorf("checking persisted log files: %w", err)
+ if isDone {
+ return nil
}
- if found {
- lastReadTime = position
+
+ // Update the last checked flushed time
+ lastCheckedFlushTsNs = currentFlushTsNs
+
+ if processedTsNs != 0 {
+ lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
+ } else {
+ // No data found on disk
+ // Check if we previously got ResumeFromDiskError from memory, meaning we're in a gap
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+ // We have a gap: requested time < earliest memory time, but no data on disk
+ // Skip forward to earliest memory time to avoid infinite loop
+ earliestTime := fs.filer.LocalMetaLogBuffer.GetEarliestTime()
+ if !earliestTime.IsZero() && earliestTime.After(lastReadTime.Time) {
+ glog.V(3).Infof("gap detected: skipping from %v to earliest memory time %v for %v",
+ lastReadTime.Time, earliestTime, clientName)
+ // Position at earliest time; time-based reader will include it
+ lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2)
+ readInMemoryLogErr = nil // Clear the error since we're skipping forward
+ } else {
+ // No memory data yet, just wait
+ time.Sleep(1127 * time.Millisecond)
+ continue
+ }
+ } else {
+ // First pass or no ResumeFromDiskError yet
+ // Check the next day for logs
+ nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano())
+ position := log_buffer.NewMessagePosition(nextDayTs, -2)
+ found, err := fs.filer.HasPersistedLogFiles(position)
+ if err != nil {
+ return fmt.Errorf("checking persisted log files: %w", err)
+ }
+ if found {
+ lastReadTime = position
+ }
+ }
}
}
- glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ glog.V(3).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
@@ -205,6 +257,23 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+ // Memory buffer says the requested time is too old
+ // Retry disk read if: (a) flush advanced, or (b) read position advanced (draining backlog)
+ currentFlushTsNs := fs.filer.LocalMetaLogBuffer.GetLastFlushTsNs()
+ currentReadTsNs := lastReadTime.Time.UnixNano()
+ if currentFlushTsNs > lastCheckedFlushTsNs || currentReadTsNs > lastDiskReadTsNs {
+ glog.V(0).Infof("retry disk read %v local subscribe %s (lastFlushed: %v -> %v, readTs: %v -> %v)",
+ clientName, req.PathPrefix,
+ time.Unix(0, lastCheckedFlushTsNs), time.Unix(0, currentFlushTsNs),
+ time.Unix(0, lastDiskReadTsNs), time.Unix(0, currentReadTsNs))
+ continue
+ }
+ // No progress possible, wait for new data to arrive (event-driven, not polling)
+ fs.listenersLock.Lock()
+ atomic.AddInt64(&fs.listenersWaits, 1)
+ fs.listenersCond.Wait()
+ atomic.AddInt64(&fs.listenersWaits, -1)
+ fs.listenersLock.Unlock()
continue
}
glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)