diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-10-24 01:26:42 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-10-24 01:26:42 -0700 |
| commit | 7d147f238c51f3df77da8b13d48b86ede83497ce (patch) | |
| tree | 63b7dce990fe3e31c5f320c2ee86ebbcf2d3faba /weed/server/filer_grpc_server_sub_meta.go | |
| parent | d220875ef4601e74b6ab49e47f8e2dd36510482c (diff) | |
| download | seaweedfs-7d147f238c51f3df77da8b13d48b86ede83497ce.tar.xz seaweedfs-7d147f238c51f3df77da8b13d48b86ede83497ce.zip | |
avoid repeatedly reading from disk (#7369)
* avoid repeatedly reading from disk
* checks both flush time AND read position advancement
* wait on the condition variable
* fix reading
Gap detection and skipping to earliest memory time
Time-based reads that include events at boundary times for first reads (offset ≤ 0)
Aggregated subscriber wake-up via ListenersWaits signaling
* address comments
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 137 |
1 files changed, 103 insertions, 34 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index f4df550e6..29f71edc7 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -69,14 +69,30 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, if processedTsNs != 0 { lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } else { - nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) - position := log_buffer.NewMessagePosition(nextDayTs, -2) - found, err := fs.filer.HasPersistedLogFiles(position) - if err != nil { - return fmt.Errorf("checking persisted log files: %w", err) - } - if found { - lastReadTime = position + // No data found on disk + // Check if we previously got ResumeFromDiskError from memory, meaning we're in a gap + if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) { + // We have a gap: requested time < earliest memory time, but no data on disk + // Skip forward to earliest memory time to avoid infinite loop + earliestTime := fs.filer.MetaAggregator.MetaLogBuffer.GetEarliestTime() + if !earliestTime.IsZero() && earliestTime.After(lastReadTime.Time) { + glog.V(3).Infof("gap detected: skipping from %v to earliest memory time %v for %v", + lastReadTime.Time, earliestTime, clientName) + // Position at earliest time; time-based reader will include it + lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2) + readInMemoryLogErr = nil // Clear the error since we're skipping forward + } + } else { + // First pass or no ResumeFromDiskError yet - check the next day for logs + nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) + position := log_buffer.NewMessagePosition(nextDayTs, -2) + found, err := fs.filer.HasPersistedLogFiles(position) + if err != nil { + return fmt.Errorf("checking persisted log files: %w", err) + } + if found { + lastReadTime = position + } } } @@ -91,12 +107,16 @@ func (fs 
*FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } fs.filer.MetaAggregator.ListenersLock.Lock() + atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, 1) fs.filer.MetaAggregator.ListenersCond.Wait() + atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, -1) fs.filer.MetaAggregator.ListenersLock.Unlock() return fs.hasClient(req.ClientId, req.ClientEpoch) }, eachLogEntryFn) if readInMemoryLogErr != nil { if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) { + // Memory says data is too old - will read from disk on next iteration + // But if disk also has no data (gap in history), we'll skip forward continue } glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) @@ -150,39 +170,71 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq var readPersistedLogErr error var readInMemoryLogErr error var isDone bool + var lastCheckedFlushTsNs int64 = -1 // Track the last flushed time we checked + var lastDiskReadTsNs int64 = -1 // Track the last read position we used for disk read for { - // println("reading from persisted logs ...") - glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) - if readPersistedLogErr != nil { - glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr) - return fmt.Errorf("reading from persisted logs: %w", readPersistedLogErr) - } - if isDone { - return nil - } - - if processedTsNs != 0 { - lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) - } else { - if readInMemoryLogErr == log_buffer.ResumeFromDiskError { - time.Sleep(1127 * time.Millisecond) - continue + // Check if new data has been flushed to disk since last check, or if read position advanced + currentFlushTsNs := 
fs.filer.LocalMetaLogBuffer.GetLastFlushTsNs() + currentReadTsNs := lastReadTime.Time.UnixNano() + // Read from disk if: first time, new flush observed, or read position advanced (draining backlog) + shouldReadFromDisk := lastCheckedFlushTsNs == -1 || + currentFlushTsNs > lastCheckedFlushTsNs || + currentReadTsNs > lastDiskReadTsNs + + if shouldReadFromDisk { + // Record the position we are about to read from + lastDiskReadTsNs = currentReadTsNs + glog.V(4).Infof("read on disk %v local subscribe %s from %+v (lastFlushed: %v)", clientName, req.PathPrefix, lastReadTime, time.Unix(0, currentFlushTsNs)) + processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) + if readPersistedLogErr != nil { + glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr) + return fmt.Errorf("reading from persisted logs: %w", readPersistedLogErr) } - // If no persisted entries were read for this day, check the next day for logs - nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) - position := log_buffer.NewMessagePosition(nextDayTs, -2) - found, err := fs.filer.HasPersistedLogFiles(position) - if err != nil { - return fmt.Errorf("checking persisted log files: %w", err) + if isDone { + return nil } - if found { - lastReadTime = position + + // Update the last checked flushed time + lastCheckedFlushTsNs = currentFlushTsNs + + if processedTsNs != 0 { + lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) + } else { + // No data found on disk + // Check if we previously got ResumeFromDiskError from memory, meaning we're in a gap + if readInMemoryLogErr == log_buffer.ResumeFromDiskError { + // We have a gap: requested time < earliest memory time, but no data on disk + // Skip forward to earliest memory time to avoid infinite loop + earliestTime := fs.filer.LocalMetaLogBuffer.GetEarliestTime() + if !earliestTime.IsZero() && 
earliestTime.After(lastReadTime.Time) { + glog.V(3).Infof("gap detected: skipping from %v to earliest memory time %v for %v", + lastReadTime.Time, earliestTime, clientName) + // Position at earliest time; time-based reader will include it + lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2) + readInMemoryLogErr = nil // Clear the error since we're skipping forward + } else { + // No memory data yet, just wait + time.Sleep(1127 * time.Millisecond) + continue + } + } else { + // First pass or no ResumeFromDiskError yet + // Check the next day for logs + nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) + position := log_buffer.NewMessagePosition(nextDayTs, -2) + found, err := fs.filer.HasPersistedLogFiles(position) + if err != nil { + return fmt.Errorf("checking persisted log files: %w", err) + } + if found { + lastReadTime = position + } + } } } - glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + glog.V(3).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { @@ -205,6 +257,23 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq }, eachLogEntryFn) if readInMemoryLogErr != nil { if readInMemoryLogErr == log_buffer.ResumeFromDiskError { + // Memory buffer says the requested time is too old + // Retry disk read if: (a) flush advanced, or (b) read position advanced (draining backlog) + currentFlushTsNs := fs.filer.LocalMetaLogBuffer.GetLastFlushTsNs() + currentReadTsNs := lastReadTime.Time.UnixNano() + if currentFlushTsNs > lastCheckedFlushTsNs || currentReadTsNs > lastDiskReadTsNs { + glog.V(0).Infof("retry disk read %v local subscribe %s (lastFlushed: %v -> %v, readTs: %v -> %v)", + clientName, req.PathPrefix, + time.Unix(0, lastCheckedFlushTsNs), 
time.Unix(0, currentFlushTsNs), + time.Unix(0, lastDiskReadTsNs), time.Unix(0, currentReadTsNs)) + continue + } + // No progress possible, wait for new data to arrive (event-driven, not polling) + fs.listenersLock.Lock() + atomic.AddInt64(&fs.listenersWaits, 1) + fs.listenersCond.Wait() + atomic.AddInt64(&fs.listenersWaits, -1) + fs.listenersLock.Unlock() continue } glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) |
