Diffstat (limited to 'weed/mq/kafka/compression/compression.go')
-rw-r--r--  weed/mq/kafka/compression/compression.go | 203
1 file changed, 203 insertions, 0 deletions
diff --git a/weed/mq/kafka/compression/compression.go b/weed/mq/kafka/compression/compression.go
new file mode 100644
index 000000000..f4c472199
--- /dev/null
+++ b/weed/mq/kafka/compression/compression.go
@@ -0,0 +1,203 @@
+package compression
+
+import (
+ "bytes"
+ "compress/gzip"
+ "fmt"
+ "io"
+
+ "github.com/golang/snappy"
+ "github.com/klauspost/compress/zstd"
+ "github.com/pierrec/lz4/v4"
+)
+
+// nopCloser wraps an io.Reader to provide a no-op Close method
+type nopCloser struct {
+    io.Reader
+}
+
+func (nopCloser) Close() error { return nil }
+
+// CompressionCodec represents the compression codec used in Kafka record batches
+type CompressionCodec int8
+
+const (
+    None   CompressionCodec = 0
+    Gzip   CompressionCodec = 1
+    Snappy CompressionCodec = 2
+    Lz4    CompressionCodec = 3
+    Zstd   CompressionCodec = 4
+)
+
+// String returns the string representation of the compression codec
+func (c CompressionCodec) String() string {
+    switch c {
+    case None:
+        return "none"
+    case Gzip:
+        return "gzip"
+    case Snappy:
+        return "snappy"
+    case Lz4:
+        return "lz4"
+    case Zstd:
+        return "zstd"
+    default:
+        return fmt.Sprintf("unknown(%d)", c)
+    }
+}
+
+// IsValid returns true if the compression codec is valid
+func (c CompressionCodec) IsValid() bool {
+    return c >= None && c <= Zstd
+}
+
+// ExtractCompressionCodec extracts the compression codec from record batch attributes
+func ExtractCompressionCodec(attributes int16) CompressionCodec {
+    return CompressionCodec(attributes & 0x07) // Lower 3 bits
+}
+
+// SetCompressionCodec sets the compression codec in record batch attributes
+func SetCompressionCodec(attributes int16, codec CompressionCodec) int16 {
+    return (attributes &^ 0x07) | int16(codec)
+}
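+
+// exampleAttributes is a minimal illustrative sketch, not called anywhere in
+// this package: the codec lives in the lower three bits of the batch
+// attributes, so setting a codec leaves the remaining attribute bits intact.
+// The 0x10 bit below simply stands in for one of the higher attribute flags.
+func exampleAttributes() {
+    attrs := SetCompressionCodec(0x10, Zstd)
+    fmt.Println(ExtractCompressionCodec(attrs)) // zstd
+    fmt.Println(attrs&0x10 != 0)                // true: higher bits preserved
+}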
+
+// Compress compresses data using the specified codec
+func Compress(codec CompressionCodec, data []byte) ([]byte, error) {
+    if codec == None {
+        return data, nil
+    }
+
+    var buf bytes.Buffer
+    var writer io.WriteCloser
+    var err error
+
+    switch codec {
+    case Gzip:
+        writer = gzip.NewWriter(&buf)
+    case Snappy:
+        // snappy encodes the whole block at once rather than through a streaming writer
+        compressed := snappy.Encode(nil, data)
+        if compressed == nil {
+            compressed = []byte{}
+        }
+        return compressed, nil
+    case Lz4:
+        writer = lz4.NewWriter(&buf)
+    case Zstd:
+        writer, err = zstd.NewWriter(&buf)
+        if err != nil {
+            return nil, fmt.Errorf("failed to create zstd writer: %w", err)
+        }
+    default:
+        return nil, fmt.Errorf("unsupported compression codec: %s", codec)
+    }
+
+    if _, err := writer.Write(data); err != nil {
+        writer.Close()
+        return nil, fmt.Errorf("failed to write compressed data: %w", err)
+    }
+
+    if err := writer.Close(); err != nil {
+        return nil, fmt.Errorf("failed to close compressor: %w", err)
+    }
+
+    return buf.Bytes(), nil
+}
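+
+// exampleRoundTrip is an illustrative sketch of the intended contract, not
+// part of any hot path: Compress followed by Decompress with the same codec
+// must return the original bytes for every supported codec.
+func exampleRoundTrip() error {
+    payload := []byte("example payload")
+    for _, codec := range []CompressionCodec{None, Gzip, Snappy, Lz4, Zstd} {
+        compressed, err := Compress(codec, payload)
+        if err != nil {
+            return err
+        }
+        decompressed, err := Decompress(codec, compressed)
+        if err != nil {
+            return err
+        }
+        if !bytes.Equal(decompressed, payload) {
+            return fmt.Errorf("%s: round-trip mismatch", codec)
+        }
+    }
+    return nil
+}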
+
+// Decompress decompresses data using the specified codec
+func Decompress(codec CompressionCodec, data []byte) ([]byte, error) {
+    if codec == None {
+        return data, nil
+    }
+
+    var reader io.ReadCloser
+    var err error
+
+    buf := bytes.NewReader(data)
+
+    switch codec {
+    case Gzip:
+        reader, err = gzip.NewReader(buf)
+        if err != nil {
+            return nil, fmt.Errorf("failed to create gzip reader: %w", err)
+        }
+    case Snappy:
+        // snappy decodes the whole block at once rather than through a streaming reader
+        decompressed, err := snappy.Decode(nil, data)
+        if err != nil {
+            return nil, fmt.Errorf("failed to decompress snappy data: %w", err)
+        }
+        if decompressed == nil {
+            decompressed = []byte{}
+        }
+        return decompressed, nil
+    case Lz4:
+        lz4Reader := lz4.NewReader(buf)
+        // lz4.Reader doesn't implement Close, so we wrap it
+        reader = &nopCloser{Reader: lz4Reader}
+    case Zstd:
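+        // zstd.NewReader returns a *zstd.Decoder whose Close method has no
+        // error return, so it cannot satisfy io.ReadCloser like the readers
+        // above; decode inline here and return early instead.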
+        zstdReader, err := zstd.NewReader(buf)
+        if err != nil {
+            return nil, fmt.Errorf("failed to create zstd reader: %w", err)
+        }
+        defer zstdReader.Close()
+
+        var result bytes.Buffer
+        if _, err := io.Copy(&result, zstdReader); err != nil {
+            return nil, fmt.Errorf("failed to decompress zstd data: %w", err)
+        }
+        decompressed := result.Bytes()
+        if decompressed == nil {
+            decompressed = []byte{}
+        }
+        return decompressed, nil
+    default:
+        return nil, fmt.Errorf("unsupported compression codec: %s", codec)
+    }
+
+    defer reader.Close()
+
+    var result bytes.Buffer
+    if _, err := io.Copy(&result, reader); err != nil {
+        return nil, fmt.Errorf("failed to decompress data: %w", err)
+    }
+
+    decompressed := result.Bytes()
+    if decompressed == nil {
+        decompressed = []byte{}
+    }
+    return decompressed, nil
+}
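+
+// exampleCorruptInput sketches the failure mode: malformed input surfaces a
+// wrapped error rather than a panic. The two bytes below are the gzip magic
+// number with the rest of the stream missing.
+func exampleCorruptInput() {
+    _, err := Decompress(Gzip, []byte{0x1f, 0x8b})
+    fmt.Println(err != nil) // true
+}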
+
+// CompressRecordBatch compresses the records portion of a Kafka record batch
+// This function compresses only the records data, not the entire batch header
+func CompressRecordBatch(codec CompressionCodec, recordsData []byte) ([]byte, int16, error) {
+    if codec == None {
+        return recordsData, 0, nil
+    }
+
+    compressed, err := Compress(codec, recordsData)
+    if err != nil {
+        return nil, 0, fmt.Errorf("failed to compress record batch: %w", err)
+    }
+
+    attributes := int16(codec)
+    return compressed, attributes, nil
+}
+
+// DecompressRecordBatch decompresses the records portion of a Kafka record batch
+func DecompressRecordBatch(attributes int16, compressedData []byte) ([]byte, error) {
+    codec := ExtractCompressionCodec(attributes)
+
+    if codec == None {
+        return compressedData, nil
+    }
+
+    decompressed, err := Decompress(codec, compressedData)
+    if err != nil {
+        return nil, fmt.Errorf("failed to decompress record batch: %w", err)
+    }
+
+    return decompressed, nil
+}
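+
+// exampleRecordBatch is an end-to-end sketch of the record-batch helpers,
+// illustrative only: compress the records section, carry the returned
+// attribute bits alongside the payload, then recover the original bytes on
+// the read side.
+func exampleRecordBatch() error {
+    records := []byte("encoded records section")
+    compressed, attrs, err := CompressRecordBatch(Gzip, records)
+    if err != nil {
+        return err
+    }
+    decompressed, err := DecompressRecordBatch(attrs, compressed)
+    if err != nil {
+        return err
+    }
+    if !bytes.Equal(decompressed, records) {
+        return fmt.Errorf("record batch round-trip mismatch")
+    }
+    return nil
+}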