Diffstat (limited to 'weed/mq/logstore/log_to_parquet.go')
| -rw-r--r-- | weed/mq/logstore/log_to_parquet.go | 114 |
1 file changed, 99 insertions(+), 15 deletions(-)
diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
index 8855d68f9..bfd5ff10e 100644
--- a/weed/mq/logstore/log_to_parquet.go
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -13,6 +13,7 @@ import (
 	"github.com/parquet-go/parquet-go"
 	"github.com/parquet-go/parquet-go/compress/zstd"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/mq"
 	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -25,8 +26,10 @@ import (
 )
 
 const (
-	SW_COLUMN_NAME_TS  = "_ts_ns"
-	SW_COLUMN_NAME_KEY = "_key"
+	SW_COLUMN_NAME_TS     = "_ts_ns"
+	SW_COLUMN_NAME_KEY    = "_key"
+	SW_COLUMN_NAME_OFFSET = "_offset"
+	SW_COLUMN_NAME_VALUE  = "_value"
 )
 
 func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
@@ -185,7 +188,7 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
 		}
 
 		// read min ts
-		minTsBytes := entry.Extended["min"]
+		minTsBytes := entry.Extended[mq.ExtendedAttrTimestampMin]
 		if len(minTsBytes) != 8 {
 			return nil
 		}
@@ -195,7 +198,7 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
 		}
 
 		// read max ts
-		maxTsBytes := entry.Extended["max"]
+		maxTsBytes := entry.Extended[mq.ExtendedAttrTimestampMax]
 		if len(maxTsBytes) != 8 {
 			return nil
 		}
@@ -208,6 +211,36 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
 	return
 }
 
+// isSchemalessRecordType checks if the recordType represents a schema-less topic
+// Schema-less topics only have system fields: _ts_ns, _key, and _value
+func isSchemalessRecordType(recordType *schema_pb.RecordType) bool {
+	if recordType == nil {
+		return false
+	}
+
+	// Count only non-system data fields (exclude _ts_ns and _key which are always present)
+	// Schema-less topics should only have _value as the data field
+	hasValue := false
+	dataFieldCount := 0
+
+	for _, field := range recordType.Fields {
+		switch field.Name {
+		case SW_COLUMN_NAME_TS, SW_COLUMN_NAME_KEY, SW_COLUMN_NAME_OFFSET:
+			// System fields - ignore
+			continue
+		case SW_COLUMN_NAME_VALUE:
+			hasValue = true
+			dataFieldCount++
+		default:
+			// Any other field means it's not schema-less
+			dataFieldCount++
+		}
+	}
+
+	// Schema-less = only has _value field as the data field (plus system fields)
+	return hasValue && dataFieldCount == 1
+}
+
 func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {
 
 	tempFile, err := os.CreateTemp(".", "t*.parquet")
@@ -227,6 +260,9 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 	rowBuilder := parquet.NewRowBuilder(parquetSchema)
 
 	var startTsNs, stopTsNs int64
+	var minOffset, maxOffset int64
+	var hasOffsets bool
+	isSchemaless := isSchemalessRecordType(recordType)
 
 	for _, logFile := range logFileGroups {
 		var rows []parquet.Row
@@ -242,19 +278,56 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 			}
 			stopTsNs = entry.TsNs
 
+			// Track offset ranges for Kafka integration
+			if entry.Offset > 0 {
+				if !hasOffsets {
+					minOffset = entry.Offset
+					maxOffset = entry.Offset
+					hasOffsets = true
+				} else {
+					if entry.Offset < minOffset {
+						minOffset = entry.Offset
+					}
+					if entry.Offset > maxOffset {
+						maxOffset = entry.Offset
+					}
+				}
+			}
+
 			// write to parquet file
 			rowBuilder.Reset()
 
 			record := &schema_pb.RecordValue{}
-			if err := proto.Unmarshal(entry.Data, record); err != nil {
-				return fmt.Errorf("unmarshal record value: %w", err)
-			}
-
-			// Initialize Fields map if nil (prevents nil map assignment panic)
-			if record.Fields == nil {
+			if isSchemaless {
+				// For schema-less topics, put raw entry.Data into _value field
 				record.Fields = make(map[string]*schema_pb.Value)
+				record.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+					Kind: &schema_pb.Value_BytesValue{
+						BytesValue: entry.Data,
+					},
+				}
+			} else {
+				// For schematized topics, unmarshal entry.Data as RecordValue
+				if err := proto.Unmarshal(entry.Data, record); err != nil {
+					return fmt.Errorf("unmarshal record value: %w", err)
+				}
+
+				// Initialize Fields map if nil (prevents nil map assignment panic)
+				if record.Fields == nil {
+					record.Fields = make(map[string]*schema_pb.Value)
+				}
+
+				// Add offset field to parquet records for native offset support
+				// ASSUMPTION: LogEntry.Offset field is populated by broker during message publishing
+				record.Fields[SW_COLUMN_NAME_OFFSET] = &schema_pb.Value{
+					Kind: &schema_pb.Value_Int64Value{
+						Int64Value: entry.Offset,
+					},
+				}
 			}
 
+			// Add system columns (for both schematized and schema-less topics)
 			record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
 				Kind: &schema_pb.Value_Int64Value{
 					Int64Value: entry.TsNs,
@@ -323,7 +396,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 		}
 	}
 
-	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart); err != nil {
+	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart, minOffset, maxOffset, hasOffsets); err != nil {
 		return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
 	}
 
@@ -331,7 +404,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 
 }
 
-func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64) error {
+func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64, minOffset, maxOffset int64, hasOffsets bool) error {
 	uploader, err := operation.NewUploader()
 	if err != nil {
 		return fmt.Errorf("new uploader: %w", err)
@@ -359,22 +432,33 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
 	entry.Extended = make(map[string][]byte)
 	minTsBytes := make([]byte, 8)
 	binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
-	entry.Extended["min"] = minTsBytes
+	entry.Extended[mq.ExtendedAttrTimestampMin] = minTsBytes
 	maxTsBytes := make([]byte, 8)
 	binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
-	entry.Extended["max"] = maxTsBytes
+	entry.Extended[mq.ExtendedAttrTimestampMax] = maxTsBytes
+
+	// Add offset range metadata for Kafka integration (same as regular log files)
+	if hasOffsets && minOffset > 0 && maxOffset >= minOffset {
+		minOffsetBytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(minOffsetBytes, uint64(minOffset))
+		entry.Extended[mq.ExtendedAttrOffsetMin] = minOffsetBytes
+
+		maxOffsetBytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(maxOffsetBytes, uint64(maxOffset))
+		entry.Extended[mq.ExtendedAttrOffsetMax] = maxOffsetBytes
+	}
 
 	// Store source log files for deduplication (JSON-encoded list)
 	if len(sourceLogFiles) > 0 {
 		sourceLogFilesJson, _ := json.Marshal(sourceLogFiles)
-		entry.Extended["sources"] = sourceLogFilesJson
+		entry.Extended[mq.ExtendedAttrSources] = sourceLogFilesJson
 	}
 
 	// Store earliest buffer_start for precise broker deduplication
 	if earliestBufferStart > 0 {
 		bufferStartBytes := make([]byte, 8)
 		binary.BigEndian.PutUint64(bufferStartBytes, uint64(earliestBufferStart))
-		entry.Extended["buffer_start"] = bufferStartBytes
+		entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes
 	}
 
 	for i := int64(0); i < chunkCount; i++ {
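
Note: the new offset attributes reuse the 8-byte big-endian encoding that readAllParquetFiles already expects for the timestamp attributes, so a consumer of these parquet entries can decode the Kafka offset range symmetrically. A minimal sketch of that read path, assuming only what the diff shows; the parquetOffsetRange helper is hypothetical and not part of this change:

package logstore

import (
	"encoding/binary"

	"github.com/seaweedfs/seaweedfs/weed/mq"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// parquetOffsetRange (hypothetical helper) decodes the offset range that
// saveParquetFileToPartitionDir stores on a parquet file entry, mirroring
// the 8-byte big-endian convention used for the timestamp attributes.
// ok is false when the entry predates this change or carried no offsets.
func parquetOffsetRange(entry *filer_pb.Entry) (minOffset, maxOffset int64, ok bool) {
	minBytes := entry.Extended[mq.ExtendedAttrOffsetMin]
	maxBytes := entry.Extended[mq.ExtendedAttrOffsetMax]
	if len(minBytes) != 8 || len(maxBytes) != 8 {
		return 0, 0, false
	}
	minOffset = int64(binary.BigEndian.Uint64(minBytes))
	maxOffset = int64(binary.BigEndian.Uint64(maxBytes))
	// Mirror the writer's sanity check: it only records ranges with
	// minOffset > 0 and maxOffset >= minOffset, so treat anything else as absent.
	if minOffset <= 0 || maxOffset < minOffset {
		return 0, 0, false
	}
	return minOffset, maxOffset, true
}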

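The schema-less detection keys purely on field names, so it can be illustrated with hand-built record types; only the Name attribute of each schema_pb.Field matters to the check, and the other attributes are left at their zero values. A hypothetical sketch, not part of the diff:

package logstore

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

func demoSchemalessDetection() {
	// Only system columns plus _value: compaction takes the raw-bytes path,
	// storing entry.Data directly in the _value column.
	schemaless := &schema_pb.RecordType{Fields: []*schema_pb.Field{
		{Name: SW_COLUMN_NAME_TS},
		{Name: SW_COLUMN_NAME_KEY},
		{Name: SW_COLUMN_NAME_VALUE},
	}}

	// Any user-defined field means entry.Data is unmarshaled as a RecordValue.
	schematized := &schema_pb.RecordType{Fields: []*schema_pb.Field{
		{Name: SW_COLUMN_NAME_TS},
		{Name: SW_COLUMN_NAME_KEY},
		{Name: "user_id"},
	}}

	fmt.Println(isSchemalessRecordType(schemaless))  // true
	fmt.Println(isSchemalessRecordType(schematized)) // false
}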