Diffstat (limited to 'weed/mq/logstore')
-rw-r--r--   weed/mq/logstore/log_to_parquet.go        114
-rw-r--r--   weed/mq/logstore/merged_read.go            41
-rw-r--r--   weed/mq/logstore/read_log_from_disk.go    235
-rw-r--r--   weed/mq/logstore/read_parquet_to_log.go    32
4 files changed, 362 insertions, 60 deletions
diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
index 8855d68f9..bfd5ff10e 100644
--- a/weed/mq/logstore/log_to_parquet.go
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -13,6 +13,7 @@ import (
 	"github.com/parquet-go/parquet-go"
 	"github.com/parquet-go/parquet-go/compress/zstd"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/mq"
 	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -25,8 +26,10 @@ import (
 )
 
 const (
-	SW_COLUMN_NAME_TS  = "_ts_ns"
-	SW_COLUMN_NAME_KEY = "_key"
+	SW_COLUMN_NAME_TS     = "_ts_ns"
+	SW_COLUMN_NAME_KEY    = "_key"
+	SW_COLUMN_NAME_OFFSET = "_offset"
+	SW_COLUMN_NAME_VALUE  = "_value"
 )
 
 func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
@@ -185,7 +188,7 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
 		}
 
 		// read min ts
-		minTsBytes := entry.Extended["min"]
+		minTsBytes := entry.Extended[mq.ExtendedAttrTimestampMin]
 		if len(minTsBytes) != 8 {
 			return nil
 		}
@@ -195,7 +198,7 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
 		}
 
 		// read max ts
-		maxTsBytes := entry.Extended["max"]
+		maxTsBytes := entry.Extended[mq.ExtendedAttrTimestampMax]
 		if len(maxTsBytes) != 8 {
 			return nil
 		}
@@ -208,6 +211,36 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
 	return
 }
 
+// isSchemalessRecordType checks if the recordType represents a schema-less topic
+// Schema-less topics only have system fields: _ts_ns, _key, and _value
+func isSchemalessRecordType(recordType *schema_pb.RecordType) bool {
+	if recordType == nil {
+		return false
+	}
+
+	// Count only non-system data fields (exclude _ts_ns and _key which are always present)
+	// Schema-less topics should only have _value as the data field
+	hasValue := false
+	dataFieldCount := 0
+
+	for _, field := range recordType.Fields {
+		switch field.Name {
+		case SW_COLUMN_NAME_TS, SW_COLUMN_NAME_KEY, SW_COLUMN_NAME_OFFSET:
+			// System fields - ignore
+			continue
+		case SW_COLUMN_NAME_VALUE:
+			hasValue = true
+			dataFieldCount++
+		default:
+			// Any other field means it's not schema-less
+			dataFieldCount++
+		}
+	}
+
+	// Schema-less = only has _value field as the data field (plus system fields)
+	return hasValue && dataFieldCount == 1
+}
+
 func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {
 
 	tempFile, err := os.CreateTemp(".", "t*.parquet")
@@ -227,6 +260,9 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 	rowBuilder := parquet.NewRowBuilder(parquetSchema)
 
 	var startTsNs, stopTsNs int64
+	var minOffset, maxOffset int64
+	var hasOffsets bool
+	isSchemaless := isSchemalessRecordType(recordType)
 
 	for _, logFile := range logFileGroups {
 		var rows []parquet.Row
@@ -242,19 +278,56 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 			}
 			stopTsNs = entry.TsNs
 
+			// Track offset ranges for Kafka integration
+			if entry.Offset > 0 {
+				if !hasOffsets {
+					minOffset = entry.Offset
+					maxOffset = entry.Offset
+					hasOffsets = true
+				} else {
+					if entry.Offset < minOffset {
+						minOffset = entry.Offset
+					}
+					if entry.Offset > maxOffset {
+						maxOffset = entry.Offset
+					}
+				}
+			}
+
 			// write to parquet file
 			rowBuilder.Reset()
 			record := &schema_pb.RecordValue{}
-			if err := proto.Unmarshal(entry.Data, record); err != nil {
-				return fmt.Errorf("unmarshal record value: %w", err)
-			}
-
-			// Initialize Fields map if nil (prevents nil map assignment panic)
-			if record.Fields == nil {
+			if isSchemaless {
+				// For schema-less topics, put raw entry.Data into _value field
 				record.Fields = make(map[string]*schema_pb.Value)
+				record.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+					Kind: &schema_pb.Value_BytesValue{
+						BytesValue: entry.Data,
+					},
+				}
+			} else {
+				// For schematized topics, unmarshal entry.Data as RecordValue
+				if err := proto.Unmarshal(entry.Data, record); err != nil {
+					return fmt.Errorf("unmarshal record value: %w", err)
+				}
+
+				// Initialize Fields map if nil (prevents nil map assignment panic)
+				if record.Fields == nil {
+					record.Fields = make(map[string]*schema_pb.Value)
+				}
+
+				// Add offset field to parquet records for native offset support
+				// ASSUMPTION: LogEntry.Offset field is populated by broker during message publishing
+				record.Fields[SW_COLUMN_NAME_OFFSET] = &schema_pb.Value{
+					Kind: &schema_pb.Value_Int64Value{
+						Int64Value: entry.Offset,
+					},
+				}
 			}
 
+			// Add system columns (for both schematized and schema-less topics)
 			record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
 				Kind: &schema_pb.Value_Int64Value{
 					Int64Value: entry.TsNs,
@@ -323,7 +396,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 		}
 	}
 
-	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart); err != nil {
+	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart, minOffset, maxOffset, hasOffsets); err != nil {
 		return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
 	}
 
@@ -331,7 +404,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 
 }
 
-func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64) error {
+func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64, minOffset, maxOffset int64, hasOffsets bool) error {
 	uploader, err := operation.NewUploader()
 	if err != nil {
 		return fmt.Errorf("new uploader: %w", err)
@@ -359,22 +432,33 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
 	entry.Extended = make(map[string][]byte)
 	minTsBytes := make([]byte, 8)
 	binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
-	entry.Extended["min"] = minTsBytes
+	entry.Extended[mq.ExtendedAttrTimestampMin] = minTsBytes
 	maxTsBytes := make([]byte, 8)
 	binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
-	entry.Extended["max"] = maxTsBytes
+	entry.Extended[mq.ExtendedAttrTimestampMax] = maxTsBytes
+
+	// Add offset range metadata for Kafka integration (same as regular log files)
+	if hasOffsets && minOffset > 0 && maxOffset >= minOffset {
+		minOffsetBytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(minOffsetBytes, uint64(minOffset))
+		entry.Extended[mq.ExtendedAttrOffsetMin] = minOffsetBytes
+
+		maxOffsetBytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(maxOffsetBytes, uint64(maxOffset))
+		entry.Extended[mq.ExtendedAttrOffsetMax] = maxOffsetBytes
+	}
 
 	// Store source log files for deduplication (JSON-encoded list)
 	if len(sourceLogFiles) > 0 {
 		sourceLogFilesJson, _ := json.Marshal(sourceLogFiles)
-		entry.Extended["sources"] = sourceLogFilesJson
+		entry.Extended[mq.ExtendedAttrSources] = sourceLogFilesJson
 	}
 
 	// Store earliest buffer_start for precise broker deduplication
 	if earliestBufferStart > 0 {
 		bufferStartBytes := make([]byte, 8)
 		binary.BigEndian.PutUint64(bufferStartBytes, uint64(earliestBufferStart))
-		entry.Extended["buffer_start"] = bufferStartBytes
+		entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes
 	}
 
 	for i := int64(0); i < chunkCount; i++ {
diff --git a/weed/mq/logstore/merged_read.go b/weed/mq/logstore/merged_read.go
index 38164a80f..c2e8e3caf 100644
--- a/weed/mq/logstore/merged_read.go
+++ b/weed/mq/logstore/merged_read.go
@@ -15,29 +15,36 @@ func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.
 }
 
 func mergeReadFuncs(readLogDirectFn, fromParquetFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
-	var exhaustedLiveLogs bool
-	var lastProcessedPosition log_buffer.MessagePosition
+	// CRITICAL FIX: Removed stateful closure variables (exhaustedLiveLogs, lastProcessedPosition)
+	// These caused the function to skip disk reads on subsequent calls, leading to
+	// Schema Registry timeout when data was flushed after the first read attempt.
+	// The function must be stateless and check for data on EVERY call.
 	return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
-		if !exhaustedLiveLogs {
-			// glog.V(4).Infof("reading from live logs startPosition: %v\n", startPosition.UTC())
-			lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
-			// glog.V(4).Infof("read from live logs: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
-			if isDone {
-				isDone = false
-			}
-			if err != nil {
-				return
-			}
-			lastProcessedPosition = lastReadPosition
+		// Always try reading from live logs first (recent data)
+		lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
+		if isDone {
+			// For very early timestamps (like timestamp=1 for RESET_TO_EARLIEST),
+			// we want to continue to read from in-memory data
+			isDone = false
+		}
+		if err != nil {
+			return
 		}
-		exhaustedLiveLogs = true
 
-		if startPosition.Before(lastProcessedPosition.Time) {
-			startPosition = lastProcessedPosition
+		// If live logs returned data, update startPosition for parquet read
+		if lastReadPosition.Offset > startPosition.Offset || lastReadPosition.Time.After(startPosition.Time) {
+			startPosition = lastReadPosition
 		}
 
-		// glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
+		// Then try reading from Parquet files (historical data)
 		lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
+
+		if isDone {
+			// For very early timestamps (like timestamp=1 for RESET_TO_EARLIEST),
+			// parquet files won't exist, but we want to continue to in-memory data reading
+			isDone = false
+		}
+
 		return
 	}
 }
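The compaction changes above record each parquet file's Kafka offset range as two 8-byte big-endian values in the entry's Extended attributes, which the disk reader below uses to skip files that cannot contain a requested offset. A minimal standalone sketch of that encoding and the skip check, assuming mq.ExtendedAttrOffsetMin/Max resolve to the "offset_min"/"offset_max" keys that read_log_from_disk.go checks:

package main

import (
	"encoding/binary"
	"fmt"
)

// Assumed key names; the diff uses mq.ExtendedAttrOffsetMin/Max on the write
// side and the literals "offset_min"/"offset_max" on the read side.
const (
	attrOffsetMin = "offset_min"
	attrOffsetMax = "offset_max"
)

// putOffsetRange mirrors saveParquetFileToPartitionDir: 8-byte big-endian values.
func putOffsetRange(extended map[string][]byte, minOffset, maxOffset int64) {
	minBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(minBytes, uint64(minOffset))
	extended[attrOffsetMin] = minBytes

	maxBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(maxBytes, uint64(maxOffset))
	extended[attrOffsetMax] = maxBytes
}

// fileMayContainOffset mirrors the reader's filter: a file is skipped only when
// the requested offset is beyond its recorded maximum; files without (or with
// malformed) metadata are still included, matching the old-format handling.
func fileMayContainOffset(extended map[string][]byte, startOffset int64) bool {
	minBytes, hasMin := extended[attrOffsetMin]
	maxBytes, hasMax := extended[attrOffsetMax]
	if !hasMin || !hasMax || len(minBytes) != 8 || len(maxBytes) != 8 {
		return true
	}
	fileMax := int64(binary.BigEndian.Uint64(maxBytes))
	return startOffset <= fileMax
}

func main() {
	extended := map[string][]byte{}
	putOffsetRange(extended, 100, 250)
	fmt.Println(fileMayContainOffset(extended, 180)) // true
	fmt.Println(fileMayContainOffset(extended, 300)) // false
}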
diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go
index 61c231461..86c8b40cc 100644
--- a/weed/mq/logstore/read_log_from_disk.go
+++ b/weed/mq/logstore/read_log_from_disk.go
@@ -2,6 +2,7 @@ package logstore
 
 import (
 	"context"
+	"encoding/binary"
 	"fmt"
 	"math"
 	"strings"
@@ -20,9 +21,15 @@ import (
 func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
 	partitionDir := topic.PartitionDir(t, p)
 
+	// Create a small cache for recently-read file chunks (3 files, 60s TTL)
+	// This significantly reduces Filer load when multiple consumers are catching up
+	fileCache := log_buffer.NewDiskBufferCache(3, 60*time.Second)
+
 	lookupFileIdFn := filer.LookupFn(filerClient)
 
-	eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
+	eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64, startOffset int64, isOffsetBased bool) (processedTsNs int64, err error) {
+		entriesSkipped := 0
+		entriesProcessed := 0
 		for pos := 0; pos+4 < len(buf); {
 
 			size := util.BytesToUint32(buf[pos : pos+4])
@@ -38,13 +45,24 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
 				err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %w", err)
 				return
 			}
-			if logEntry.TsNs <= starTsNs {
-				pos += 4 + int(size)
-				continue
-			}
-			if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
-				println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
-				return
+
+			// Filter by offset if this is an offset-based subscription
+			if isOffsetBased {
+				if logEntry.Offset < startOffset {
+					entriesSkipped++
+					pos += 4 + int(size)
+					continue
+				}
+			} else {
+				// Filter by timestamp for timestamp-based subscriptions
+				if logEntry.TsNs <= starTsNs {
+					pos += 4 + int(size)
+					continue
+				}
+				if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+					println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
+					return
+				}
 			}
 
 			// fmt.Printf("  read logEntry: %v, ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())
@@ -54,6 +72,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
 			}
 
 			processedTsNs = logEntry.TsNs
+			entriesProcessed++
 
 			pos += 4 + int(size)
 
@@ -62,7 +81,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
 		return
 	}
 
-	eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
+	eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64, startOffset int64, isOffsetBased bool) (processedTsNs int64, err error) {
 		if len(entry.Content) > 0 {
 			// skip .offset files
 			return
@@ -78,28 +97,58 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
 			}
 			urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId)
 			if err != nil {
+				glog.V(1).Infof("lookup %s failed: %v", chunk.FileId, err)
 				err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
 				return
 			}
 			if len(urlStrings) == 0 {
+				glog.V(1).Infof("no url found for %s", chunk.FileId)
 				err = fmt.Errorf("no url found for %s", chunk.FileId)
 				return
 			}
+			glog.V(2).Infof("lookup %s returned %d URLs", chunk.FileId, len(urlStrings))
 
-			// try one of the urlString until util.Get(urlString) succeeds
+			// Try to get data from cache first
+			cacheKey := fmt.Sprintf("%s/%s/%d/%s", t.Name, p.String(), p.RangeStart, chunk.FileId)
+			if cachedData, _, found := fileCache.Get(cacheKey); found {
+				if cachedData == nil {
+					// Negative cache hit - data doesn't exist
+					continue
+				}
+				// Positive cache hit - data exists
+				if processedTsNs, err = eachChunkFn(cachedData, eachLogEntryFn, starTsNs, stopTsNs, startOffset, isOffsetBased); err != nil {
+					glog.V(1).Infof("eachChunkFn failed on cached data: %v", err)
+					return
+				}
+				continue
+			}
+
+			// Cache miss - try one of the urlString until util.Get(urlString) succeeds
 			var processed bool
 			for _, urlString := range urlStrings {
 				// TODO optimization opportunity: reuse the buffer
 				var data []byte
+				glog.V(2).Infof("trying to fetch data from %s", urlString)
 				if data, _, err = util_http.Get(urlString); err == nil {
+					glog.V(2).Infof("successfully fetched %d bytes from %s", len(data), urlString)
 					processed = true
-					if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
+
+					// Store in cache for future reads
+					fileCache.Put(cacheKey, data, startOffset)
+
+					if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs, startOffset, isOffsetBased); err != nil {
+						glog.V(1).Infof("eachChunkFn failed: %v", err)
 						return
 					}
 					break
+				} else {
+					glog.V(2).Infof("failed to fetch from %s: %v", urlString, err)
 				}
 			}
 			if !processed {
+				// Store negative cache entry - data doesn't exist or all URLs failed
+				fileCache.Put(cacheKey, nil, startOffset)
+				glog.V(1).Infof("no data processed for %s %s - all URLs failed", entry.Name, chunk.FileId)
 				err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
 				return
 			}
@@ -109,37 +158,183 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
 	}
 
 	return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
-		startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
+		startFileName := startPosition.Time.UTC().Format(topic.TIME_FORMAT)
 		startTsNs := startPosition.Time.UnixNano()
 		stopTime := time.Unix(0, stopTsNs)
 		var processedTsNs int64
+
+		// Check if this is an offset-based subscription
+		isOffsetBased := startPosition.IsOffsetBased
+		var startOffset int64
+		if isOffsetBased {
+			startOffset = startPosition.Offset
+			// CRITICAL FIX: For offset-based reads, ignore startFileName (which is based on Time)
+			// and list all files from the beginning to find the right offset
+			startFileName = ""
+			glog.V(1).Infof("disk read start: topic=%s partition=%s startOffset=%d",
+				t.Name, p, startOffset)
+		}
+
+		// OPTIMIZATION: For offset-based reads, collect all files with their offset ranges first
+		// Then use binary search to find the right file, and skip files that don't contain the offset
+		var candidateFiles []*filer_pb.Entry
+		var foundStartFile bool
+
 		err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+			// First pass: collect all relevant files with their metadata
+			glog.V(2).Infof("listing directory %s for offset %d startFileName=%q", partitionDir, startOffset, startFileName)
 			return filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+
 				if entry.IsDirectory {
 					return nil
 				}
 				if strings.HasSuffix(entry.Name, ".parquet") {
 					return nil
 				}
-				// FIXME: this is a hack to skip the .offset files
 				if strings.HasSuffix(entry.Name, ".offset") {
 					return nil
 				}
 				if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
-					isDone = true
-					return nil
-				}
-				if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) {
 					return nil
 				}
-				if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
-					return err
+
+				// OPTIMIZATION: For offset-based reads, check if this file contains the requested offset
+				if isOffsetBased {
+					glog.V(3).Infof("found file %s", entry.Name)
+					// Check if file has offset range metadata
+					if minOffsetBytes, hasMin := entry.Extended["offset_min"]; hasMin && len(minOffsetBytes) == 8 {
+						if maxOffsetBytes, hasMax := entry.Extended["offset_max"]; hasMax && len(maxOffsetBytes) == 8 {
+							fileMinOffset := int64(binary.BigEndian.Uint64(minOffsetBytes))
+							fileMaxOffset := int64(binary.BigEndian.Uint64(maxOffsetBytes))
+
+							// Skip files that don't contain our offset range
+							if startOffset > fileMaxOffset {
+								return nil
+							}
+
+							// If we haven't found the start file yet, check if this file contains it
+							if !foundStartFile && startOffset >= fileMinOffset && startOffset <= fileMaxOffset {
+								foundStartFile = true
+							}
+						}
+					}
+					// If file doesn't have offset metadata, include it (might be old format)
+				} else {
+					// Timestamp-based filtering
+					topicName := t.Name
+					if dotIndex := strings.LastIndex(topicName, "."); dotIndex != -1 {
+						topicName = topicName[dotIndex+1:]
+					}
+					isSystemTopic := strings.HasPrefix(topicName, "_")
+					if !isSystemTopic && startPosition.Time.Unix() > 86400 && entry.Name < startPosition.Time.UTC().Format(topic.TIME_FORMAT) {
+						return nil
+					}
 				}
+
+				// Add file to candidates for processing
+				candidateFiles = append(candidateFiles, entry)
+				glog.V(3).Infof("added candidate file %s (total=%d)", entry.Name, len(candidateFiles))
 				return nil
 			}, startFileName, true, math.MaxInt32)
 		})
-		lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
+
+		if err != nil {
+			glog.Errorf("failed to list directory %s: %v", partitionDir, err)
+			return
+		}
+
+		glog.V(2).Infof("found %d candidate files for topic=%s partition=%s offset=%d",
+			len(candidateFiles), t.Name, p, startOffset)
+
+		if len(candidateFiles) == 0 {
+			glog.V(2).Infof("no files found in %s", partitionDir)
+			return startPosition, isDone, nil
+		}
+
+		// OPTIMIZATION: For offset-based reads with many files, use binary search to find start file
+		if isOffsetBased && len(candidateFiles) > 10 {
+			// Binary search to find the first file that might contain our offset
+			left, right := 0, len(candidateFiles)-1
+			startIdx := 0
+
+			for left <= right {
+				mid := (left + right) / 2
+				entry := candidateFiles[mid]
+
+				if minOffsetBytes, hasMin := entry.Extended["offset_min"]; hasMin && len(minOffsetBytes) == 8 {
+					if maxOffsetBytes, hasMax := entry.Extended["offset_max"]; hasMax && len(maxOffsetBytes) == 8 {
+						fileMinOffset := int64(binary.BigEndian.Uint64(minOffsetBytes))
+						fileMaxOffset := int64(binary.BigEndian.Uint64(maxOffsetBytes))
+
+						if startOffset < fileMinOffset {
+							// Our offset is before this file, search left
+							right = mid - 1
+						} else if startOffset > fileMaxOffset {
+							// Our offset is after this file, search right
+							left = mid + 1
+							startIdx = left
+						} else {
+							// Found the file containing our offset
+							startIdx = mid
+							break
+						}
+					} else {
+						break
+					}
+				} else {
+					break
+				}
+			}
+
+			// Process files starting from the found index
+			candidateFiles = candidateFiles[startIdx:]
+		}
+
+		// Second pass: process the filtered files
+		// CRITICAL: For offset-based reads, process ALL candidate files in one call
+		// This prevents multiple ReadFromDiskFn calls with 1.127s overhead each
+		var filesProcessed int
+		var lastProcessedOffset int64
+		for _, entry := range candidateFiles {
+			var fileTsNs int64
+			if fileTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs, startOffset, isOffsetBased); err != nil {
+				return lastReadPosition, isDone, err
+			}
+			if fileTsNs > 0 {
+				processedTsNs = fileTsNs
+				filesProcessed++
+			}
+
+			// For offset-based reads, track the last processed offset
+			// We need to continue reading ALL files to avoid multiple disk read calls
+			if isOffsetBased {
+				// Extract the last offset from the file's extended attributes
+				if maxOffsetBytes, hasMax := entry.Extended["offset_max"]; hasMax && len(maxOffsetBytes) == 8 {
+					fileMaxOffset := int64(binary.BigEndian.Uint64(maxOffsetBytes))
+					if fileMaxOffset > lastProcessedOffset {
+						lastProcessedOffset = fileMaxOffset
+					}
+				}
+			}
+		}
+
+		if isOffsetBased && filesProcessed > 0 {
+			// Return a position that indicates we've read all disk data up to lastProcessedOffset
+			// This prevents the subscription from calling ReadFromDiskFn again for these offsets
+			lastReadPosition = log_buffer.NewMessagePositionFromOffset(lastProcessedOffset + 1)
+		} else {
+			// CRITICAL FIX: If no files were processed (e.g., all data already consumed),
+			// return the requested offset to prevent busy loop
+			if isOffsetBased {
+				// For offset-based reads with no data, return the requested offset
+				// This signals "I've checked, there's no data at this offset, move forward"
+				lastReadPosition = log_buffer.NewMessagePositionFromOffset(startOffset)
+			} else {
+				// For timestamp-based reads, return error (-2)
+				lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
+			}
+		}
 		return
 	}
 }
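With per-file offset ranges in place, the reader above narrows a long list of candidate log files with a binary search before fetching any chunk data. A simplified, self-contained sketch of that search, assuming files are listed in ascending offset order (fileRange and firstCandidate are illustrative stand-ins, not identifiers from the codebase):

package main

import "fmt"

// fileRange stands in for a log file entry's decoded offset_min/offset_max
// metadata; the real code reads these from filer_pb.Entry.Extended.
type fileRange struct {
	name     string
	min, max int64
}

// firstCandidate mirrors the binary search in the diff: it returns the index of
// the first file whose [min, max] range could contain startOffset. If the offset
// is past every file, the returned index equals len(files) and the resulting
// slice of candidates is empty, just like candidateFiles[startIdx:] above.
func firstCandidate(files []fileRange, startOffset int64) int {
	left, right := 0, len(files)-1
	startIdx := 0
	for left <= right {
		mid := (left + right) / 2
		switch {
		case startOffset < files[mid].min:
			// Requested offset is before this file, search left
			right = mid - 1
		case startOffset > files[mid].max:
			// Requested offset is after this file, search right
			left = mid + 1
			startIdx = left
		default:
			// This file's range contains the offset
			return mid
		}
	}
	return startIdx
}

func main() {
	files := []fileRange{
		{"2025-01-01-10-00", 0, 99},
		{"2025-01-01-10-05", 100, 199},
		{"2025-01-01-10-10", 200, 299},
	}
	fmt.Println(files[firstCandidate(files, 150)].name) // 2025-01-01-10-05
}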
diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go
index 3ea149699..01191eaad 100644
--- a/weed/mq/logstore/read_parquet_to_log.go
+++ b/weed/mq/logstore/read_parquet_to_log.go
@@ -10,10 +10,12 @@ import (
 
 	"github.com/parquet-go/parquet-go"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/mq"
 	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
 	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
 	"google.golang.org/protobuf/proto"
@@ -68,8 +70,14 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
 			return startPosition, true, nil
 		}
 	}
-	recordType := topicConf.GetRecordType()
-	if recordType == nil {
+	// Get schema - prefer flat schema if available
+	var recordType *schema_pb.RecordType
+	if topicConf.GetMessageRecordType() != nil {
+		// New flat schema format - use directly
+		recordType = topicConf.GetMessageRecordType()
+	}
+
+	if recordType == nil || len(recordType.Fields) == 0 {
 		// Return a no-op function if no schema is available
 		return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (log_buffer.MessagePosition, bool, error) {
 			return startPosition, true, nil
@@ -78,6 +86,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
 	recordType = schema.NewRecordTypeBuilder(recordType).
 		WithField(SW_COLUMN_NAME_TS, schema.TypeInt64).
 		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+		WithField(SW_COLUMN_NAME_OFFSET, schema.TypeInt64).
 		RecordTypeEnd()
 
 	parquetLevels, err := schema.ToParquetLevels(recordType)
@@ -121,10 +130,17 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
 				return processedTsNs, fmt.Errorf("marshal record value: %w", marshalErr)
 			}
 
+			// Get offset from parquet, default to 0 if not present (backward compatibility)
+			var offset int64 = 0
+			if offsetValue, exists := recordValue.Fields[SW_COLUMN_NAME_OFFSET]; exists {
+				offset = offsetValue.GetInt64Value()
+			}
+
 			logEntry := &filer_pb.LogEntry{
-				Key:  recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue(),
-				TsNs: processedTsNs,
-				Data: data,
+				Key:    recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue(),
+				TsNs:   processedTsNs,
+				Data:   data,
+				Offset: offset,
 			}
 
 			// Skip control entries without actual data
@@ -153,7 +169,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
 	}
 
 	return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
-		startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
+		startFileName := startPosition.Time.UTC().Format(topic.TIME_FORMAT)
 		startTsNs := startPosition.Time.UnixNano()
 
 		var processedTsNs int64
@@ -171,14 +187,14 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
 			}
 
 			// read minTs from the parquet file
-			minTsBytes := entry.Extended["min"]
+			minTsBytes := entry.Extended[mq.ExtendedAttrTimestampMin]
 			if len(minTsBytes) != 8 {
 				return nil
 			}
 			minTsNs := int64(binary.BigEndian.Uint64(minTsBytes))
 
 			// read max ts
-			maxTsBytes := entry.Extended["max"]
+			maxTsBytes := entry.Extended[mq.ExtendedAttrTimestampMax]
 			if len(maxTsBytes) != 8 {
 				return nil
 			}
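The parquet read path above now carries the _offset column back into each reconstructed LogEntry, defaulting to 0 for rows written before the column existed. A small usage sketch of that backward-compatible lookup, assuming the schema_pb types behave as shown in the hunks above:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// offsetFromRecord mirrors the read in GenParquetReadFunc: rows written before
// the _offset column existed simply report offset 0.
func offsetFromRecord(recordValue *schema_pb.RecordValue) int64 {
	var offset int64 = 0
	if offsetValue, exists := recordValue.Fields["_offset"]; exists {
		offset = offsetValue.GetInt64Value()
	}
	return offset
}

func main() {
	withOffset := &schema_pb.RecordValue{Fields: map[string]*schema_pb.Value{
		"_offset": {Kind: &schema_pb.Value_Int64Value{Int64Value: 42}},
	}}
	legacy := &schema_pb.RecordValue{Fields: map[string]*schema_pb.Value{}}
	fmt.Println(offsetFromRecord(withOffset), offsetFromRecord(legacy)) // 42 0
}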
