diff options
Diffstat (limited to 'weed/mq/kafka/compression')
| -rw-r--r-- | weed/mq/kafka/compression/compression.go | 203 | ||||
| -rw-r--r-- | weed/mq/kafka/compression/compression_test.go | 353 |
2 files changed, 556 insertions, 0 deletions
diff --git a/weed/mq/kafka/compression/compression.go b/weed/mq/kafka/compression/compression.go
new file mode 100644
index 000000000..f4c472199
--- /dev/null
+++ b/weed/mq/kafka/compression/compression.go
@@ -0,0 +1,203 @@
+package compression
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"io"
+
+	"github.com/golang/snappy"
+	"github.com/klauspost/compress/zstd"
+	"github.com/pierrec/lz4/v4"
+)
+
+// nopCloser adapts an io.Reader to io.ReadCloser by adding a Close method that does nothing and returns nil.
+type nopCloser struct {
+	io.Reader
+}
+
+func (nopCloser) Close() error { return nil }
+
+// CompressionCodec identifies the compression codec used in a Kafka record batch; it is stored as an int8.
+type CompressionCodec int8
+
+const (
+	None   CompressionCodec = 0
+	Gzip   CompressionCodec = 1
+	Snappy CompressionCodec = 2
+	Lz4    CompressionCodec = 3
+	Zstd   CompressionCodec = 4
+)
+
+// String returns the lowercase codec name ("none", "gzip", "snappy", "lz4", "zstd"), or "unknown(N)" for any other value.
+func (c CompressionCodec) String() string {
+	switch c {
+	case None:
+		return "none"
+	case Gzip:
+		return "gzip"
+	case Snappy:
+		return "snappy"
+	case Lz4:
+		return "lz4"
+	case Zstd:
+		return "zstd"
+	default:
+		return fmt.Sprintf("unknown(%d)", c)
+	}
+}
+
+// IsValid reports whether c is one of the defined codecs, i.e. lies in the range None (0) through Zstd (4).
+func (c CompressionCodec) IsValid() bool {
+	return c >= None && c <= Zstd
+}
+
+// ExtractCompressionCodec returns the low three bits of attributes as a codec; the result is not validated, so 5-7 are possible.
+func ExtractCompressionCodec(attributes int16) CompressionCodec {
+	return CompressionCodec(attributes & 0x07) // keep only the low 3 bits
+}
+
+// SetCompressionCodec returns attributes with the low three bits cleared and codec OR'ed into the result.
+func SetCompressionCodec(attributes int16, codec CompressionCodec) int16 {
+	return (attributes &^ 0x07) | int16(codec) // NOTE(review): codec is not masked; a value outside 0-7 also alters higher bits
+}
+
+// Compress returns data encoded with codec. None returns data itself (not a copy); an unrecognized codec yields an error.
+func Compress(codec CompressionCodec, data []byte) ([]byte, error) {
+	if codec == None {
+		return data, nil
+	}
+
+	var buf bytes.Buffer
+	var writer io.WriteCloser
+	var err error
+
+	switch codec {
+	case Gzip:
+		writer = gzip.NewWriter(&buf)
+	case Snappy:
+		// snappy.Encode compresses the whole input in one call, so return here instead of using the writer path below
+		compressed := snappy.Encode(nil, data)
+		if compressed == nil {
+			compressed = []byte{}
+		}
+		return compressed, nil
+	case Lz4:
+		writer = lz4.NewWriter(&buf)
+	case Zstd:
+		writer, err = zstd.NewWriter(&buf)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create zstd writer: %w", err)
+		}
+	default:
+		return nil, fmt.Errorf("unsupported compression codec: %s", codec)
+	}
+
+	if _, err := writer.Write(data); err != nil {
+		writer.Close() // best-effort cleanup; the Write error is the one reported
+		return nil, fmt.Errorf("failed to write compressed data: %w", err)
+	}
+
+	if err := writer.Close(); err != nil {
+		return nil, fmt.Errorf("failed to close compressor: %w", err)
+	}
+
+	return buf.Bytes(), nil
+}
+
+// Decompress returns data decoded with codec. None returns data itself; decoder errors are wrapped; unknown codecs are rejected.
+func Decompress(codec CompressionCodec, data []byte) ([]byte, error) {
+	if codec == None {
+		return data, nil
+	}
+
+	var reader io.ReadCloser
+	var err error
+
+	buf := bytes.NewReader(data)
+
+	switch codec {
+	case Gzip:
+		reader, err = gzip.NewReader(buf)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create gzip reader: %w", err)
+		}
+	case Snappy:
+		// snappy.Decode decompresses the whole input in one call, so return here instead of using the reader path below
+		decompressed, err := snappy.Decode(nil, data)
+		if err != nil {
+			return nil, fmt.Errorf("failed to decompress snappy data: %w", err)
+		}
+		if decompressed == nil {
+			decompressed = []byte{}
+		}
+		return decompressed, nil
+	case Lz4:
+		lz4Reader := lz4.NewReader(buf)
+		// lz4.Reader doesn't implement Close, so wrap it in nopCloser to satisfy io.ReadCloser
+		reader = &nopCloser{Reader: lz4Reader}
+	case Zstd:
+		zstdReader, err := zstd.NewReader(buf)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create zstd reader: %w", err)
+		}
+		defer zstdReader.Close() // handled inline: this decoder is never assigned to reader
+
+		var result bytes.Buffer
+		if _, err := io.Copy(&result, zstdReader); err != nil {
+			return nil, fmt.Errorf("failed to decompress zstd data: %w", err)
+		}
+		decompressed := result.Bytes()
+		if decompressed == nil {
+			decompressed = []byte{}
+		}
+		return decompressed, nil
+	default:
+		return nil, fmt.Errorf("unsupported compression codec: %s", codec)
+	}
+
+	defer reader.Close()
+
+	var result bytes.Buffer
+	if _, err := io.Copy(&result, reader); err != nil {
+		return nil, fmt.Errorf("failed to decompress data: %w", err)
+	}
+
+	decompressed := result.Bytes()
+	if decompressed == nil { // normalize a nil result to an empty, non-nil slice
+		decompressed = []byte{}
+	}
+	return decompressed, nil
+}
+
+// CompressRecordBatch compresses recordsData with codec and returns it together with attributes holding only
+// the codec value (no other attribute bits are set). None returns recordsData unchanged and attributes 0.
+func CompressRecordBatch(codec CompressionCodec, recordsData []byte) ([]byte, int16, error) {
+	if codec == None {
+		return recordsData, 0, nil
+	}
+
+	compressed, err := Compress(codec, recordsData)
+	if err != nil {
+		return nil, 0, fmt.Errorf("failed to compress record batch: %w", err)
+	}
+
+	attributes := int16(codec)
+	return compressed, attributes, nil
+}
+
+// DecompressRecordBatch decodes compressedData using the codec found in the low three bits of attributes.
+func DecompressRecordBatch(attributes int16, compressedData []byte) ([]byte, error) {
+	codec := ExtractCompressionCodec(attributes)
+
+	if codec == None {
+		return compressedData, nil
+	}
+
+	decompressed, err := Decompress(codec, compressedData)
+	if err != nil {
+		return nil, fmt.Errorf("failed to decompress record batch: %w", err)
+	}
+
+	return decompressed, nil
+}
diff --git a/weed/mq/kafka/compression/compression_test.go b/weed/mq/kafka/compression/compression_test.go
new file mode 100644
index 000000000..41fe82651
--- /dev/null
+++ b/weed/mq/kafka/compression/compression_test.go
@@ -0,0 +1,353 @@
+package compression
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// TestCompressionCodec_String checks String() for every defined codec and for an out-of-range value (99).
+func TestCompressionCodec_String(t *testing.T) {
+	tests := []struct {
+		codec    CompressionCodec
+		expected string
+	}{
+		{None, "none"},
+		{Gzip, "gzip"},
+		{Snappy, "snappy"},
+		{Lz4, "lz4"},
+		{Zstd, "zstd"},
+		{CompressionCodec(99), "unknown(99)"},
+	}
+
+	for _, test := range tests {
+		t.Run(test.expected, func(t *testing.T) {
+			assert.Equal(t, test.expected, test.codec.String())
+		})
+	}
+}
+
+// TestCompressionCodec_IsValid checks that IsValid accepts codecs 0-4 and rejects -1, 5 and 99.
+func TestCompressionCodec_IsValid(t *testing.T) {
+	tests := []struct {
+		codec CompressionCodec
+		valid bool
+	}{
+		{None, true},
+		{Gzip, true},
+		{Snappy, true},
+		{Lz4, true},
+		{Zstd, true},
+		{CompressionCodec(-1), false},
+		{CompressionCodec(5), false},
+		{CompressionCodec(99), false},
+	}
+
+	for _, test := range tests {
+		t.Run(test.codec.String(), func(t *testing.T) {
+			assert.Equal(t, test.valid, test.codec.IsValid())
+		})
+	}
+}
+
+// TestExtractCompressionCodec checks that the codec is read from the low three attribute bits even when bits 4 and 5 are set.
+func TestExtractCompressionCodec(t *testing.T) {
+	tests := []struct {
+		name       string
+		attributes int16
+		expected   CompressionCodec
+	}{
+		{"None", 0x0000, None},
+		{"Gzip", 0x0001, Gzip},
+		{"Snappy", 0x0002, Snappy},
+		{"Lz4", 0x0003, Lz4},
+		{"Zstd", 0x0004, Zstd},
+		{"Gzip with transactional", 0x0011, Gzip}, // Bit 4 set (transactional)
+		{"Snappy with control", 0x0022, Snappy},   // Bit 5 set (control)
+		{"Lz4 with both flags", 0x0033, Lz4},      // Both flags set
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			codec := ExtractCompressionCodec(test.attributes)
+			assert.Equal(t, test.expected, codec)
+		})
+	}
+}
+
+// TestSetCompressionCodec checks that setting a codec replaces the low three bits and leaves bits 4 and 5 untouched.
+func TestSetCompressionCodec(t *testing.T) {
+	tests := []struct {
+		name       string
+		attributes int16
+		codec      CompressionCodec
+		expected   int16
+	}{
+		{"Set None", 0x0000, None, 0x0000},
+		{"Set Gzip", 0x0000, Gzip, 0x0001},
+		{"Set Snappy", 0x0000, Snappy, 0x0002},
+		{"Set Lz4", 0x0000, Lz4, 0x0003},
+		{"Set Zstd", 0x0000, Zstd, 0x0004},
+		{"Replace Gzip with Snappy", 0x0001, Snappy, 0x0002},
+		{"Set Gzip preserving transactional", 0x0010, Gzip, 0x0011},
+		{"Set Lz4 preserving control", 0x0020, Lz4, 0x0023},
+		{"Set Zstd preserving both flags", 0x0030, Zstd, 0x0034},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			result := SetCompressionCodec(test.attributes, test.codec)
+			assert.Equal(t, test.expected, result)
+		})
+	}
+}
+
+// TestCompress_None checks that Compress with the None codec returns the input unchanged.
+func TestCompress_None(t *testing.T) {
+	data := []byte("Hello, World!")
+
+	compressed, err := Compress(None, data)
+	require.NoError(t, err)
+	assert.Equal(t, data, compressed, "None codec should return original data")
+}
+
+// TestCompress_Gzip checks that gzip output is non-empty and differs from the input.
+func TestCompress_Gzip(t *testing.T) {
+	data := []byte("Hello, World! This is a test message for gzip compression.")
+
+	compressed, err := Compress(Gzip, data)
+	require.NoError(t, err)
+	assert.NotEqual(t, data, compressed, "Gzip should compress data")
+	assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
+}
+
+// TestCompress_Snappy checks that snappy output is non-empty and differs from the input.
+func TestCompress_Snappy(t *testing.T) {
+	data := []byte("Hello, World! This is a test message for snappy compression.")
+
+	compressed, err := Compress(Snappy, data)
+	require.NoError(t, err)
+	assert.NotEqual(t, data, compressed, "Snappy should compress data")
+	assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
+}
+
+// TestCompress_Lz4 checks that lz4 output is non-empty and differs from the input.
+func TestCompress_Lz4(t *testing.T) {
+	data := []byte("Hello, World! This is a test message for lz4 compression.")
+
+	compressed, err := Compress(Lz4, data)
+	require.NoError(t, err)
+	assert.NotEqual(t, data, compressed, "Lz4 should compress data")
+	assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
+}
+
+// TestCompress_Zstd checks that zstd output is non-empty and differs from the input.
+func TestCompress_Zstd(t *testing.T) {
+	data := []byte("Hello, World! This is a test message for zstd compression.")
+
+	compressed, err := Compress(Zstd, data)
+	require.NoError(t, err)
+	assert.NotEqual(t, data, compressed, "Zstd should compress data")
+	assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
+}
+
+// TestCompress_InvalidCodec checks that Compress rejects codec 99 with an "unsupported compression codec" error.
+func TestCompress_InvalidCodec(t *testing.T) {
+	data := []byte("Hello, World!")
+
+	_, err := Compress(CompressionCodec(99), data)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "unsupported compression codec")
+}
+
+// TestDecompress_None checks that Decompress with the None codec returns the input unchanged.
+func TestDecompress_None(t *testing.T) {
+	data := []byte("Hello, World!")
+
+	decompressed, err := Decompress(None, data)
+	require.NoError(t, err)
+	assert.Equal(t, data, decompressed, "None codec should return original data")
+}
+
+// TestRoundTrip checks that Decompress(Compress(x)) equals x for every codec over text, empty, one-byte, repetitive and binary inputs.
+func TestRoundTrip(t *testing.T) {
+	testData := [][]byte{
+		[]byte("Hello, World!"),
+		[]byte(""),
+		[]byte("A"),
+		[]byte(string(bytes.Repeat([]byte("Test data for compression round trip. "), 100))),
+		[]byte("Special characters: àáâãäåæçèéêëìíîïðñòóôõö÷øùúûüýþÿ"),
+		bytes.Repeat([]byte{0x00, 0x01, 0x02, 0xFF}, 256), // Binary data
+	}
+
+	codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
+
+	for _, codec := range codecs {
+		t.Run(codec.String(), func(t *testing.T) {
+			for i, data := range testData {
+				t.Run(fmt.Sprintf("data_%d", i), func(t *testing.T) {
+					// Compress the sample
+					compressed, err := Compress(codec, data)
+					require.NoError(t, err, "Compression should succeed")
+
+					// Decompress what was just produced
+					decompressed, err := Decompress(codec, compressed)
+					require.NoError(t, err, "Decompression should succeed")
+
+					// The result must equal the original sample
+					assert.Equal(t, data, decompressed, "Round trip should preserve data")
+				})
+			}
+		})
+	}
+}
+
+// TestDecompress_InvalidCodec checks that Decompress rejects codec 99 with an "unsupported compression codec" error.
+func TestDecompress_InvalidCodec(t *testing.T) {
+	data := []byte("Hello, World!")
+
+	_, err := Decompress(CompressionCodec(99), data)
+	assert.Error(t, err)
+	assert.Contains(t, err.Error(), "unsupported compression codec")
+}
+
+// TestDecompress_CorruptedData checks that every real codec returns an error when given plain, uncompressed text.
+func TestDecompress_CorruptedData(t *testing.T) {
+	corruptedData := []byte("This is not compressed data")
+
+	codecs := []CompressionCodec{Gzip, Snappy, Lz4, Zstd}
+
+	for _, codec := range codecs {
+		t.Run(codec.String(), func(t *testing.T) {
+			_, err := Decompress(codec, corruptedData)
+			assert.Error(t, err, "Decompression of corrupted data should fail")
+		})
+	}
+}
+
+// TestCompressRecordBatch checks the data and attributes returned by CompressRecordBatch for None, Gzip and Snappy.
+func TestCompressRecordBatch(t *testing.T) {
+	recordsData := []byte("Record batch data for compression testing")
+
+	t.Run("None codec", func(t *testing.T) {
+		compressed, attributes, err := CompressRecordBatch(None, recordsData)
+		require.NoError(t, err)
+		assert.Equal(t, recordsData, compressed)
+		assert.Equal(t, int16(0), attributes)
+	})
+
+	t.Run("Gzip codec", func(t *testing.T) {
+		compressed, attributes, err := CompressRecordBatch(Gzip, recordsData)
+		require.NoError(t, err)
+		assert.NotEqual(t, recordsData, compressed)
+		assert.Equal(t, int16(1), attributes)
+	})
+
+	t.Run("Snappy codec", func(t *testing.T) {
+		compressed, attributes, err := CompressRecordBatch(Snappy, recordsData)
+		require.NoError(t, err)
+		assert.NotEqual(t, recordsData, compressed)
+		assert.Equal(t, int16(2), attributes)
+	})
+}
+
+// TestDecompressRecordBatch checks the None pass-through and Gzip/Snappy round trips driven by the returned attributes.
+func TestDecompressRecordBatch(t *testing.T) {
+	recordsData := []byte("Record batch data for decompression testing")
+
+	t.Run("None codec", func(t *testing.T) {
+		attributes := int16(0) // No compression
+		decompressed, err := DecompressRecordBatch(attributes, recordsData)
+		require.NoError(t, err)
+		assert.Equal(t, recordsData, decompressed)
+	})
+
+	t.Run("Round trip with Gzip", func(t *testing.T) {
+		// Compress and capture the attributes
+		compressed, attributes, err := CompressRecordBatch(Gzip, recordsData)
+		require.NoError(t, err)
+
+		// Decompress using only those attributes
+		decompressed, err := DecompressRecordBatch(attributes, compressed)
+		require.NoError(t, err)
+		assert.Equal(t, recordsData, decompressed)
+	})
+
+	t.Run("Round trip with Snappy", func(t *testing.T) {
+		// Compress and capture the attributes
+		compressed, attributes, err := CompressRecordBatch(Snappy, recordsData)
+		require.NoError(t, err)
+
+		// Decompress using only those attributes
+		decompressed, err := DecompressRecordBatch(attributes, compressed)
+		require.NoError(t, err)
+		assert.Equal(t, recordsData, decompressed)
+	})
+}
+
+// TestCompressionEfficiency checks that each real codec produces output smaller than a highly repetitive input, and logs the ratio.
+func TestCompressionEfficiency(t *testing.T) {
+	// Highly repetitive input: one sentence repeated 100 times
+	data := bytes.Repeat([]byte("This is a repeated string for compression testing. "), 100)
+
+	codecs := []CompressionCodec{Gzip, Snappy, Lz4, Zstd}
+
+	for _, codec := range codecs {
+		t.Run(codec.String(), func(t *testing.T) {
+			compressed, err := Compress(codec, data)
+			require.NoError(t, err)
+
+			compressionRatio := float64(len(compressed)) / float64(len(data))
+			t.Logf("Codec: %s, Original: %d bytes, Compressed: %d bytes, Ratio: %.2f",
+				codec.String(), len(data), len(compressed), compressionRatio)
+
+			// Every codec is expected to shrink input this repetitive
+			assert.Less(t, len(compressed), len(data), "Compression should reduce data size")
+		})
+	}
+}
+
+// BenchmarkCompression measures Compress for each codec (including None) on a short string repeated 1000 times.
+func BenchmarkCompression(b *testing.B) {
+	data := bytes.Repeat([]byte("Benchmark data for compression testing. "), 1000)
+	codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
+
+	for _, codec := range codecs {
+		b.Run(fmt.Sprintf("Compress_%s", codec.String()), func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				_, err := Compress(codec, data)
+				if err != nil {
+					b.Fatal(err)
+				}
+			}
+		})
+	}
+}
+
+// BenchmarkDecompression measures Decompress for each codec (including None) on pre-compressed repetitive data.
+func BenchmarkDecompression(b *testing.B) {
+	data := bytes.Repeat([]byte("Benchmark data for decompression testing. "), 1000)
+	codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
+
+	for _, codec := range codecs {
+		// Compress once up front so only Decompress is exercised inside b.Run
+		compressed, err := Compress(codec, data)
+		if err != nil {
+			b.Fatal(err)
+		}
+
+		b.Run(fmt.Sprintf("Decompress_%s", codec.String()), func(b *testing.B) {
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				_, err := Decompress(codec, compressed)
+				if err != nil {
+					b.Fatal(err)
+				}
+			}
+		})
+	}
+}
|
