diff options
Diffstat (limited to 'weed/mq/logstore/merged_read.go')
| -rw-r--r-- | weed/mq/logstore/merged_read.go | 41 |
1 files changed, 24 insertions, 17 deletions
diff --git a/weed/mq/logstore/merged_read.go b/weed/mq/logstore/merged_read.go index 38164a80f..c2e8e3caf 100644 --- a/weed/mq/logstore/merged_read.go +++ b/weed/mq/logstore/merged_read.go @@ -15,29 +15,36 @@ func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic. } func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType { - var exhaustedLiveLogs bool - var lastProcessedPosition log_buffer.MessagePosition + // CRITICAL FIX: Removed stateful closure variables (exhaustedLiveLogs, lastProcessedPosition) + // These caused the function to skip disk reads on subsequent calls, leading to + // Schema Registry timeout when data was flushed after the first read attempt. + // The function must be stateless and check for data on EVERY call. return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) { - if !exhaustedLiveLogs { - // glog.V(4).Infof("reading from live logs startPosition: %v\n", startPosition.UTC()) - lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn) - // glog.V(4).Infof("read from live logs: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err) - if isDone { - isDone = false - } - if err != nil { - return - } - lastProcessedPosition = lastReadPosition + // Always try reading from live logs first (recent data) + lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn) + if isDone { + // For very early timestamps (like timestamp=1 for RESET_TO_EARLIEST), + // we want to continue to read from in-memory data + isDone = false + } + if err != nil { + return } - exhaustedLiveLogs = true - if startPosition.Before(lastProcessedPosition.Time) { - startPosition = lastProcessedPosition + // If live logs returned data, update startPosition for parquet read + if lastReadPosition.Offset > startPosition.Offset || lastReadPosition.Time.After(startPosition.Time) { + startPosition = lastReadPosition } - // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC()) + // Then try reading from Parquet files (historical data) lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn) + + if isDone { + // For very early timestamps (like timestamp=1 for RESET_TO_EARLIEST), + // parquet files won't exist, but we want to continue to in-memory data reading + isDone = false + } + return } } |
