aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/logstore/merged_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/logstore/merged_read.go')
-rw-r--r--weed/mq/logstore/merged_read.go22
1 files changed, 12 insertions, 10 deletions
diff --git a/weed/mq/logstore/merged_read.go b/weed/mq/logstore/merged_read.go
index 03a47ace4..38164a80f 100644
--- a/weed/mq/logstore/merged_read.go
+++ b/weed/mq/logstore/merged_read.go
@@ -9,17 +9,19 @@ import (
func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
fromParquetFn := GenParquetReadFunc(filerClient, t, p)
readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p)
- return mergeReadFuncs(fromParquetFn, readLogDirectFn)
+ // Reversed order: live logs first (recent), then Parquet files (historical)
+ // This provides better performance for real-time analytics queries
+ return mergeReadFuncs(readLogDirectFn, fromParquetFn)
}
-func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
- var exhaustedParquet bool
+func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
+ var exhaustedLiveLogs bool
var lastProcessedPosition log_buffer.MessagePosition
return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
- if !exhaustedParquet {
- // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
- lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
- // glog.V(4).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
+ if !exhaustedLiveLogs {
+ // glog.V(4).Infof("reading from live logs startPosition: %v\n", startPosition.UTC())
+ lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
+ // glog.V(4).Infof("read from live logs: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
if isDone {
isDone = false
}
@@ -28,14 +30,14 @@ func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFun
}
lastProcessedPosition = lastReadPosition
}
- exhaustedParquet = true
+ exhaustedLiveLogs = true
if startPosition.Before(lastProcessedPosition.Time) {
startPosition = lastProcessedPosition
}
- // glog.V(4).Infof("reading from direct log startPosition: %v\n", startPosition.UTC())
- lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
+ // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
+ lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
return
}
}