Diffstat (limited to 'weed/mq/logstore/log_to_parquet.go')
-rw-r--r--	weed/mq/logstore/log_to_parquet.go	114
1 file changed, 99 insertions(+), 15 deletions(-)
diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
index 8855d68f9..bfd5ff10e 100644
--- a/weed/mq/logstore/log_to_parquet.go
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -13,6 +13,7 @@ import (
"github.com/parquet-go/parquet-go"
"github.com/parquet-go/parquet-go/compress/zstd"
"github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/mq"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -25,8 +26,10 @@ import (
)
const (
- SW_COLUMN_NAME_TS = "_ts_ns"
- SW_COLUMN_NAME_KEY = "_key"
+ SW_COLUMN_NAME_TS = "_ts_ns"
+ SW_COLUMN_NAME_KEY = "_key"
+ SW_COLUMN_NAME_OFFSET = "_offset"
+ SW_COLUMN_NAME_VALUE = "_value"
)
func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
@@ -185,7 +188,7 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
}
// read min ts
- minTsBytes := entry.Extended["min"]
+ minTsBytes := entry.Extended[mq.ExtendedAttrTimestampMin]
if len(minTsBytes) != 8 {
return nil
}
@@ -195,7 +198,7 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
}
// read max ts
- maxTsBytes := entry.Extended["max"]
+ maxTsBytes := entry.Extended[mq.ExtendedAttrTimestampMax]
if len(maxTsBytes) != 8 {
return nil
}
@@ -208,6 +211,36 @@ func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string)
return
}
+// isSchemalessRecordType reports whether recordType represents a schema-less topic.
+// Schema-less topics carry only the system fields (_ts_ns, _key, _offset) plus a
+// single _value field that holds the raw message payload.
+func isSchemalessRecordType(recordType *schema_pb.RecordType) bool {
+ if recordType == nil {
+ return false
+ }
+
+ // Count only non-system data fields (_ts_ns, _key, and _offset are system fields)
+ // Schema-less topics should have _value as their only data field
+ hasValue := false
+ dataFieldCount := 0
+
+ for _, field := range recordType.Fields {
+ switch field.Name {
+ case SW_COLUMN_NAME_TS, SW_COLUMN_NAME_KEY, SW_COLUMN_NAME_OFFSET:
+ // System fields - ignore
+ continue
+ case SW_COLUMN_NAME_VALUE:
+ hasValue = true
+ dataFieldCount++
+ default:
+ // Any other field means it's not schema-less
+ dataFieldCount++
+ }
+ }
+
+ // Schema-less = only has _value field as the data field (plus system fields)
+ return hasValue && dataFieldCount == 1
+}
+
func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {
tempFile, err := os.CreateTemp(".", "t*.parquet")
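
Reviewer note: to make the classification concrete, a minimal sketch (illustrative only, not part of this commit) of how isSchemalessRecordType treats the two kinds of record types. It assumes only that schema_pb.Field exposes the Name field used in the switch above.

	// Illustration only: classifying record types with the helper above.
	schemaless := &schema_pb.RecordType{Fields: []*schema_pb.Field{
		{Name: SW_COLUMN_NAME_TS},
		{Name: SW_COLUMN_NAME_KEY},
		{Name: SW_COLUMN_NAME_VALUE},
	}}
	fmt.Println(isSchemalessRecordType(schemaless)) // true: _value is the only data field

	schematized := &schema_pb.RecordType{Fields: []*schema_pb.Field{
		{Name: SW_COLUMN_NAME_TS},
		{Name: "user_id"}, // any named data field disqualifies it
	}}
	fmt.Println(isSchemalessRecordType(schematized)) // false
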
@@ -227,6 +260,9 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
rowBuilder := parquet.NewRowBuilder(parquetSchema)
var startTsNs, stopTsNs int64
+ var minOffset, maxOffset int64
+ var hasOffsets bool
+ isSchemaless := isSchemalessRecordType(recordType)
for _, logFile := range logFileGroups {
var rows []parquet.Row
@@ -242,19 +278,56 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
}
stopTsNs = entry.TsNs
+ // Track offset ranges for Kafka integration
+ if entry.Offset > 0 {
+ if !hasOffsets {
+ minOffset = entry.Offset
+ maxOffset = entry.Offset
+ hasOffsets = true
+ } else {
+ if entry.Offset < minOffset {
+ minOffset = entry.Offset
+ }
+ if entry.Offset > maxOffset {
+ maxOffset = entry.Offset
+ }
+ }
+ }
+
// write to parquet file
rowBuilder.Reset()
record := &schema_pb.RecordValue{}
- if err := proto.Unmarshal(entry.Data, record); err != nil {
- return fmt.Errorf("unmarshal record value: %w", err)
- }
- // Initialize Fields map if nil (prevents nil map assignment panic)
- if record.Fields == nil {
+ if isSchemaless {
+ // For schema-less topics, put raw entry.Data into _value field
record.Fields = make(map[string]*schema_pb.Value)
+ record.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{
+ BytesValue: entry.Data,
+ },
+ }
+ } else {
+ // For schematized topics, unmarshal entry.Data as RecordValue
+ if err := proto.Unmarshal(entry.Data, record); err != nil {
+ return fmt.Errorf("unmarshal record value: %w", err)
+ }
+
+ // Initialize Fields map if nil (prevents nil map assignment panic)
+ if record.Fields == nil {
+ record.Fields = make(map[string]*schema_pb.Value)
+ }
+
+ // Add offset field to parquet records for native offset support
+ // ASSUMPTION: LogEntry.Offset field is populated by broker during message publishing
+ record.Fields[SW_COLUMN_NAME_OFFSET] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{
+ Int64Value: entry.Offset,
+ },
+ }
}
+ // Add system columns (for both schematized and schema-less topics)
record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{
Int64Value: entry.TsNs,
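
Reviewer note: a schema-less message thus lands in parquet as a RecordValue whose _value holds the untouched payload, alongside the system columns added here. A hypothetical reader-side sketch (not from this commit, assuming the generated nil-safe oneof getters on schema_pb.Value):

	// Sketch: unwrap a schema-less RecordValue back into its raw payload
	// plus system metadata. The generated getters return zero values when
	// a field is absent or nil.
	func unwrapSchemaless(record *schema_pb.RecordValue) (tsNs int64, key, value []byte) {
		tsNs = record.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
		key = record.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
		value = record.Fields[SW_COLUMN_NAME_VALUE].GetBytesValue()
		return
	}
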
@@ -323,7 +396,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
}
}
- if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart); err != nil {
+ if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart, minOffset, maxOffset, hasOffsets); err != nil {
return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
}
@@ -331,7 +404,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
}
-func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64) error {
+func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64, minOffset, maxOffset int64, hasOffsets bool) error {
uploader, err := operation.NewUploader()
if err != nil {
return fmt.Errorf("new uploader: %w", err)
@@ -359,22 +432,33 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
entry.Extended = make(map[string][]byte)
minTsBytes := make([]byte, 8)
binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
- entry.Extended["min"] = minTsBytes
+ entry.Extended[mq.ExtendedAttrTimestampMin] = minTsBytes
maxTsBytes := make([]byte, 8)
binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
- entry.Extended["max"] = maxTsBytes
+ entry.Extended[mq.ExtendedAttrTimestampMax] = maxTsBytes
+
+ // Add offset range metadata for Kafka integration (same as regular log files)
+ if hasOffsets && minOffset > 0 && maxOffset >= minOffset {
+ minOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(minOffsetBytes, uint64(minOffset))
+ entry.Extended[mq.ExtendedAttrOffsetMin] = minOffsetBytes
+
+ maxOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(maxOffsetBytes, uint64(maxOffset))
+ entry.Extended[mq.ExtendedAttrOffsetMax] = maxOffsetBytes
+ }
// Store source log files for deduplication (JSON-encoded list)
if len(sourceLogFiles) > 0 {
sourceLogFilesJson, _ := json.Marshal(sourceLogFiles)
- entry.Extended["sources"] = sourceLogFilesJson
+ entry.Extended[mq.ExtendedAttrSources] = sourceLogFilesJson
}
// Store earliest buffer_start for precise broker deduplication
if earliestBufferStart > 0 {
bufferStartBytes := make([]byte, 8)
binary.BigEndian.PutUint64(bufferStartBytes, uint64(earliestBufferStart))
- entry.Extended["buffer_start"] = bufferStartBytes
+ entry.Extended[mq.ExtendedAttrBufferStart] = bufferStartBytes
}
for i := int64(0); i < chunkCount; i++ {
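
Reviewer note: the offset range would be decoded the same way the min/max timestamps are read back in readAllParquetFiles above. A reader-side sketch (not part of this commit) under the same 8-byte big-endian convention:

	// Sketch: decode the offset range written to entry.Extended above.
	// Mirrors the 8-byte big-endian encoding used for the timestamp attributes.
	func parquetOffsetRange(entry *filer_pb.Entry) (minOffset, maxOffset int64, ok bool) {
		minBytes := entry.Extended[mq.ExtendedAttrOffsetMin]
		maxBytes := entry.Extended[mq.ExtendedAttrOffsetMax]
		if len(minBytes) != 8 || len(maxBytes) != 8 {
			return 0, 0, false // attributes absent or malformed
		}
		minOffset = int64(binary.BigEndian.Uint64(minBytes))
		maxOffset = int64(binary.BigEndian.Uint64(maxBytes))
		return minOffset, maxOffset, minOffset <= maxOffset
	}
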