aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/logstore/merged_read.go
blob: 38164a80fc1ce09a61bd325e157c4e4c7badf824 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package logstore

import (
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
	fromParquetFn := GenParquetReadFunc(filerClient, t, p)
	readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p)
	// Reversed order: live logs first (recent), then Parquet files (historical)
	// This provides better performance for real-time analytics queries
	return mergeReadFuncs(readLogDirectFn, fromParquetFn)
}

func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
	var exhaustedLiveLogs bool
	var lastProcessedPosition log_buffer.MessagePosition
	return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
		if !exhaustedLiveLogs {
			// glog.V(4).Infof("reading from live logs startPosition: %v\n", startPosition.UTC())
			lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
			// glog.V(4).Infof("read from live logs: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
			if isDone {
				isDone = false
			}
			if err != nil {
				return
			}
			lastProcessedPosition = lastReadPosition
		}
		exhaustedLiveLogs = true

		if startPosition.Before(lastProcessedPosition.Time) {
			startPosition = lastProcessedPosition
		}

		// glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
		lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
		return
	}
}