about summary refs log tree commit diff
path: root/weed/mq/logstore/merged_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/logstore/merged_read.go')
-rw-r--r-- weed/mq/logstore/merged_read.go 41
1 files changed, 24 insertions, 17 deletions
diff --git a/weed/mq/logstore/merged_read.go b/weed/mq/logstore/merged_read.go
index 38164a80f..c2e8e3caf 100644
--- a/weed/mq/logstore/merged_read.go
+++ b/weed/mq/logstore/merged_read.go
@@ -15,29 +15,36 @@ func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.
}
func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
- var exhaustedLiveLogs bool
- var lastProcessedPosition log_buffer.MessagePosition
+ // CRITICAL FIX: Removed stateful closure variables (exhaustedLiveLogs, lastProcessedPosition)
+ // These caused the function to skip disk reads on subsequent calls, leading to
+ // Schema Registry timeout when data was flushed after the first read attempt.
+ // The function must be stateless and check for data on EVERY call.
return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
- if !exhaustedLiveLogs {
- // glog.V(4).Infof("reading from live logs startPosition: %v\n", startPosition.UTC())
- lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
- // glog.V(4).Infof("read from live logs: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
- if isDone {
- isDone = false
- }
- if err != nil {
- return
- }
- lastProcessedPosition = lastReadPosition
+ // Always try reading from live logs first (recent data)
+ lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
+ if isDone {
+ // For very early timestamps (like timestamp=1 for RESET_TO_EARLIEST),
+ // we want to continue to read from in-memory data
+ isDone = false
+ }
+ if err != nil {
+ return
}
- exhaustedLiveLogs = true
- if startPosition.Before(lastProcessedPosition.Time) {
- startPosition = lastProcessedPosition
+ // If live logs returned data, update startPosition for parquet read
+ if lastReadPosition.Offset > startPosition.Offset || lastReadPosition.Time.After(startPosition.Time) {
+ startPosition = lastReadPosition
}
- // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
+ // Then try reading from Parquet files (historical data)
lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
+
+ if isDone {
+ // For very early timestamps (like timestamp=1 for RESET_TO_EARLIEST),
+ // parquet files won't exist, but we want to continue to in-memory data reading
+ isDone = false
+ }
+
return
}
}