aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/compression
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/compression')
-rw-r--r--weed/mq/kafka/compression/compression.go203
-rw-r--r--weed/mq/kafka/compression/compression_test.go353
2 files changed, 556 insertions, 0 deletions
diff --git a/weed/mq/kafka/compression/compression.go b/weed/mq/kafka/compression/compression.go
new file mode 100644
index 000000000..f4c472199
--- /dev/null
+++ b/weed/mq/kafka/compression/compression.go
@@ -0,0 +1,203 @@
+package compression
+
+import (
+ "bytes"
+ "compress/gzip"
+ "fmt"
+ "io"
+
+ "github.com/golang/snappy"
+ "github.com/klauspost/compress/zstd"
+ "github.com/pierrec/lz4/v4"
+)
+
+// nopCloser wraps an io.Reader to provide a no-op Close method
+type nopCloser struct {
+ io.Reader
+}
+
+func (nopCloser) Close() error { return nil }
+
+// CompressionCodec represents the compression codec used in Kafka record batches
+type CompressionCodec int8
+
+const (
+ None CompressionCodec = 0
+ Gzip CompressionCodec = 1
+ Snappy CompressionCodec = 2
+ Lz4 CompressionCodec = 3
+ Zstd CompressionCodec = 4
+)
+
+// String returns the string representation of the compression codec
+func (c CompressionCodec) String() string {
+ switch c {
+ case None:
+ return "none"
+ case Gzip:
+ return "gzip"
+ case Snappy:
+ return "snappy"
+ case Lz4:
+ return "lz4"
+ case Zstd:
+ return "zstd"
+ default:
+ return fmt.Sprintf("unknown(%d)", c)
+ }
+}
+
+// IsValid returns true if the compression codec is valid
+func (c CompressionCodec) IsValid() bool {
+ return c >= None && c <= Zstd
+}
+
+// ExtractCompressionCodec extracts the compression codec from record batch attributes
+func ExtractCompressionCodec(attributes int16) CompressionCodec {
+ return CompressionCodec(attributes & 0x07) // Lower 3 bits
+}
+
+// SetCompressionCodec sets the compression codec in record batch attributes
+func SetCompressionCodec(attributes int16, codec CompressionCodec) int16 {
+ return (attributes &^ 0x07) | int16(codec)
+}
+
+// Compress compresses data using the specified codec
+func Compress(codec CompressionCodec, data []byte) ([]byte, error) {
+ if codec == None {
+ return data, nil
+ }
+
+ var buf bytes.Buffer
+ var writer io.WriteCloser
+ var err error
+
+ switch codec {
+ case Gzip:
+ writer = gzip.NewWriter(&buf)
+ case Snappy:
+ // Snappy doesn't have a streaming writer, so we compress directly
+ compressed := snappy.Encode(nil, data)
+ if compressed == nil {
+ compressed = []byte{}
+ }
+ return compressed, nil
+ case Lz4:
+ writer = lz4.NewWriter(&buf)
+ case Zstd:
+ writer, err = zstd.NewWriter(&buf)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create zstd writer: %w", err)
+ }
+ default:
+ return nil, fmt.Errorf("unsupported compression codec: %s", codec)
+ }
+
+ if _, err := writer.Write(data); err != nil {
+ writer.Close()
+ return nil, fmt.Errorf("failed to write compressed data: %w", err)
+ }
+
+ if err := writer.Close(); err != nil {
+ return nil, fmt.Errorf("failed to close compressor: %w", err)
+ }
+
+ return buf.Bytes(), nil
+}
+
+// Decompress decompresses data using the specified codec
+func Decompress(codec CompressionCodec, data []byte) ([]byte, error) {
+ if codec == None {
+ return data, nil
+ }
+
+ var reader io.ReadCloser
+ var err error
+
+ buf := bytes.NewReader(data)
+
+ switch codec {
+ case Gzip:
+ reader, err = gzip.NewReader(buf)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create gzip reader: %w", err)
+ }
+ case Snappy:
+ // Snappy doesn't have a streaming reader, so we decompress directly
+ decompressed, err := snappy.Decode(nil, data)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decompress snappy data: %w", err)
+ }
+ if decompressed == nil {
+ decompressed = []byte{}
+ }
+ return decompressed, nil
+ case Lz4:
+ lz4Reader := lz4.NewReader(buf)
+ // lz4.Reader doesn't implement Close, so we wrap it
+ reader = &nopCloser{Reader: lz4Reader}
+ case Zstd:
+ zstdReader, err := zstd.NewReader(buf)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create zstd reader: %w", err)
+ }
+ defer zstdReader.Close()
+
+ var result bytes.Buffer
+ if _, err := io.Copy(&result, zstdReader); err != nil {
+ return nil, fmt.Errorf("failed to decompress zstd data: %w", err)
+ }
+ decompressed := result.Bytes()
+ if decompressed == nil {
+ decompressed = []byte{}
+ }
+ return decompressed, nil
+ default:
+ return nil, fmt.Errorf("unsupported compression codec: %s", codec)
+ }
+
+ defer reader.Close()
+
+ var result bytes.Buffer
+ if _, err := io.Copy(&result, reader); err != nil {
+ return nil, fmt.Errorf("failed to decompress data: %w", err)
+ }
+
+ decompressed := result.Bytes()
+ if decompressed == nil {
+ decompressed = []byte{}
+ }
+ return decompressed, nil
+}
+
+// CompressRecordBatch compresses the records portion of a Kafka record batch
+// This function compresses only the records data, not the entire batch header
+func CompressRecordBatch(codec CompressionCodec, recordsData []byte) ([]byte, int16, error) {
+ if codec == None {
+ return recordsData, 0, nil
+ }
+
+ compressed, err := Compress(codec, recordsData)
+ if err != nil {
+ return nil, 0, fmt.Errorf("failed to compress record batch: %w", err)
+ }
+
+ attributes := int16(codec)
+ return compressed, attributes, nil
+}
+
+// DecompressRecordBatch decompresses the records portion of a Kafka record batch
+func DecompressRecordBatch(attributes int16, compressedData []byte) ([]byte, error) {
+ codec := ExtractCompressionCodec(attributes)
+
+ if codec == None {
+ return compressedData, nil
+ }
+
+ decompressed, err := Decompress(codec, compressedData)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decompress record batch: %w", err)
+ }
+
+ return decompressed, nil
+}
diff --git a/weed/mq/kafka/compression/compression_test.go b/weed/mq/kafka/compression/compression_test.go
new file mode 100644
index 000000000..41fe82651
--- /dev/null
+++ b/weed/mq/kafka/compression/compression_test.go
@@ -0,0 +1,353 @@
+package compression
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestCompressionCodec_String tests the string representation of compression codecs
+func TestCompressionCodec_String(t *testing.T) {
+ tests := []struct {
+ codec CompressionCodec
+ expected string
+ }{
+ {None, "none"},
+ {Gzip, "gzip"},
+ {Snappy, "snappy"},
+ {Lz4, "lz4"},
+ {Zstd, "zstd"},
+ {CompressionCodec(99), "unknown(99)"},
+ }
+
+ for _, test := range tests {
+ t.Run(test.expected, func(t *testing.T) {
+ assert.Equal(t, test.expected, test.codec.String())
+ })
+ }
+}
+
+// TestCompressionCodec_IsValid tests codec validation
+func TestCompressionCodec_IsValid(t *testing.T) {
+ tests := []struct {
+ codec CompressionCodec
+ valid bool
+ }{
+ {None, true},
+ {Gzip, true},
+ {Snappy, true},
+ {Lz4, true},
+ {Zstd, true},
+ {CompressionCodec(-1), false},
+ {CompressionCodec(5), false},
+ {CompressionCodec(99), false},
+ }
+
+ for _, test := range tests {
+ t.Run(test.codec.String(), func(t *testing.T) {
+ assert.Equal(t, test.valid, test.codec.IsValid())
+ })
+ }
+}
+
+// TestExtractCompressionCodec tests extracting compression codec from attributes
+func TestExtractCompressionCodec(t *testing.T) {
+ tests := []struct {
+ name string
+ attributes int16
+ expected CompressionCodec
+ }{
+ {"None", 0x0000, None},
+ {"Gzip", 0x0001, Gzip},
+ {"Snappy", 0x0002, Snappy},
+ {"Lz4", 0x0003, Lz4},
+ {"Zstd", 0x0004, Zstd},
+ {"Gzip with transactional", 0x0011, Gzip}, // Bit 4 set (transactional)
+ {"Snappy with control", 0x0022, Snappy}, // Bit 5 set (control)
+ {"Lz4 with both flags", 0x0033, Lz4}, // Both flags set
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ codec := ExtractCompressionCodec(test.attributes)
+ assert.Equal(t, test.expected, codec)
+ })
+ }
+}
+
+// TestSetCompressionCodec tests setting compression codec in attributes
+func TestSetCompressionCodec(t *testing.T) {
+ tests := []struct {
+ name string
+ attributes int16
+ codec CompressionCodec
+ expected int16
+ }{
+ {"Set None", 0x0000, None, 0x0000},
+ {"Set Gzip", 0x0000, Gzip, 0x0001},
+ {"Set Snappy", 0x0000, Snappy, 0x0002},
+ {"Set Lz4", 0x0000, Lz4, 0x0003},
+ {"Set Zstd", 0x0000, Zstd, 0x0004},
+ {"Replace Gzip with Snappy", 0x0001, Snappy, 0x0002},
+ {"Set Gzip preserving transactional", 0x0010, Gzip, 0x0011},
+ {"Set Lz4 preserving control", 0x0020, Lz4, 0x0023},
+ {"Set Zstd preserving both flags", 0x0030, Zstd, 0x0034},
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ result := SetCompressionCodec(test.attributes, test.codec)
+ assert.Equal(t, test.expected, result)
+ })
+ }
+}
+
+// TestCompress_None tests compression with None codec
+func TestCompress_None(t *testing.T) {
+ data := []byte("Hello, World!")
+
+ compressed, err := Compress(None, data)
+ require.NoError(t, err)
+ assert.Equal(t, data, compressed, "None codec should return original data")
+}
+
+// TestCompress_Gzip tests gzip compression
+func TestCompress_Gzip(t *testing.T) {
+ data := []byte("Hello, World! This is a test message for gzip compression.")
+
+ compressed, err := Compress(Gzip, data)
+ require.NoError(t, err)
+ assert.NotEqual(t, data, compressed, "Gzip should compress data")
+ assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
+}
+
+// TestCompress_Snappy tests snappy compression
+func TestCompress_Snappy(t *testing.T) {
+ data := []byte("Hello, World! This is a test message for snappy compression.")
+
+ compressed, err := Compress(Snappy, data)
+ require.NoError(t, err)
+ assert.NotEqual(t, data, compressed, "Snappy should compress data")
+ assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
+}
+
+// TestCompress_Lz4 tests lz4 compression
+func TestCompress_Lz4(t *testing.T) {
+ data := []byte("Hello, World! This is a test message for lz4 compression.")
+
+ compressed, err := Compress(Lz4, data)
+ require.NoError(t, err)
+ assert.NotEqual(t, data, compressed, "Lz4 should compress data")
+ assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
+}
+
+// TestCompress_Zstd tests zstd compression
+func TestCompress_Zstd(t *testing.T) {
+ data := []byte("Hello, World! This is a test message for zstd compression.")
+
+ compressed, err := Compress(Zstd, data)
+ require.NoError(t, err)
+ assert.NotEqual(t, data, compressed, "Zstd should compress data")
+ assert.True(t, len(compressed) > 0, "Compressed data should not be empty")
+}
+
+// TestCompress_InvalidCodec tests compression with invalid codec
+func TestCompress_InvalidCodec(t *testing.T) {
+ data := []byte("Hello, World!")
+
+ _, err := Compress(CompressionCodec(99), data)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "unsupported compression codec")
+}
+
+// TestDecompress_None tests decompression with None codec
+func TestDecompress_None(t *testing.T) {
+ data := []byte("Hello, World!")
+
+ decompressed, err := Decompress(None, data)
+ require.NoError(t, err)
+ assert.Equal(t, data, decompressed, "None codec should return original data")
+}
+
+// TestRoundTrip tests compression and decompression round trip for all codecs
+func TestRoundTrip(t *testing.T) {
+ testData := [][]byte{
+ []byte("Hello, World!"),
+ []byte(""),
+ []byte("A"),
+ []byte(string(bytes.Repeat([]byte("Test data for compression round trip. "), 100))),
+ []byte("Special characters: àáâãäåæçèéêëìíîïðñòóôõö÷øùúûüýþÿ"),
+ bytes.Repeat([]byte{0x00, 0x01, 0x02, 0xFF}, 256), // Binary data
+ }
+
+ codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
+
+ for _, codec := range codecs {
+ t.Run(codec.String(), func(t *testing.T) {
+ for i, data := range testData {
+ t.Run(fmt.Sprintf("data_%d", i), func(t *testing.T) {
+ // Compress
+ compressed, err := Compress(codec, data)
+ require.NoError(t, err, "Compression should succeed")
+
+ // Decompress
+ decompressed, err := Decompress(codec, compressed)
+ require.NoError(t, err, "Decompression should succeed")
+
+ // Verify round trip
+ assert.Equal(t, data, decompressed, "Round trip should preserve data")
+ })
+ }
+ })
+ }
+}
+
+// TestDecompress_InvalidCodec tests decompression with invalid codec
+func TestDecompress_InvalidCodec(t *testing.T) {
+ data := []byte("Hello, World!")
+
+ _, err := Decompress(CompressionCodec(99), data)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "unsupported compression codec")
+}
+
+// TestDecompress_CorruptedData tests decompression with corrupted data
+func TestDecompress_CorruptedData(t *testing.T) {
+ corruptedData := []byte("This is not compressed data")
+
+ codecs := []CompressionCodec{Gzip, Snappy, Lz4, Zstd}
+
+ for _, codec := range codecs {
+ t.Run(codec.String(), func(t *testing.T) {
+ _, err := Decompress(codec, corruptedData)
+ assert.Error(t, err, "Decompression of corrupted data should fail")
+ })
+ }
+}
+
+// TestCompressRecordBatch tests record batch compression
+func TestCompressRecordBatch(t *testing.T) {
+ recordsData := []byte("Record batch data for compression testing")
+
+ t.Run("None codec", func(t *testing.T) {
+ compressed, attributes, err := CompressRecordBatch(None, recordsData)
+ require.NoError(t, err)
+ assert.Equal(t, recordsData, compressed)
+ assert.Equal(t, int16(0), attributes)
+ })
+
+ t.Run("Gzip codec", func(t *testing.T) {
+ compressed, attributes, err := CompressRecordBatch(Gzip, recordsData)
+ require.NoError(t, err)
+ assert.NotEqual(t, recordsData, compressed)
+ assert.Equal(t, int16(1), attributes)
+ })
+
+ t.Run("Snappy codec", func(t *testing.T) {
+ compressed, attributes, err := CompressRecordBatch(Snappy, recordsData)
+ require.NoError(t, err)
+ assert.NotEqual(t, recordsData, compressed)
+ assert.Equal(t, int16(2), attributes)
+ })
+}
+
+// TestDecompressRecordBatch tests record batch decompression
+func TestDecompressRecordBatch(t *testing.T) {
+ recordsData := []byte("Record batch data for decompression testing")
+
+ t.Run("None codec", func(t *testing.T) {
+ attributes := int16(0) // No compression
+ decompressed, err := DecompressRecordBatch(attributes, recordsData)
+ require.NoError(t, err)
+ assert.Equal(t, recordsData, decompressed)
+ })
+
+ t.Run("Round trip with Gzip", func(t *testing.T) {
+ // Compress
+ compressed, attributes, err := CompressRecordBatch(Gzip, recordsData)
+ require.NoError(t, err)
+
+ // Decompress
+ decompressed, err := DecompressRecordBatch(attributes, compressed)
+ require.NoError(t, err)
+ assert.Equal(t, recordsData, decompressed)
+ })
+
+ t.Run("Round trip with Snappy", func(t *testing.T) {
+ // Compress
+ compressed, attributes, err := CompressRecordBatch(Snappy, recordsData)
+ require.NoError(t, err)
+
+ // Decompress
+ decompressed, err := DecompressRecordBatch(attributes, compressed)
+ require.NoError(t, err)
+ assert.Equal(t, recordsData, decompressed)
+ })
+}
+
+// TestCompressionEfficiency tests compression efficiency for different codecs
+func TestCompressionEfficiency(t *testing.T) {
+ // Create highly compressible data
+ data := bytes.Repeat([]byte("This is a repeated string for compression testing. "), 100)
+
+ codecs := []CompressionCodec{Gzip, Snappy, Lz4, Zstd}
+
+ for _, codec := range codecs {
+ t.Run(codec.String(), func(t *testing.T) {
+ compressed, err := Compress(codec, data)
+ require.NoError(t, err)
+
+ compressionRatio := float64(len(compressed)) / float64(len(data))
+ t.Logf("Codec: %s, Original: %d bytes, Compressed: %d bytes, Ratio: %.2f",
+ codec.String(), len(data), len(compressed), compressionRatio)
+
+ // All codecs should achieve some compression on this highly repetitive data
+ assert.Less(t, len(compressed), len(data), "Compression should reduce data size")
+ })
+ }
+}
+
+// BenchmarkCompression benchmarks compression performance for different codecs
+func BenchmarkCompression(b *testing.B) {
+ data := bytes.Repeat([]byte("Benchmark data for compression testing. "), 1000)
+ codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
+
+ for _, codec := range codecs {
+ b.Run(fmt.Sprintf("Compress_%s", codec.String()), func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := Compress(codec, data)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+ })
+ }
+}
+
+// BenchmarkDecompression benchmarks decompression performance for different codecs
+func BenchmarkDecompression(b *testing.B) {
+ data := bytes.Repeat([]byte("Benchmark data for decompression testing. "), 1000)
+ codecs := []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd}
+
+ for _, codec := range codecs {
+ // Pre-compress the data
+ compressed, err := Compress(codec, data)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ b.Run(fmt.Sprintf("Decompress_%s", codec.String()), func(b *testing.B) {
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := Decompress(codec, compressed)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+ })
+ }
+}