Diffstat (limited to 'weed/mq/logstore/log_to_parquet.go')
-rw-r--r--   weed/mq/logstore/log_to_parquet.go   107
1 file changed, 90 insertions(+), 17 deletions(-)
diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
index d2762ff24..8855d68f9 100644
--- a/weed/mq/logstore/log_to_parquet.go
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -3,7 +3,13 @@ package logstore
 import (
 	"context"
 	"encoding/binary"
+	"encoding/json"
 	"fmt"
+	"io"
+	"os"
+	"strings"
+	"time"
+
 	"github.com/parquet-go/parquet-go"
 	"github.com/parquet-go/parquet-go/compress/zstd"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -16,10 +22,6 @@ import (
 	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
 	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
 	"google.golang.org/protobuf/proto"
-	"io"
-	"os"
-	"strings"
-	"time"
 )
 
 const (
@@ -217,25 +219,29 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 		os.Remove(tempFile.Name())
 	}()
 
-	writer := parquet.NewWriter(tempFile, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}))
+	// Enable column statistics for fast aggregation queries
+	writer := parquet.NewWriter(tempFile, parquetSchema,
+		parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}),
+		parquet.DataPageStatistics(true), // Enable column statistics
+	)
 
 	rowBuilder := parquet.NewRowBuilder(parquetSchema)
 
 	var startTsNs, stopTsNs int64
 
 	for _, logFile := range logFileGroups {
-		fmt.Printf("compact %s/%s ", partitionDir, logFile.Name)
 		var rows []parquet.Row
 		if err := iterateLogEntries(filerClient, logFile, func(entry *filer_pb.LogEntry) error {
+			// Skip control entries without actual data (same logic as read operations)
+			if isControlEntry(entry) {
+				return nil
+			}
+
 			if startTsNs == 0 {
 				startTsNs = entry.TsNs
 			}
 			stopTsNs = entry.TsNs
 
-			if len(entry.Key) == 0 {
-				return nil
-			}
-
 			// write to parquet file
 			rowBuilder.Reset()
 
@@ -244,14 +250,25 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 				return fmt.Errorf("unmarshal record value: %w", err)
 			}
 
+			// Initialize Fields map if nil (prevents nil map assignment panic)
+			if record.Fields == nil {
+				record.Fields = make(map[string]*schema_pb.Value)
+			}
+
 			record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
 				Kind: &schema_pb.Value_Int64Value{
 					Int64Value: entry.TsNs,
 				},
 			}
+
+			// Handle nil key bytes to prevent growslice panic in parquet-go
+			keyBytes := entry.Key
+			if keyBytes == nil {
+				keyBytes = []byte{} // Use empty slice instead of nil
+			}
 			record.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
 				Kind: &schema_pb.Value_BytesValue{
-					BytesValue: entry.Key,
+					BytesValue: keyBytes,
 				},
 			}
 
@@ -259,7 +276,17 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 				return fmt.Errorf("add record value: %w", err)
 			}
 
-			rows = append(rows, rowBuilder.Row())
+			// Build row and normalize any nil ByteArray values to empty slices
+			row := rowBuilder.Row()
+			for i, value := range row {
+				if value.Kind() == parquet.ByteArray {
+					if value.ByteArray() == nil {
+						row[i] = parquet.ByteArrayValue([]byte{})
+					}
+				}
+			}
+
+			rows = append(rows, row)
 
 			return nil
 
@@ -267,8 +294,9 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 			return fmt.Errorf("iterate log entry %v/%v: %w", partitionDir, logFile.Name, err)
 		}
-		fmt.Printf("processed %d rows\n", len(rows))
+		// Nil ByteArray handling is done during row creation
 
+		// Write all rows in a single call
 		if _, err := writer.WriteRows(rows); err != nil {
 			return fmt.Errorf("write rows: %w", err)
 		}
 
@@ -280,7 +308,22 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 
 	// write to parquet file to partitionDir
 	parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
-	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil {
+
+	// Collect source log file names and buffer_start metadata for deduplication
+	var sourceLogFiles []string
+	var earliestBufferStart int64
+	for _, logFile := range logFileGroups {
+		sourceLogFiles = append(sourceLogFiles, logFile.Name)
+
+		// Extract buffer_start from log file metadata
+		if bufferStart := getBufferStartFromLogFile(logFile); bufferStart > 0 {
+			if earliestBufferStart == 0 || bufferStart < earliestBufferStart {
+				earliestBufferStart = bufferStart
+			}
+		}
+	}
+
+	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart); err != nil {
 		return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
 	}
 
@@ -288,7 +331,7 @@ func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir strin
 }
 
-func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
+func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64) error {
 	uploader, err := operation.NewUploader()
 	if err != nil {
 		return fmt.Errorf("new uploader: %w", err)
 	}
@@ -321,6 +364,19 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
 	binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
 	entry.Extended["max"] = maxTsBytes
 
+	// Store source log files for deduplication (JSON-encoded list)
+	if len(sourceLogFiles) > 0 {
+		sourceLogFilesJson, _ := json.Marshal(sourceLogFiles)
+		entry.Extended["sources"] = sourceLogFilesJson
+	}
+
+	// Store earliest buffer_start for precise broker deduplication
+	if earliestBufferStart > 0 {
+		bufferStartBytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(bufferStartBytes, uint64(earliestBufferStart))
+		entry.Extended["buffer_start"] = bufferStartBytes
+	}
+
 	for i := int64(0); i < chunkCount; i++ {
 		fileId, uploadResult, err, _ := uploader.UploadWithRetry(
 			filerClient,
@@ -362,7 +418,6 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile
 	}); err != nil {
 		return fmt.Errorf("create entry: %w", err)
 	}
-	fmt.Printf("saved to %s/%s\n", partitionDir, parquetFileName)
 
 	return nil
 }
@@ -389,7 +444,6 @@ func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(ctx context.Context, fi
 			continue
 		}
 		if chunk.IsChunkManifest {
-			fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name)
 			return
 		}
 		urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId)
@@ -453,3 +507,22 @@ func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (proc
 
 	return
 }
+
+// getBufferStartFromLogFile extracts the buffer_start index from log file extended metadata
+func getBufferStartFromLogFile(logFile *filer_pb.Entry) int64 {
+	if logFile.Extended == nil {
+		return 0
+	}
+
+	// Parse buffer_start binary format
+	if startData, exists := logFile.Extended["buffer_start"]; exists {
+		if len(startData) == 8 {
+			startIndex := int64(binary.BigEndian.Uint64(startData))
+			if startIndex > 0 {
+				return startIndex
+			}
+		}
+	}
+
+	return 0
+}
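
The patch's deduplication contract lives in the parquet entry's extended attributes: "sources" is a JSON-encoded list of the compacted log file names, and "buffer_start" is the earliest source buffer index, encoded as 8 big-endian bytes in the same style as the "max" timestamp above it. Below is a minimal reader-side sketch of how a broker or query node might decode these attributes; the type and function names are hypothetical, not part of this patch.

package logstore

import (
	"encoding/binary"
	"encoding/json"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// parquetDedupInfo (hypothetical) gathers the metadata that
// saveParquetFileToPartitionDir stores on a parquet file entry.
type parquetDedupInfo struct {
	MaxTsNs     int64    // "max": last log entry timestamp in the file
	Sources     []string // "sources": names of the compacted log files
	BufferStart int64    // "buffer_start": earliest source buffer index
}

// decodeParquetDedupInfo (hypothetical) reverses the encoding used in
// the patch: 8-byte big-endian integers for "max" and "buffer_start",
// JSON for "sources". Missing or malformed values are left zeroed.
func decodeParquetDedupInfo(entry *filer_pb.Entry) parquetDedupInfo {
	var info parquetDedupInfo
	if entry.Extended == nil {
		return info
	}
	if b, ok := entry.Extended["max"]; ok && len(b) == 8 {
		info.MaxTsNs = int64(binary.BigEndian.Uint64(b))
	}
	if b, ok := entry.Extended["buffer_start"]; ok && len(b) == 8 {
		info.BufferStart = int64(binary.BigEndian.Uint64(b))
	}
	if b, ok := entry.Extended["sources"]; ok {
		_ = json.Unmarshal(b, &info.Sources)
	}
	return info
}

With this in hand, a reader can skip any log file whose name appears in Sources, and a broker can compare BufferStart against its own buffer indexes rather than relying on timestamps alone.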
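getBufferStartFromLogFile expects each source log file to carry the same "buffer_start" convention: exactly 8 big-endian bytes holding a positive index. A sketch of the write-side counterpart, assuming the broker stamps log files this way when flushing a buffer (the helper is hypothetical and not part of this patch):

package logstore

import (
	"encoding/binary"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// setBufferStart (hypothetical) stamps a log file entry in the format
// getBufferStartFromLogFile parses: 8 big-endian bytes, positive index.
func setBufferStart(entry *filer_pb.Entry, startIndex int64) {
	if entry.Extended == nil {
		entry.Extended = make(map[string][]byte)
	}
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(startIndex))
	entry.Extended["buffer_start"] = b
}

Note that getBufferStartFromLogFile returns 0 both when the attribute is absent and when it is malformed, so 0 doubles as "unknown"; this is why writeLogFilesToParquet only folds values with bufferStart > 0 into earliestBufferStart.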
