path: root/weed/mq/logstore/log_to_parquet.go
Diffstat (limited to 'weed/mq/logstore/log_to_parquet.go')
-rw-r--r--  weed/mq/logstore/log_to_parquet.go | 107
1 file changed, 90 insertions(+), 17 deletions(-)
diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
index d2762ff24..8855d68f9 100644
--- a/weed/mq/logstore/log_to_parquet.go
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -3,7 +3,13 @@ package logstore
import (
"context"
"encoding/binary"
+ "encoding/json"
"fmt"
+ "io"
+ "os"
+ "strings"
+ "time"
+
"github.com/parquet-go/parquet-go"
"github.com/parquet-go/parquet-go/compress/zstd"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -16,10 +22,6 @@ import (
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/protobuf/proto"
- "io"
- "os"
- "strings"
- "time"
)
const (
@@ -217,25 +219,29 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
os.Remove(tempFile.Name())
}()
- writer := parquet.NewWriter(tempFile, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}))
+ // Enable column statistics for fast aggregation queries
+ writer := parquet.NewWriter(tempFile, parquetSchema,
+ parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}),
+ parquet.DataPageStatistics(true), // Enable column statistics
+ )
rowBuilder := parquet.NewRowBuilder(parquetSchema)
var startTsNs, stopTsNs int64
for _, logFile := range logFileGroups {
- fmt.Printf("compact %s/%s ", partitionDir, logFile.Name)
var rows []parquet.Row
if err := iterateLogEntries(filerClient, logFile, func(entry *filer_pb.LogEntry) error {
+ // Skip control entries without actual data (same logic as read operations)
+ if isControlEntry(entry) {
+ return nil
+ }
+
if startTsNs == 0 {
startTsNs = entry.TsNs
}
stopTsNs = entry.TsNs
- if len(entry.Key) == 0 {
- return nil
- }
-
// write to parquet file
rowBuilder.Reset()
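
The DataPageStatistics option enabled above makes the writer record per-page min/max statistics, which is what allows downstream aggregation queries to be answered from metadata instead of a full row scan. A minimal reader-side sketch of that payoff, assuming a recent parquet-go where ColumnChunk.ColumnIndex returns (ColumnIndex, error) and a flat schema whose leaf column order matches Schema().Fields():

package main

import (
	"fmt"
	"os"

	"github.com/parquet-go/parquet-go"
)

// printColumnMinMax reads per-page statistics for one column without
// decoding any row data. Path and column name are caller-supplied.
func printColumnMinMax(path, column string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	st, err := f.Stat()
	if err != nil {
		return err
	}
	pf, err := parquet.OpenFile(f, st.Size())
	if err != nil {
		return err
	}
	for _, rg := range pf.RowGroups() {
		for i, cc := range rg.ColumnChunks() {
			// Flat-schema assumption: field i corresponds to leaf column i.
			if pf.Schema().Fields()[i].Name() != column {
				continue
			}
			idx, err := cc.ColumnIndex()
			if err != nil {
				return fmt.Errorf("no column index: %w", err)
			}
			for p := 0; p < idx.NumPages(); p++ {
				fmt.Printf("page %d: min=%v max=%v\n", p, idx.MinValue(p), idx.MaxValue(p))
			}
		}
	}
	return nil
}
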
@@ -244,14 +250,25 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
return fmt.Errorf("unmarshal record value: %w", err)
}
+ // Initialize Fields map if nil (prevents nil map assignment panic)
+ if record.Fields == nil {
+ record.Fields = make(map[string]*schema_pb.Value)
+ }
+
record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{
Int64Value: entry.TsNs,
},
}
+
+ // Handle nil key bytes to prevent growslice panic in parquet-go
+ keyBytes := entry.Key
+ if keyBytes == nil {
+ keyBytes = []byte{} // Use empty slice instead of nil
+ }
record.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{
- BytesValue: entry.Key,
+ BytesValue: keyBytes,
},
}
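
The two guards above address distinct Go failure modes: assigning into a nil map panics at runtime, while a nil []byte is legal Go but can trip libraries that index or grow it (here, a growslice panic inside parquet-go). A standalone illustration of both:

package main

import "fmt"

func main() {
	// Writing to a nil map panics; reads on a nil map are safe.
	var fields map[string]int
	if fields == nil {
		fields = make(map[string]int) // same guard as record.Fields above
	}
	fields["ts"] = 42

	// A nil slice is distinct from an empty one; normalizing to []byte{}
	// keeps downstream code that indexes or appends well-defined.
	var key []byte
	if key == nil {
		key = []byte{} // same guard as entry.Key above
	}
	fmt.Println(len(fields), len(key)) // 1 0
}
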
@@ -259,7 +276,17 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
return fmt.Errorf("add record value: %w", err)
}
- rows = append(rows, rowBuilder.Row())
+ // Build row and normalize any nil ByteArray values to empty slices
+ row := rowBuilder.Row()
+ for i, value := range row {
+ if value.Kind() == parquet.ByteArray {
+ if value.ByteArray() == nil {
+ row[i] = parquet.ByteArrayValue([]byte{})
+ }
+ }
+ }
+
+ rows = append(rows, row)
return nil
@@ -267,8 +294,9 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
return fmt.Errorf("iterate log entry %v/%v: %w", partitionDir, logFile.Name, err)
}
- fmt.Printf("processed %d rows\n", len(rows))
+ // Nil ByteArray handling is done during row creation
+ // Write all rows in a single call
if _, err := writer.WriteRows(rows); err != nil {
return fmt.Errorf("write rows: %w", err)
}
@@ -280,7 +308,22 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
// write to parquet file to partitionDir
parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
- if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil {
+
+ // Collect source log file names and buffer_start metadata for deduplication
+ var sourceLogFiles []string
+ var earliestBufferStart int64
+ for _, logFile := range logFileGroups {
+ sourceLogFiles = append(sourceLogFiles, logFile.Name)
+
+ // Extract buffer_start from log file metadata
+ if bufferStart := getBufferStartFromLogFile(logFile); bufferStart > 0 {
+ if earliestBufferStart == 0 || bufferStart < earliestBufferStart {
+ earliestBufferStart = bufferStart
+ }
+ }
+ }
+
+ if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart); err != nil {
return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
}
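
For reference, the layout string above is Go's reference time ("2006-01-02 15:04:05"), so each parquet file is named after the timestamp of its first entry and sorts lexically in time order. A small self-contained illustration with an arbitrary example timestamp:

package main

import (
	"fmt"
	"time"
)

func main() {
	startTsNs := int64(1700000000123456789) // example: 2023-11-14 22:13:20 UTC
	name := fmt.Sprintf("%s.parquet",
		time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
	fmt.Println(name) // 2023-11-14-22-13-20.parquet
}
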
@@ -288,7 +331,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
}
-func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
+func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64) error {
uploader, err := operation.NewUploader()
if err != nil {
return fmt.Errorf("new uploader: %w", err)
@@ -321,6 +364,19 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
entry.Extended["max"] = maxTsBytes
+ // Store source log files for deduplication (JSON-encoded list)
+ if len(sourceLogFiles) > 0 {
+ sourceLogFilesJson, _ := json.Marshal(sourceLogFiles)
+ entry.Extended["sources"] = sourceLogFilesJson
+ }
+
+ // Store earliest buffer_start for precise broker deduplication
+ if earliestBufferStart > 0 {
+ bufferStartBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(bufferStartBytes, uint64(earliestBufferStart))
+ entry.Extended["buffer_start"] = bufferStartBytes
+ }
+
for i := int64(0); i < chunkCount; i++ {
fileId, uploadResult, err, _ := uploader.UploadWithRetry(
filerClient,
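
On the read side, a broker or compaction pass can decode the "sources" metadata to learn which log files a parquet file already covers and skip them during deduplication. A hedged sketch of such a consumer; the helper name is hypothetical and it assumes the imports already present in this file:

// compactedSources is a hypothetical helper: it returns the set of log
// file names recorded in a parquet entry's "sources" metadata.
func compactedSources(entry *filer_pb.Entry) (map[string]bool, error) {
	sources := make(map[string]bool)
	raw, ok := entry.Extended["sources"] // safe even if Extended is nil
	if !ok {
		return sources, nil // not a compacted parquet file, or metadata absent
	}
	var names []string
	if err := json.Unmarshal(raw, &names); err != nil {
		return nil, fmt.Errorf("decode sources metadata: %w", err)
	}
	for _, name := range names {
		sources[name] = true
	}
	return sources, nil
}
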
@@ -362,7 +418,6 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
}); err != nil {
return fmt.Errorf("create entry: %w", err)
}
- fmt.Printf("saved to %s/%s\n", partitionDir, parquetFileName)
return nil
}
@@ -389,7 +444,6 @@ func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(ctx context.Context, fi
continue
}
if chunk.IsChunkManifest {
- fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name)
return
}
urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId)
@@ -453,3 +507,22 @@ func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (proc
return
}
+
+// getBufferStartFromLogFile extracts the buffer_start index from log file extended metadata
+func getBufferStartFromLogFile(logFile *filer_pb.Entry) int64 {
+ if logFile.Extended == nil {
+ return 0
+ }
+
+ // Parse buffer_start binary format
+ if startData, exists := logFile.Extended["buffer_start"]; exists {
+ if len(startData) == 8 {
+ startIndex := int64(binary.BigEndian.Uint64(startData))
+ if startIndex > 0 {
+ return startIndex
+ }
+ }
+ }
+
+ return 0
+}
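
Since this function pairs with the 8-byte big-endian encoding written in saveParquetFileToPartitionDir, a round-trip test sketch documents the expected layout (standard library testing, encoding/binary, and the filer_pb types assumed):

func TestBufferStartRoundTrip(t *testing.T) {
	// Encode exactly as saveParquetFileToPartitionDir does.
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(12345))
	entry := &filer_pb.Entry{Extended: map[string][]byte{"buffer_start": buf}}

	if got := getBufferStartFromLogFile(entry); got != 12345 {
		t.Fatalf("buffer_start round trip: got %d, want 12345", got)
	}
	// Missing or malformed metadata degrades to 0 rather than erroring.
	if got := getBufferStartFromLogFile(&filer_pb.Entry{}); got != 0 {
		t.Fatalf("expected 0 for entry without metadata, got %d", got)
	}
}
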