Diffstat (limited to 'weed/mq/logstore')
-rw-r--r--  weed/mq/logstore/log_to_parquet.go       114
-rw-r--r--  weed/mq/logstore/merged_read.go           41
-rw-r--r--  weed/mq/logstore/read_log_from_disk.go   235
-rw-r--r--  weed/mq/logstore/read_parquet_to_log.go   32
4 files changed, 362 insertions, 60 deletions
diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
index 8855d68f9..bfd5ff10e 100644
--- a/weed/mq/logstore/log_to_parquet.go
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -13,6 +13,7 @@ import (
"github.com/parquet-go/parquet-go"
"github.com/parquet-go/parquet-go/compress/zstd"
"github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/mq"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -25,8 +26,10 @@ import (
)
const (
- SW_COLUMN_NAME_TS = "_ts_ns"
- SW_COLUMN_NAME_KEY = "_key"
+ SW_COLUMN_NAME_TS = "_ts_ns"
+ SW_COLUMN_NAME_KEY = "_key"
+ SW_COLUMN_NAME_OFFSET = "_offset"
+ SW_COLUMN_NAME_VALUE = "_value"
)
func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
@@ -185,7 +188,7 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
}
// read min ts
- minTsBytes := entry.Extended["min"]
+ minTsBytes := entry.Extended[mq.ExtendedAttrTimestampMin]
if len(minTsBytes) != 8 {
return nil
}
@@ -195,7 +198,7 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
}
// read max ts
- maxTsBytes := entry.Extended["max"]
+ maxTsBytes := entry.Extended[mq.ExtendedAttrTimestampMax]
if len(maxTsBytes) != 8 {
return nil
}
@@ -208,6 +211,36 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
return
}
+// isSchemalessRecordType checks if the recordType represents a schema-less topic
+// Schema-less topics only have system fields: _ts_ns, _key, and _value
+func isSchemalessRecordType(recordType *schema_pb.RecordType) bool {
+ if recordType == nil {
+ return false
+ }
+
+ // Count only non-system data fields (exclude _ts_ns and _key which are always present)
+ // Schema-less topics should only have _value as the data field
+ hasValue := false
+ dataFieldCount := 0
+
+ for _, field := range recordType.Fields {
+ switch field.Name {
+ case SW_COLUMN_NAME_TS, SW_COLUMN_NAME_KEY, SW_COLUMN_NAME_OFFSET:
+ // System fields - ignore
+ continue
+ case SW_COLUMN_NAME_VALUE:
+ hasValue = true
+ dataFieldCount++
+ default:
+ // Any other field means it's not schema-less
+ dataFieldCount++
+ }
+ }
+
+ // Schema-less = only has _value field as the data field (plus system fields)
+ return hasValue && dataFieldCount == 1
+}
+
func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {
tempFile, err := os.CreateTemp(".", "t*.parquet")
@@ -227,6 +260,9 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
rowBuilder := parquet.NewRowBuilder(parquetSchema)
var startTsNs, stopTsNs int64
+ var minOffset, maxOffset int64
+ var hasOffsets bool
+ isSchemaless := isSchemalessRecordType(recordType)
for _, logFile := range logFileGroups {
var rows []parquet.Row
@@ -242,19 +278,56 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
}
stopTsNs = entry.TsNs
+ // Track offset ranges for Kafka integration
+ if entry.Offset > 0 {
+ if !hasOffsets {
+ minOffset = entry.Offset
+ maxOffset = entry.Offset
+ hasOffsets = true
+ } else {
+ if entry.Offset < minOffset {
+ minOffset = entry.Offset
+ }
+ if entry.Offset > maxOffset {
+ maxOffset = entry.Offset
+ }
+ }
+ }
+
// write to parquet file
rowBuilder.Reset()
record := &schema_pb.RecordValue{}
- if err := proto.Unmarshal(entry.Data, record); err != nil {
- return fmt.Errorf("unmarshal record value: %w", err)
- }
- // Initialize Fields map if nil (prevents nil map assignment panic)
- if record.Fields == nil {
+ if isSchemaless {
+ // For schema-less topics, put raw entry.Data into _value field
record.Fields = make(map[string]*schema_pb.Value)
+ record.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{
+ BytesValue: entry.Data,
+ },
+ }
+ } else {
+ // For schematized topics, unmarshal entry.Data as RecordValue
+ if err := proto.Unmarshal(entry.Data, record); err != nil {
+ return fmt.Errorf("unmarshal record value: %w", err)
+ }
+
+ // Initialize Fields map if nil (prevents nil map assignment panic)
+ if record.Fields == nil {
+ record.Fields = make(map[string]*schema_pb.Value)
+ }
+
+ // Add offset field to parquet records for native offset support
+ // ASSUMPTION: LogEntry.Offset field is populated by broker during message publishing
+ record.Fields[SW_COLUMN_NAME_OFFSET] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{
+ Int64Value: entry.Offset,
+ },
+ }
}
+ // Add system columns (for both schematized and schema-less topics)
record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{
Int64Value: entry.TsNs,
@@ -323,7 +396,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
}
}
- if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart); err != nil {
+ if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart, minOffset, maxOffset, hasOffsets); err != nil {
return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
}
@@ -331,7 +404,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
}
-func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64) error {
+func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64, minOffset, maxOffset int64, hasOffsets bool) error {
uploader, err := operation.NewUploader()
if err != nil {
return fmt.Errorf("new uploader: %w", err)
@@ -359,22 +432,33 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
entry.Extended = make(map[string][]byte)
minTsBytes := make([]byte, 8)
binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
- entry.Extended["min"] = minTsBytes
+ entry.Extended[mq.ExtendedAttrTimestampMin] = minTsBytes
maxTsBytes := make([]byte, 8)
binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
- entry.Extended["max"] = maxTsBytes
+ entry.Extended[mq.ExtendedAttrTimestampMax] = maxTsBytes
+
+ // Add offset range metadata for Kafka integration (same as regular log files)
+ if hasOffsets && minOffset > 0 && maxOffset >= minOffset {
+ minOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(minOffsetBytes, uint64(minOffset))
+ entry.Extended[mq.ExtendedAttrOffsetMin] = minOffsetBytes
+
+ maxOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(maxOffsetBytes, uint64(maxOffset))
+ entry.Extended[mq.ExtendedAttrOffsetMax] = maxOffsetBytes
+ }
// Store source log files for deduplication (JSON-encoded list)
if len(sourceLogFiles) > 0 {
sourceLogFilesJson, _ := json.Marshal(sourceLogFiles)
- entry.Extended["sources"] = sourceLogFilesJson
+ entry.Extended[mq.ExtendedAttrSources] = sourceLogFilesJson
}
// Store earliest buffer_start for precise broker deduplication
if earliestBufferStart > 0 {
bufferStartBytes := make([]byte, 8)
binary.BigEndian.PutUint64(bufferStartBytes, uint64(earliestBufferStart))
- entry.Extended["buffer_start"] = bufferStartBytes
+ entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes
}
for i := int64(0); i < chunkCount; i++ {
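
Note on the offset-range metadata above: a minimal standalone sketch of the 8-byte big-endian encoding, assuming mq.ExtendedAttrOffsetMin/ExtendedAttrOffsetMax resolve to the "offset_min"/"offset_max" keys that read_log_from_disk.go reads back (the helper names below are illustrative only):

package main

import (
	"encoding/binary"
	"fmt"
)

// Assumed keys: the reader side of this change accesses entry.Extended["offset_min"]
// and entry.Extended["offset_max"] directly, so the mq constants are taken to match.
const (
	attrOffsetMin = "offset_min"
	attrOffsetMax = "offset_max"
)

// putOffsetRange mirrors the writer side: store each offset as an 8-byte big-endian uint64.
func putOffsetRange(extended map[string][]byte, minOffset, maxOffset int64) {
	minBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(minBytes, uint64(minOffset))
	extended[attrOffsetMin] = minBytes

	maxBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(maxBytes, uint64(maxOffset))
	extended[attrOffsetMax] = maxBytes
}

// getOffsetRange mirrors the reader side: only trust values that are exactly 8 bytes long.
func getOffsetRange(extended map[string][]byte) (minOffset, maxOffset int64, ok bool) {
	minBytes, hasMin := extended[attrOffsetMin]
	maxBytes, hasMax := extended[attrOffsetMax]
	if !hasMin || !hasMax || len(minBytes) != 8 || len(maxBytes) != 8 {
		return 0, 0, false
	}
	return int64(binary.BigEndian.Uint64(minBytes)), int64(binary.BigEndian.Uint64(maxBytes)), true
}

func main() {
	extended := make(map[string][]byte)
	putOffsetRange(extended, 100, 250)
	if minOff, maxOff, ok := getOffsetRange(extended); ok {
		fmt.Printf("offset range: [%d, %d]\n", minOff, maxOff) // offset range: [100, 250]
	}
}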
diff --git a/weed/mq/logstore/merged_read.go b/weed/mq/logstore/merged_read.go
index 38164a80f..c2e8e3caf 100644
--- a/weed/mq/logstore/merged_read.go
+++ b/weed/mq/logstore/merged_read.go
@@ -15,29 +15,36 @@ func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.
}
func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
- var exhaustedLiveLogs bool
- var lastProcessedPosition log_buffer.MessagePosition
+ // CRITICAL FIX: Removed stateful closure variables (exhaustedLiveLogs, lastProcessedPosition)
+ // These caused the function to skip disk reads on subsequent calls, leading to
+ // Schema Registry timeout when data was flushed after the first read attempt.
+ // The function must be stateless and check for data on EVERY call.
return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
- if !exhaustedLiveLogs {
- // glog.V(4).Infof("reading from live logs startPosition: %v\n", startPosition.UTC())
- lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
- // glog.V(4).Infof("read from live logs: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
- if isDone {
- isDone = false
- }
- if err != nil {
- return
- }
- lastProcessedPosition = lastReadPosition
+ // Always try reading from live logs first (recent data)
+ lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
+ if isDone {
+ // For very early timestamps (like timestamp=1 for RESET_TO_EARLIEST),
+ // we want to continue to read from in-memory data
+ isDone = false
+ }
+ if err != nil {
+ return
}
- exhaustedLiveLogs = true
- if startPosition.Before(lastProcessedPosition.Time) {
- startPosition = lastProcessedPosition
+ // If live logs returned data, update startPosition for parquet read
+ if lastReadPosition.Offset > startPosition.Offset || lastReadPosition.Time.After(startPosition.Time) {
+ startPosition = lastReadPosition
}
- // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
+ // Then try reading from Parquet files (historical data)
lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
+
+ if isDone {
+ // For very early timestamps (like timestamp=1 for RESET_TO_EARLIEST),
+ // parquet files won't exist, but we want to continue to in-memory data reading
+ isDone = false
+ }
+
return
}
}
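
The stateless composition in mergeReadFuncs can be illustrated with a simplified, self-contained sketch; Position and ReadFunc below are stand-ins for log_buffer.MessagePosition and log_buffer.LogReadFromDiskFuncType, not the real API:

package main

import (
	"fmt"
	"time"
)

// Position is a simplified stand-in for log_buffer.MessagePosition.
type Position struct {
	Time   time.Time
	Offset int64
}

// ReadFunc is a simplified stand-in for log_buffer.LogReadFromDiskFuncType.
type ReadFunc func(start Position, stopTsNs int64) (last Position, isDone bool, err error)

// mergeReadFuncs composes a live-log reader and a parquet reader without any closure
// state: both sources are consulted on every call, and isDone from either source is
// cleared so the caller always falls through to in-memory data afterwards.
func mergeReadFuncs(readLive, readParquet ReadFunc) ReadFunc {
	return func(start Position, stopTsNs int64) (last Position, isDone bool, err error) {
		last, isDone, err = readLive(start, stopTsNs)
		if err != nil {
			return
		}
		isDone = false

		// Advance the start position only if the live logs made progress.
		if last.Offset > start.Offset || last.Time.After(start.Time) {
			start = last
		}

		last, isDone, err = readParquet(start, stopTsNs)
		isDone = false
		return
	}
}

func main() {
	live := func(start Position, _ int64) (Position, bool, error) {
		return Position{Offset: start.Offset + 10, Time: time.Now()}, true, nil
	}
	parquet := func(start Position, _ int64) (Position, bool, error) {
		return start, true, nil
	}
	merged := mergeReadFuncs(live, parquet)
	last, isDone, _ := merged(Position{Offset: 0}, 0)
	fmt.Println(last.Offset, isDone) // 10 false
}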
diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go
index 61c231461..86c8b40cc 100644
--- a/weed/mq/logstore/read_log_from_disk.go
+++ b/weed/mq/logstore/read_log_from_disk.go
@@ -2,6 +2,7 @@ package logstore
import (
"context"
+ "encoding/binary"
"fmt"
"math"
"strings"
@@ -20,9 +21,15 @@ import (
func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
partitionDir := topic.PartitionDir(t, p)
+ // Create a small cache for recently-read file chunks (3 files, 60s TTL)
+ // This significantly reduces Filer load when multiple consumers are catching up
+ fileCache := log_buffer.NewDiskBufferCache(3, 60*time.Second)
+
lookupFileIdFn := filer.LookupFn(filerClient)
- eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
+ eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64, startOffset int64, isOffsetBased bool) (processedTsNs int64, err error) {
+ entriesSkipped := 0
+ entriesProcessed := 0
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
@@ -38,13 +45,24 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %w", err)
return
}
- if logEntry.TsNs <= starTsNs {
- pos += 4 + int(size)
- continue
- }
- if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
- println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
- return
+
+ // Filter by offset if this is an offset-based subscription
+ if isOffsetBased {
+ if logEntry.Offset < startOffset {
+ entriesSkipped++
+ pos += 4 + int(size)
+ continue
+ }
+ } else {
+ // Filter by timestamp for timestamp-based subscriptions
+ if logEntry.TsNs <= starTsNs {
+ pos += 4 + int(size)
+ continue
+ }
+ if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+ println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
+ return
+ }
}
// fmt.Printf(" read logEntry: %v, ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())
@@ -54,6 +72,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
}
processedTsNs = logEntry.TsNs
+ entriesProcessed++
pos += 4 + int(size)
@@ -62,7 +81,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
return
}
- eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
+ eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64, startOffset int64, isOffsetBased bool) (processedTsNs int64, err error) {
if len(entry.Content) > 0 {
// skip .offset files
return
@@ -78,28 +97,58 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
}
urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId)
if err != nil {
+ glog.V(1).Infof("lookup %s failed: %v", chunk.FileId, err)
err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
return
}
if len(urlStrings) == 0 {
+ glog.V(1).Infof("no url found for %s", chunk.FileId)
err = fmt.Errorf("no url found for %s", chunk.FileId)
return
}
+ glog.V(2).Infof("lookup %s returned %d URLs", chunk.FileId, len(urlStrings))
- // try one of the urlString until util.Get(urlString) succeeds
+ // Try to get data from cache first
+ cacheKey := fmt.Sprintf("%s/%s/%d/%s", t.Name, p.String(), p.RangeStart, chunk.FileId)
+ if cachedData, _, found := fileCache.Get(cacheKey); found {
+ if cachedData == nil {
+ // Negative cache hit - data doesn't exist
+ continue
+ }
+ // Positive cache hit - data exists
+ if processedTsNs, err = eachChunkFn(cachedData, eachLogEntryFn, starTsNs, stopTsNs, startOffset, isOffsetBased); err != nil {
+ glog.V(1).Infof("eachChunkFn failed on cached data: %v", err)
+ return
+ }
+ continue
+ }
+
+ // Cache miss - try one of the urlString until util.Get(urlString) succeeds
var processed bool
for _, urlString := range urlStrings {
// TODO optimization opportunity: reuse the buffer
var data []byte
+ glog.V(2).Infof("trying to fetch data from %s", urlString)
if data, _, err = util_http.Get(urlString); err == nil {
+ glog.V(2).Infof("successfully fetched %d bytes from %s", len(data), urlString)
processed = true
- if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
+
+ // Store in cache for future reads
+ fileCache.Put(cacheKey, data, startOffset)
+
+ if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs, startOffset, isOffsetBased); err != nil {
+ glog.V(1).Infof("eachChunkFn failed: %v", err)
return
}
break
+ } else {
+ glog.V(2).Infof("failed to fetch from %s: %v", urlString, err)
}
}
if !processed {
+ // Store negative cache entry - data doesn't exist or all URLs failed
+ fileCache.Put(cacheKey, nil, startOffset)
+ glog.V(1).Infof("no data processed for %s %s - all URLs failed", entry.Name, chunk.FileId)
err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
return
}
@@ -109,37 +158,183 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
}
return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
- startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
+ startFileName := startPosition.Time.UTC().Format(topic.TIME_FORMAT)
startTsNs := startPosition.Time.UnixNano()
stopTime := time.Unix(0, stopTsNs)
var processedTsNs int64
+
+ // Check if this is an offset-based subscription
+ isOffsetBased := startPosition.IsOffsetBased
+ var startOffset int64
+ if isOffsetBased {
+ startOffset = startPosition.Offset
+ // CRITICAL FIX: For offset-based reads, ignore startFileName (which is based on Time)
+ // and list all files from the beginning to find the right offset
+ startFileName = ""
+ glog.V(1).Infof("disk read start: topic=%s partition=%s startOffset=%d",
+ t.Name, p, startOffset)
+ }
+
+ // OPTIMIZATION: For offset-based reads, collect all files with their offset ranges first
+ // Then use binary search to find the right file, and skip files that don't contain the offset
+ var candidateFiles []*filer_pb.Entry
+ var foundStartFile bool
+
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // First pass: collect all relevant files with their metadata
+ glog.V(2).Infof("listing directory %s for offset %d startFileName=%q", partitionDir, startOffset, startFileName)
return filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+
if entry.IsDirectory {
return nil
}
if strings.HasSuffix(entry.Name, ".parquet") {
return nil
}
- // FIXME: this is a hack to skip the .offset files
if strings.HasSuffix(entry.Name, ".offset") {
return nil
}
if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
- isDone = true
- return nil
- }
- if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) {
return nil
}
- if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
- return err
+
+ // OPTIMIZATION: For offset-based reads, check if this file contains the requested offset
+ if isOffsetBased {
+ glog.V(3).Infof("found file %s", entry.Name)
+ // Check if file has offset range metadata
+ if minOffsetBytes, hasMin := entry.Extended["offset_min"]; hasMin && len(minOffsetBytes) == 8 {
+ if maxOffsetBytes, hasMax := entry.Extended["offset_max"]; hasMax && len(maxOffsetBytes) == 8 {
+ fileMinOffset := int64(binary.BigEndian.Uint64(minOffsetBytes))
+ fileMaxOffset := int64(binary.BigEndian.Uint64(maxOffsetBytes))
+
+ // Skip files that don't contain our offset range
+ if startOffset > fileMaxOffset {
+ return nil
+ }
+
+ // If we haven't found the start file yet, check if this file contains it
+ if !foundStartFile && startOffset >= fileMinOffset && startOffset <= fileMaxOffset {
+ foundStartFile = true
+ }
+ }
+ }
+ // If file doesn't have offset metadata, include it (might be old format)
+ } else {
+ // Timestamp-based filtering
+ topicName := t.Name
+ if dotIndex := strings.LastIndex(topicName, "."); dotIndex != -1 {
+ topicName = topicName[dotIndex+1:]
+ }
+ isSystemTopic := strings.HasPrefix(topicName, "_")
+ if !isSystemTopic && startPosition.Time.Unix() > 86400 && entry.Name < startPosition.Time.UTC().Format(topic.TIME_FORMAT) {
+ return nil
+ }
}
+
+ // Add file to candidates for processing
+ candidateFiles = append(candidateFiles, entry)
+ glog.V(3).Infof("added candidate file %s (total=%d)", entry.Name, len(candidateFiles))
return nil
}, startFileName, true, math.MaxInt32)
})
- lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
+
+ if err != nil {
+ glog.Errorf("failed to list directory %s: %v", partitionDir, err)
+ return
+ }
+
+ glog.V(2).Infof("found %d candidate files for topic=%s partition=%s offset=%d",
+ len(candidateFiles), t.Name, p, startOffset)
+
+ if len(candidateFiles) == 0 {
+ glog.V(2).Infof("no files found in %s", partitionDir)
+ return startPosition, isDone, nil
+ }
+
+ // OPTIMIZATION: For offset-based reads with many files, use binary search to find start file
+ if isOffsetBased && len(candidateFiles) > 10 {
+ // Binary search to find the first file that might contain our offset
+ left, right := 0, len(candidateFiles)-1
+ startIdx := 0
+
+ for left <= right {
+ mid := (left + right) / 2
+ entry := candidateFiles[mid]
+
+ if minOffsetBytes, hasMin := entry.Extended["offset_min"]; hasMin && len(minOffsetBytes) == 8 {
+ if maxOffsetBytes, hasMax := entry.Extended["offset_max"]; hasMax && len(maxOffsetBytes) == 8 {
+ fileMinOffset := int64(binary.BigEndian.Uint64(minOffsetBytes))
+ fileMaxOffset := int64(binary.BigEndian.Uint64(maxOffsetBytes))
+
+ if startOffset < fileMinOffset {
+ // Our offset is before this file, search left
+ right = mid - 1
+ } else if startOffset > fileMaxOffset {
+ // Our offset is after this file, search right
+ left = mid + 1
+ startIdx = left
+ } else {
+ // Found the file containing our offset
+ startIdx = mid
+ break
+ }
+ } else {
+ break
+ }
+ } else {
+ break
+ }
+ }
+
+ // Process files starting from the found index
+ candidateFiles = candidateFiles[startIdx:]
+ }
+
+ // Second pass: process the filtered files
+ // CRITICAL: For offset-based reads, process ALL candidate files in one call
+ // This prevents multiple ReadFromDiskFn calls with 1.127s overhead each
+ var filesProcessed int
+ var lastProcessedOffset int64
+ for _, entry := range candidateFiles {
+ var fileTsNs int64
+ if fileTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs, startOffset, isOffsetBased); err != nil {
+ return lastReadPosition, isDone, err
+ }
+ if fileTsNs > 0 {
+ processedTsNs = fileTsNs
+ filesProcessed++
+ }
+
+ // For offset-based reads, track the last processed offset
+ // We need to continue reading ALL files to avoid multiple disk read calls
+ if isOffsetBased {
+ // Extract the last offset from the file's extended attributes
+ if maxOffsetBytes, hasMax := entry.Extended["offset_max"]; hasMax && len(maxOffsetBytes) == 8 {
+ fileMaxOffset := int64(binary.BigEndian.Uint64(maxOffsetBytes))
+ if fileMaxOffset > lastProcessedOffset {
+ lastProcessedOffset = fileMaxOffset
+ }
+ }
+ }
+ }
+
+ if isOffsetBased && filesProcessed > 0 {
+ // Return a position that indicates we've read all disk data up to lastProcessedOffset
+ // This prevents the subscription from calling ReadFromDiskFn again for these offsets
+ lastReadPosition = log_buffer.NewMessagePositionFromOffset(lastProcessedOffset + 1)
+ } else {
+ // CRITICAL FIX: If no files were processed (e.g., all data already consumed),
+ // return the requested offset to prevent busy loop
+ if isOffsetBased {
+ // For offset-based reads with no data, return the requested offset
+ // This signals "I've checked, there's no data at this offset, move forward"
+ lastReadPosition = log_buffer.NewMessagePositionFromOffset(startOffset)
+ } else {
+ // For timestamp-based reads, return error (-2)
+ lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
+ }
+ }
return
}
}
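
A self-contained sketch of the binary-search step over per-file offset ranges; fileRange is an illustrative struct rather than filer_pb.Entry, and files lacking offset metadata would fall back to the linear handling shown in the diff:

package main

import "fmt"

// fileRange is an illustrative stand-in for a log file entry carrying
// offset_min/offset_max extended attributes.
type fileRange struct {
	name      string
	minOffset int64
	maxOffset int64
}

// findStartIndex returns the index of the first file that may contain startOffset,
// assuming files are sorted by offset range (which follows from their time-ordered names).
func findStartIndex(files []fileRange, startOffset int64) int {
	left, right := 0, len(files)-1
	startIdx := 0
	for left <= right {
		mid := (left + right) / 2
		switch {
		case startOffset < files[mid].minOffset:
			right = mid - 1 // requested offset is before this file
		case startOffset > files[mid].maxOffset:
			left = mid + 1 // requested offset is after this file
			startIdx = left
		default:
			return mid // requested offset falls inside this file's range
		}
	}
	return startIdx
}

func main() {
	files := []fileRange{
		{"2025-01-01-00-00-00", 0, 99},
		{"2025-01-01-00-10-00", 100, 199},
		{"2025-01-01-00-20-00", 200, 299},
	}
	fmt.Println(findStartIndex(files, 150)) // 1
	fmt.Println(findStartIndex(files, 300)) // 3 (past the last file, nothing to read)
}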
diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go
index 3ea149699..01191eaad 100644
--- a/weed/mq/logstore/read_parquet_to_log.go
+++ b/weed/mq/logstore/read_parquet_to_log.go
@@ -10,10 +10,12 @@ import (
"github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/mq"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/protobuf/proto"
@@ -68,8 +70,14 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
return startPosition, true, nil
}
}
- recordType := topicConf.GetRecordType()
- if recordType == nil {
+ // Get schema - prefer flat schema if available
+ var recordType *schema_pb.RecordType
+ if topicConf.GetMessageRecordType() != nil {
+ // New flat schema format - use directly
+ recordType = topicConf.GetMessageRecordType()
+ }
+
+ if recordType == nil || len(recordType.Fields) == 0 {
// Return a no-op function if no schema is available
return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (log_buffer.MessagePosition, bool, error) {
return startPosition, true, nil
@@ -78,6 +86,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
recordType = schema.NewRecordTypeBuilder(recordType).
WithField(SW_COLUMN_NAME_TS, schema.TypeInt64).
WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ WithField(SW_COLUMN_NAME_OFFSET, schema.TypeInt64).
RecordTypeEnd()
parquetLevels, err := schema.ToParquetLevels(recordType)
@@ -121,10 +130,17 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
return processedTsNs, fmt.Errorf("marshal record value: %w", marshalErr)
}
+ // Get offset from parquet, default to 0 if not present (backward compatibility)
+ var offset int64 = 0
+ if offsetValue, exists := recordValue.Fields[SW_COLUMN_NAME_OFFSET]; exists {
+ offset = offsetValue.GetInt64Value()
+ }
+
logEntry := &filer_pb.LogEntry{
- Key: recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue(),
- TsNs: processedTsNs,
- Data: data,
+ Key: recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue(),
+ TsNs: processedTsNs,
+ Data: data,
+ Offset: offset,
}
// Skip control entries without actual data
@@ -153,7 +169,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
}
return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
- startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
+ startFileName := startPosition.Time.UTC().Format(topic.TIME_FORMAT)
startTsNs := startPosition.Time.UnixNano()
var processedTsNs int64
@@ -171,14 +187,14 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
}
// read minTs from the parquet file
- minTsBytes := entry.Extended["min"]
+ minTsBytes := entry.Extended[mq.ExtendedAttrTimestampMin]
if len(minTsBytes) != 8 {
return nil
}
minTsNs := int64(binary.BigEndian.Uint64(minTsBytes))
// read max ts
- maxTsBytes := entry.Extended["max"]
+ maxTsBytes := entry.Extended[mq.ExtendedAttrTimestampMax]
if len(maxTsBytes) != 8 {
return nil
}
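
A minimal sketch of the backward-compatible _offset read at the end of the parquet reader; the types below are simplified stand-ins for schema_pb.RecordValue/Value, so rows written before the column existed simply yield offset 0:

package main

import "fmt"

// value and recordValue are simplified stand-ins for schema_pb.Value and schema_pb.RecordValue.
type value struct {
	int64Value int64
	bytesValue []byte
}

type recordValue struct {
	fields map[string]*value
}

// offsetFromRecord mirrors the backward-compatible read of the _offset column:
// if the field is absent, the offset defaults to 0.
func offsetFromRecord(rv *recordValue) int64 {
	if v, exists := rv.fields["_offset"]; exists {
		return v.int64Value
	}
	return 0
}

func main() {
	oldRow := &recordValue{fields: map[string]*value{
		"_key": {bytesValue: []byte("k1")},
	}}
	newRow := &recordValue{fields: map[string]*value{
		"_key":    {bytesValue: []byte("k2")},
		"_offset": {int64Value: 42},
	}}
	fmt.Println(offsetFromRecord(oldRow)) // 0
	fmt.Println(offsetFromRecord(newRow)) // 42
}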