diff options
| author | chrislu <chris.lu@gmail.com> | 2025-10-24 00:49:14 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-10-24 00:49:14 -0700 |
| commit | d220875ef4601e74b6ab49e47f8e2dd36510482c (patch) | |
| tree | a3e89ee024f2c9383b36ff5470bff35238f49f03 /weed/server/filer_grpc_server_sub_meta.go | |
| parent | 64a4ce93580258b8d5537416dfb5d9a1a8f14ee2 (diff) | |
| download | seaweedfs-d220875ef4601e74b6ab49e47f8e2dd36510482c.tar.xz seaweedfs-d220875ef4601e74b6ab49e47f8e2dd36510482c.zip | |
Revert "fix reading"
This reverts commit 64a4ce93580258b8d5537416dfb5d9a1a8f14ee2.
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 78 |
1 files changed, 21 insertions, 57 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 566f24ad7..f4df550e6 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -69,30 +69,14 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, if processedTsNs != 0 { lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } else { - // No data found on disk - // Check if we previously got ResumeFromDiskError from memory, meaning we're in a gap - if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) { - // We have a gap: requested time < earliest memory time, but no data on disk - // Skip forward to earliest memory time to avoid infinite loop - earliestTime := fs.filer.MetaAggregator.MetaLogBuffer.GetEarliestTime() - if !earliestTime.IsZero() && earliestTime.After(lastReadTime.Time) { - glog.V(3).Infof("gap detected: skipping from %v to earliest memory time %v for %v", - lastReadTime.Time, earliestTime, clientName) - // Position at earliest time; time-based reader will include it - lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2) - readInMemoryLogErr = nil // Clear the error since we're skipping forward - } - } else { - // First pass or no ResumeFromDiskError yet - check the next day for logs - nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) - position := log_buffer.NewMessagePosition(nextDayTs, -2) - found, err := fs.filer.HasPersistedLogFiles(position) - if err != nil { - return fmt.Errorf("checking persisted log files: %w", err) - } - if found { - lastReadTime = position - } + nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) + position := log_buffer.NewMessagePosition(nextDayTs, -2) + found, err := fs.filer.HasPersistedLogFiles(position) + if err != nil { + return fmt.Errorf("checking persisted log files: %w", err) + } + if found { + lastReadTime = position } } @@ -107,16 +91,12 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } fs.filer.MetaAggregator.ListenersLock.Lock() - atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, 1) fs.filer.MetaAggregator.ListenersCond.Wait() - atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, -1) fs.filer.MetaAggregator.ListenersLock.Unlock() return fs.hasClient(req.ClientId, req.ClientEpoch) }, eachLogEntryFn) if readInMemoryLogErr != nil { if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) { - // Memory says data is too old - will read from disk on next iteration - // But if disk also has no data (gap in history), we'll skip forward continue } glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) @@ -186,39 +166,23 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq if processedTsNs != 0 { lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } else { - // No data found on disk - // Check if we previously got ResumeFromDiskError from memory, meaning we're in a gap if readInMemoryLogErr == log_buffer.ResumeFromDiskError { - // We have a gap: requested time < earliest memory time, but no data on disk - // Skip forward to earliest memory time to avoid infinite loop - earliestTime := fs.filer.LocalMetaLogBuffer.GetEarliestTime() - if !earliestTime.IsZero() && earliestTime.After(lastReadTime.Time) { - glog.V(3).Infof("gap detected: skipping from %v to earliest memory time %v for %v", - lastReadTime.Time, earliestTime, clientName) - // Position at earliest time; time-based reader will include it - lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2) - readInMemoryLogErr = nil // Clear the error since we're skipping forward - } else { - // No memory data yet, just wait - time.Sleep(1127 * time.Millisecond) - continue - } - } else { - // First pass or no ResumeFromDiskError yet - // Check the next day for logs - nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) - position := log_buffer.NewMessagePosition(nextDayTs, -2) - found, err := fs.filer.HasPersistedLogFiles(position) - if err != nil { - return fmt.Errorf("checking persisted log files: %w", err) - } - if found { - lastReadTime = position - } + time.Sleep(1127 * time.Millisecond) + continue + } + // If no persisted entries were read for this day, check the next day for logs + nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano()) + position := log_buffer.NewMessagePosition(nextDayTs, -2) + found, err := fs.filer.HasPersistedLogFiles(position) + if err != nil { + return fmt.Errorf("checking persisted log files: %w", err) + } + if found { + lastReadTime = position } } - glog.V(3).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { |
