aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_sub_meta.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-10-24 00:49:14 -0700
committerchrislu <chris.lu@gmail.com>2025-10-24 00:49:14 -0700
commitd220875ef4601e74b6ab49e47f8e2dd36510482c (patch)
treea3e89ee024f2c9383b36ff5470bff35238f49f03 /weed/server/filer_grpc_server_sub_meta.go
parent64a4ce93580258b8d5537416dfb5d9a1a8f14ee2 (diff)
downloadseaweedfs-d220875ef4601e74b6ab49e47f8e2dd36510482c.tar.xz
seaweedfs-d220875ef4601e74b6ab49e47f8e2dd36510482c.zip
Revert "fix reading"
This reverts commit 64a4ce93580258b8d5537416dfb5d9a1a8f14ee2.
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go78
1 files changed, 21 insertions, 57 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 566f24ad7..f4df550e6 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -69,30 +69,14 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
if processedTsNs != 0 {
lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
} else {
- // No data found on disk
- // Check if we previously got ResumeFromDiskError from memory, meaning we're in a gap
- if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) {
- // We have a gap: requested time < earliest memory time, but no data on disk
- // Skip forward to earliest memory time to avoid infinite loop
- earliestTime := fs.filer.MetaAggregator.MetaLogBuffer.GetEarliestTime()
- if !earliestTime.IsZero() && earliestTime.After(lastReadTime.Time) {
- glog.V(3).Infof("gap detected: skipping from %v to earliest memory time %v for %v",
- lastReadTime.Time, earliestTime, clientName)
- // Position at earliest time; time-based reader will include it
- lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2)
- readInMemoryLogErr = nil // Clear the error since we're skipping forward
- }
- } else {
- // First pass or no ResumeFromDiskError yet - check the next day for logs
- nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano())
- position := log_buffer.NewMessagePosition(nextDayTs, -2)
- found, err := fs.filer.HasPersistedLogFiles(position)
- if err != nil {
- return fmt.Errorf("checking persisted log files: %w", err)
- }
- if found {
- lastReadTime = position
- }
+ nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano())
+ position := log_buffer.NewMessagePosition(nextDayTs, -2)
+ found, err := fs.filer.HasPersistedLogFiles(position)
+ if err != nil {
+ return fmt.Errorf("checking persisted log files: %w", err)
+ }
+ if found {
+ lastReadTime = position
}
}
@@ -107,16 +91,12 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
fs.filer.MetaAggregator.ListenersLock.Lock()
- atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, 1)
fs.filer.MetaAggregator.ListenersCond.Wait()
- atomic.AddInt64(&fs.filer.MetaAggregator.ListenersWaits, -1)
fs.filer.MetaAggregator.ListenersLock.Unlock()
return fs.hasClient(req.ClientId, req.ClientEpoch)
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) {
- // Memory says data is too old - will read from disk on next iteration
- // But if disk also has no data (gap in history), we'll skip forward
continue
}
glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
@@ -186,39 +166,23 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
if processedTsNs != 0 {
lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
} else {
- // No data found on disk
- // Check if we previously got ResumeFromDiskError from memory, meaning we're in a gap
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
- // We have a gap: requested time < earliest memory time, but no data on disk
- // Skip forward to earliest memory time to avoid infinite loop
- earliestTime := fs.filer.LocalMetaLogBuffer.GetEarliestTime()
- if !earliestTime.IsZero() && earliestTime.After(lastReadTime.Time) {
- glog.V(3).Infof("gap detected: skipping from %v to earliest memory time %v for %v",
- lastReadTime.Time, earliestTime, clientName)
- // Position at earliest time; time-based reader will include it
- lastReadTime = log_buffer.NewMessagePosition(earliestTime.UnixNano(), -2)
- readInMemoryLogErr = nil // Clear the error since we're skipping forward
- } else {
- // No memory data yet, just wait
- time.Sleep(1127 * time.Millisecond)
- continue
- }
- } else {
- // First pass or no ResumeFromDiskError yet
- // Check the next day for logs
- nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano())
- position := log_buffer.NewMessagePosition(nextDayTs, -2)
- found, err := fs.filer.HasPersistedLogFiles(position)
- if err != nil {
- return fmt.Errorf("checking persisted log files: %w", err)
- }
- if found {
- lastReadTime = position
- }
+ time.Sleep(1127 * time.Millisecond)
+ continue
+ }
+ // If no persisted entries were read for this day, check the next day for logs
+ nextDayTs := util.GetNextDayTsNano(lastReadTime.Time.UnixNano())
+ position := log_buffer.NewMessagePosition(nextDayTs, -2)
+ found, err := fs.filer.HasPersistedLogFiles(position)
+ if err != nil {
+ return fmt.Errorf("checking persisted log files: %w", err)
+ }
+ if found {
+ lastReadTime = position
}
}
- glog.V(3).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {