diff options
Diffstat (limited to 'weed/mq/logstore/merged_read.go')
| -rw-r--r-- | weed/mq/logstore/merged_read.go | 22 |
1 files changed, 12 insertions, 10 deletions
diff --git a/weed/mq/logstore/merged_read.go b/weed/mq/logstore/merged_read.go index 03a47ace4..38164a80f 100644 --- a/weed/mq/logstore/merged_read.go +++ b/weed/mq/logstore/merged_read.go @@ -9,17 +9,19 @@ import ( func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType { fromParquetFn := GenParquetReadFunc(filerClient, t, p) readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p) - return mergeReadFuncs(fromParquetFn, readLogDirectFn) + // Reversed order: live logs first (recent), then Parquet files (historical) + // This provides better performance for real-time analytics queries + return mergeReadFuncs(readLogDirectFn, fromParquetFn) } -func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType { - var exhaustedParquet bool +func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType { + var exhaustedLiveLogs bool var lastProcessedPosition log_buffer.MessagePosition return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) { - if !exhaustedParquet { - // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC()) - lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn) - // glog.V(4).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err) + if !exhaustedLiveLogs { + // glog.V(4).Infof("reading from live logs startPosition: %v\n", startPosition.UTC()) + lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn) + // glog.V(4).Infof("read from live logs: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err) if isDone { isDone = false } @@ -28,14 +30,14 @@ func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFun } lastProcessedPosition = lastReadPosition } - exhaustedParquet = true + exhaustedLiveLogs = true if startPosition.Before(lastProcessedPosition.Time) { startPosition = lastProcessedPosition } - // glog.V(4).Infof("reading from direct log startPosition: %v\n", startPosition.UTC()) - lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn) + // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC()) + lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn) return } } |
