Diffstat (limited to 'weed/mq/kafka/protocol/fetch_multibatch.go')
-rw-r--r--  weed/mq/kafka/protocol/fetch_multibatch.go  665
1 file changed, 665 insertions(+), 0 deletions(-)
diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go
new file mode 100644
index 000000000..2d157c75a
--- /dev/null
+++ b/weed/mq/kafka/protocol/fetch_multibatch.go
@@ -0,0 +1,665 @@
+package protocol
+
+import (
+ "bytes"
+ "compress/gzip"
+ "context"
+ "encoding/binary"
+ "fmt"
+ "hash/crc32"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
+)
+
+// MultiBatchFetcher handles fetching multiple record batches with size limits
+type MultiBatchFetcher struct {
+ handler *Handler
+}
+
+// NewMultiBatchFetcher creates a new multi-batch fetcher
+func NewMultiBatchFetcher(handler *Handler) *MultiBatchFetcher {
+ return &MultiBatchFetcher{handler: handler}
+}
+
+// FetchResult represents the result of a multi-batch fetch operation
+type FetchResult struct {
+ RecordBatches []byte // Concatenated record batches
+ NextOffset int64 // Next offset to fetch from
+ TotalSize int32 // Total size of all batches
+ BatchCount int // Number of batches included
+}
+
+// FetchMultipleBatches fetches multiple record batches up to maxBytes limit
+// ctx controls the fetch timeout (should match Kafka fetch request's MaxWaitTime)
+func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName string, partitionID int32, startOffset, highWaterMark int64, maxBytes int32) (*FetchResult, error) {
+
+ if startOffset >= highWaterMark {
+ return &FetchResult{
+ RecordBatches: []byte{},
+ NextOffset: startOffset,
+ TotalSize: 0,
+ BatchCount: 0,
+ }, nil
+ }
+
+ // Minimum size for basic response headers and one empty batch
+ minResponseSize := int32(200)
+ if maxBytes < minResponseSize {
+ maxBytes = minResponseSize
+ }
+
+ var combinedBatches []byte
+ currentOffset := startOffset
+ totalSize := int32(0)
+ batchCount := 0
+
+ // Batch fetching parameters - keep per-batch record counts small so maxBytes is respected
+ recordsPerBatch := int32(10) // Start with smaller batch size
+ maxBatchesPerFetch := 10 // Limit number of batches to avoid infinite loops
+
+ for batchCount < maxBatchesPerFetch && currentOffset < highWaterMark {
+
+ // Calculate remaining space
+ remainingBytes := maxBytes - totalSize
+ if remainingBytes < 100 { // Need at least 100 bytes for a minimal batch
+ break
+ }
+
+ // Keep batches at the minimum size when remaining space is limited
+ // (recordsPerBatch already starts at 10, so this acts as a floor rather than a reduction)
+ if remainingBytes < 1000 {
+ recordsPerBatch = 10
+ }
+
+ // Calculate how many records to fetch for this batch
+ recordsAvailable := highWaterMark - currentOffset
+ if recordsAvailable <= 0 {
+ break
+ }
+
+ recordsToFetch := recordsPerBatch
+ if int64(recordsToFetch) > recordsAvailable {
+ recordsToFetch = int32(recordsAvailable)
+ }
+
+ // Guard against a nil handler or missing SeaweedMQ backend
+ if f.handler == nil {
+ break
+ }
+ if f.handler.seaweedMQHandler == nil {
+ break
+ }
+
+ // Fetch records for this batch, passing ctx so the Kafka fetch request's
+ // MaxWaitTime is respected. The elapsed time is currently discarded but kept
+ // as a hook for future instrumentation.
+ getRecordsStartTime := time.Now()
+ smqRecords, err := f.handler.seaweedMQHandler.GetStoredRecords(ctx, topicName, partitionID, currentOffset, int(recordsToFetch))
+ _ = time.Since(getRecordsStartTime) // getRecordsDuration
+
+ if err != nil || len(smqRecords) == 0 {
+ break
+ }
+
+ // Note: the batch is constructed first, then its actual size is checked against maxBytes below
+
+ // Construct record batch
+ batch := f.constructSingleRecordBatch(topicName, currentOffset, smqRecords)
+ batchSize := int32(len(batch))
+
+ // Double-check actual size doesn't exceed maxBytes
+ if totalSize+batchSize > maxBytes && batchCount > 0 {
+ break
+ }
+
+ // Add this batch to combined result
+ combinedBatches = append(combinedBatches, batch...)
+ totalSize += batchSize
+ currentOffset += int64(len(smqRecords))
+ batchCount++
+
+ // Fewer records than requested usually means we have reached the end of the log
+ if len(smqRecords) < int(recordsPerBatch) {
+ break
+ }
+ }
+
+ result := &FetchResult{
+ RecordBatches: combinedBatches,
+ NextOffset: currentOffset,
+ TotalSize: totalSize,
+ BatchCount: batchCount,
+ }
+
+ return result, nil
+}
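FetchMultipleBatches is the entry point of this file. As a minimal caller-side sketch (not part of this patch), a Fetch handler could drive it roughly as below; the function name, parameters, and wiring of the *Handler are assumptions for illustration only.

    // Illustrative sketch: fetchPartitionSketch and its parameters are assumed, not part of the patch.
    func fetchPartitionSketch(h *Handler, topic string, partition int32, offset, highWaterMark int64,
        maxBytes int32, maxWait time.Duration) ([]byte, int64, error) {
        // Mirror the Kafka fetch request's MaxWaitTime in the context deadline.
        ctx, cancel := context.WithTimeout(context.Background(), maxWait)
        defer cancel()

        fetcher := NewMultiBatchFetcher(h)
        res, err := fetcher.FetchMultipleBatches(ctx, topic, partition, offset, highWaterMark, maxBytes)
        if err != nil {
            return nil, offset, err
        }
        // res.RecordBatches is a ready-to-send concatenation of record batches;
        // res.NextOffset is where the next fetch for this partition should start.
        return res.RecordBatches, res.NextOffset, nil
    }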
+
+// constructSingleRecordBatch creates a single record batch from SMQ records
+func (f *MultiBatchFetcher) constructSingleRecordBatch(topicName string, baseOffset int64, smqRecords []integration.SMQRecord) []byte {
+ if len(smqRecords) == 0 {
+ return f.constructEmptyRecordBatch(baseOffset)
+ }
+
+ // Create record batch using the SMQ records
+ batch := make([]byte, 0, 512)
+
+ // Record batch header
+ baseOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
+ batch = append(batch, baseOffsetBytes...) // base offset (8 bytes)
+
+ // Calculate batch length (will be filled after we know the size)
+ batchLengthPos := len(batch)
+ batch = append(batch, 0, 0, 0, 0) // batch length placeholder (4 bytes)
+
+ // Partition leader epoch (4 bytes) - use 0 (real Kafka uses 0, not -1)
+ batch = append(batch, 0x00, 0x00, 0x00, 0x00)
+
+ // Magic byte (1 byte) - v2 format
+ batch = append(batch, 2)
+
+ // CRC placeholder (4 bytes) - will be calculated later
+ crcPos := len(batch)
+ batch = append(batch, 0, 0, 0, 0)
+
+ // Attributes (2 bytes) - no compression, etc.
+ batch = append(batch, 0, 0)
+
+ // Last offset delta (4 bytes)
+ lastOffsetDelta := int32(len(smqRecords) - 1)
+ lastOffsetDeltaBytes := make([]byte, 4)
+ binary.BigEndian.PutUint32(lastOffsetDeltaBytes, uint32(lastOffsetDelta))
+ batch = append(batch, lastOffsetDeltaBytes...)
+
+ // Base timestamp (8 bytes) - convert from nanoseconds to milliseconds for Kafka compatibility
+ baseTimestamp := smqRecords[0].GetTimestamp() / 1000000 // Convert nanoseconds to milliseconds
+ baseTimestampBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(baseTimestampBytes, uint64(baseTimestamp))
+ batch = append(batch, baseTimestampBytes...)
+
+ // Max timestamp (8 bytes) - convert from nanoseconds to milliseconds for Kafka compatibility
+ maxTimestamp := baseTimestamp
+ if len(smqRecords) > 1 {
+ maxTimestamp = smqRecords[len(smqRecords)-1].GetTimestamp() / 1000000 // Convert nanoseconds to milliseconds
+ }
+ maxTimestampBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(maxTimestampBytes, uint64(maxTimestamp))
+ batch = append(batch, maxTimestampBytes...)
+
+ // Producer ID (8 bytes) - use -1 for no producer ID
+ batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)
+
+ // Producer epoch (2 bytes) - use -1 for no producer epoch
+ batch = append(batch, 0xFF, 0xFF)
+
+ // Base sequence (4 bytes) - use -1 for no base sequence
+ batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)
+
+ // Records count (4 bytes)
+ recordCountBytes := make([]byte, 4)
+ binary.BigEndian.PutUint32(recordCountBytes, uint32(len(smqRecords)))
+ batch = append(batch, recordCountBytes...)
+
+ // Add individual records from SMQ records
+ for i, smqRecord := range smqRecords {
+ // Build individual record
+ recordBytes := make([]byte, 0, 128)
+
+ // Record attributes (1 byte)
+ recordBytes = append(recordBytes, 0)
+
+ // Timestamp delta (varint) - calculate from base timestamp (both in milliseconds)
+ recordTimestampMs := smqRecord.GetTimestamp() / 1000000 // Convert nanoseconds to milliseconds
+ timestampDelta := recordTimestampMs - baseTimestamp // Both in milliseconds now
+ recordBytes = append(recordBytes, encodeVarint(timestampDelta)...)
+
+ // Offset delta (varint)
+ offsetDelta := int64(i)
+ recordBytes = append(recordBytes, encodeVarint(offsetDelta)...)
+
+ // Key length and key (varint + data) - decode RecordValue to get original Kafka message
+ key := f.handler.decodeRecordValueToKafkaMessage(topicName, smqRecord.GetKey())
+ if key == nil {
+ recordBytes = append(recordBytes, encodeVarint(-1)...) // null key
+ } else {
+ recordBytes = append(recordBytes, encodeVarint(int64(len(key)))...)
+ recordBytes = append(recordBytes, key...)
+ }
+
+ // Value length and value (varint + data) - decode RecordValue to get original Kafka message
+ value := f.handler.decodeRecordValueToKafkaMessage(topicName, smqRecord.GetValue())
+
+ if value == nil {
+ recordBytes = append(recordBytes, encodeVarint(-1)...) // null value
+ } else {
+ recordBytes = append(recordBytes, encodeVarint(int64(len(value)))...)
+ recordBytes = append(recordBytes, value...)
+ }
+
+ // Headers count (varint) - 0 headers
+ recordBytes = append(recordBytes, encodeVarint(0)...)
+
+ // Prepend record length (varint)
+ recordLength := int64(len(recordBytes))
+ batch = append(batch, encodeVarint(recordLength)...)
+ batch = append(batch, recordBytes...)
+ }
+
+ // Fill in the batch length
+ batchLength := uint32(len(batch) - batchLengthPos - 4)
+ binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], batchLength)
+
+ // Debug: Log reconstructed batch (only at high verbosity)
+ if glog.V(4) {
+ fmt.Printf("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n")
+ fmt.Printf("📏 RECONSTRUCTED BATCH: topic=%s baseOffset=%d size=%d bytes, recordCount=%d\n",
+ topicName, baseOffset, len(batch), len(smqRecords))
+ }
+
+ if glog.V(4) && len(batch) >= 61 {
+ fmt.Printf(" Header Structure:\n")
+ fmt.Printf(" Base Offset (0-7): %x\n", batch[0:8])
+ fmt.Printf(" Batch Length (8-11): %x\n", batch[8:12])
+ fmt.Printf(" Leader Epoch (12-15): %x\n", batch[12:16])
+ fmt.Printf(" Magic (16): %x\n", batch[16:17])
+ fmt.Printf(" CRC (17-20): %x (WILL BE CALCULATED)\n", batch[17:21])
+ fmt.Printf(" Attributes (21-22): %x\n", batch[21:23])
+ fmt.Printf(" Last Offset Delta (23-26): %x\n", batch[23:27])
+ fmt.Printf(" Base Timestamp (27-34): %x\n", batch[27:35])
+ fmt.Printf(" Max Timestamp (35-42): %x\n", batch[35:43])
+ fmt.Printf(" Producer ID (43-50): %x\n", batch[43:51])
+ fmt.Printf(" Producer Epoch (51-52): %x\n", batch[51:53])
+ fmt.Printf(" Base Sequence (53-56): %x\n", batch[53:57])
+ fmt.Printf(" Record Count (57-60): %x\n", batch[57:61])
+ if len(batch) > 61 {
+ fmt.Printf(" Records Section (61+): %x... (%d bytes)\n",
+ batch[61:min(81, len(batch))], len(batch)-61)
+ }
+ }
+
+ // Calculate CRC32 for the batch
+ // Per Kafka spec: CRC covers ONLY from attributes offset (byte 21) onwards
+ // See: DefaultRecordBatch.java computeChecksum() - Crc32C.compute(buffer, ATTRIBUTES_OFFSET, ...)
+ crcData := batch[crcPos+4:] // Skip CRC field itself, include rest
+ crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli))
+
+ // CRC debug (only at high verbosity)
+ if glog.V(4) {
+ batchLengthValue := binary.BigEndian.Uint32(batch[8:12])
+ expectedTotalSize := 12 + int(batchLengthValue)
+ actualTotalSize := len(batch)
+
+ fmt.Printf("\n === CRC CALCULATION DEBUG ===\n")
+ fmt.Printf(" Batch length field (bytes 8-11): %d\n", batchLengthValue)
+ fmt.Printf(" Expected total batch size: %d bytes (12 + %d)\n", expectedTotalSize, batchLengthValue)
+ fmt.Printf(" Actual batch size: %d bytes\n", actualTotalSize)
+ fmt.Printf(" CRC position: byte %d\n", crcPos)
+ fmt.Printf(" CRC data range: bytes %d to %d (%d bytes)\n", crcPos+4, actualTotalSize-1, len(crcData))
+
+ if expectedTotalSize != actualTotalSize {
+ fmt.Printf(" SIZE MISMATCH: %d bytes difference!\n", actualTotalSize-expectedTotalSize)
+ }
+
+ if crcPos != 17 {
+ fmt.Printf(" CRC POSITION WRONG: expected 17, got %d!\n", crcPos)
+ }
+
+ fmt.Printf(" CRC data (first 100 bytes of %d):\n", len(crcData))
+ dumpSize := 100
+ if len(crcData) < dumpSize {
+ dumpSize = len(crcData)
+ }
+ for i := 0; i < dumpSize; i += 20 {
+ end := i + 20
+ if end > dumpSize {
+ end = dumpSize
+ }
+ fmt.Printf(" [%3d-%3d]: %x\n", i, end-1, crcData[i:end])
+ }
+
+ manualCRC := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli))
+ fmt.Printf(" Calculated CRC: 0x%08x\n", crc)
+ fmt.Printf(" Manual verify: 0x%08x", manualCRC)
+ if crc == manualCRC {
+ fmt.Printf(" OK\n")
+ } else {
+ fmt.Printf(" MISMATCH!\n")
+ }
+
+ if actualTotalSize <= 200 {
+ fmt.Printf(" Complete batch hex dump (%d bytes):\n", actualTotalSize)
+ for i := 0; i < actualTotalSize; i += 16 {
+ end := i + 16
+ if end > actualTotalSize {
+ end = actualTotalSize
+ }
+ fmt.Printf(" %04d: %x\n", i, batch[i:end])
+ }
+ }
+ fmt.Printf(" === END CRC DEBUG ===\n\n")
+ }
+
+ binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
+
+ if glog.V(4) {
+ fmt.Printf(" Final CRC (17-20): %x (calculated over %d bytes)\n", batch[17:21], len(crcData))
+
+ // VERIFICATION: Read back what we just wrote
+ writtenCRC := binary.BigEndian.Uint32(batch[17:21])
+ fmt.Printf(" VERIFICATION: CRC we calculated=0x%x, CRC written to batch=0x%x", crc, writtenCRC)
+ if crc == writtenCRC {
+ fmt.Printf(" OK\n")
+ } else {
+ fmt.Printf(" MISMATCH!\n")
+ }
+
+ // DEBUG: Hash the entire batch to check if reconstructions are identical
+ batchHash := crc32.ChecksumIEEE(batch)
+ fmt.Printf(" BATCH IDENTITY: hash=0x%08x size=%d topic=%s baseOffset=%d recordCount=%d\n",
+ batchHash, len(batch), topicName, baseOffset, len(smqRecords))
+
+ // DEBUG: Show first few record keys/values to verify consistency
+ if len(smqRecords) > 0 && strings.Contains(topicName, "loadtest") {
+ fmt.Printf(" RECORD SAMPLES:\n")
+ for i := 0; i < min(3, len(smqRecords)); i++ {
+ keyPreview := smqRecords[i].GetKey()
+ if len(keyPreview) > 20 {
+ keyPreview = keyPreview[:20]
+ }
+ valuePreview := smqRecords[i].GetValue()
+ if len(valuePreview) > 40 {
+ valuePreview = valuePreview[:40]
+ }
+ fmt.Printf(" [%d] keyLen=%d valueLen=%d keyHex=%x valueHex=%x\n",
+ i, len(smqRecords[i].GetKey()), len(smqRecords[i].GetValue()),
+ keyPreview, valuePreview)
+ }
+ }
+
+ fmt.Printf(" Batch for topic=%s baseOffset=%d recordCount=%d\n", topicName, baseOffset, len(smqRecords))
+ fmt.Printf("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n")
+ }
+
+ return batch
+}
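The fixed portion of the v2 header built above is 61 bytes, matching the offsets printed in the debug dump (base offset at 0-7, CRC at 17-20, record count at 57-60). As a sketch (assumed helper, not part of this patch), those fields can be re-read like this:

    // Assumed helper: re-reads the fixed v2 header fields that constructSingleRecordBatch writes.
    func parseBatchHeaderSketch(batch []byte) (baseOffset int64, magic byte, crc uint32, recordCount int32, ok bool) {
        if len(batch) < 61 { // fixed v2 header size
            return 0, 0, 0, 0, false
        }
        baseOffset = int64(binary.BigEndian.Uint64(batch[0:8]))
        magic = batch[16]                           // always 2 for this builder
        crc = binary.BigEndian.Uint32(batch[17:21]) // CRC-32C over batch[21:]
        recordCount = int32(binary.BigEndian.Uint32(batch[57:61]))
        return baseOffset, magic, crc, recordCount, true
    }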
+
+// constructEmptyRecordBatch creates an empty record batch
+func (f *MultiBatchFetcher) constructEmptyRecordBatch(baseOffset int64) []byte {
+ // Create minimal empty record batch
+ batch := make([]byte, 0, 61)
+
+ // Base offset (8 bytes)
+ baseOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
+ batch = append(batch, baseOffsetBytes...)
+
+ // Batch length (4 bytes) - will be filled at the end
+ lengthPos := len(batch)
+ batch = append(batch, 0, 0, 0, 0)
+
+ // Partition leader epoch (4 bytes) - -1
+ batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)
+
+ // Magic byte (1 byte) - version 2
+ batch = append(batch, 2)
+
+ // CRC32 (4 bytes) - placeholder
+ crcPos := len(batch)
+ batch = append(batch, 0, 0, 0, 0)
+
+ // Attributes (2 bytes) - no compression, no transactional
+ batch = append(batch, 0, 0)
+
+ // Last offset delta (4 bytes) - -1 for empty batch
+ batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)
+
+ // Base timestamp (8 bytes)
+ timestamp := uint64(1640995200000) // Fixed timestamp for empty batches
+ timestampBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(timestampBytes, timestamp)
+ batch = append(batch, timestampBytes...)
+
+ // Max timestamp (8 bytes) - same as base for empty batch
+ batch = append(batch, timestampBytes...)
+
+ // Producer ID (8 bytes) - -1 for non-transactional
+ batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)
+
+ // Producer Epoch (2 bytes) - -1 for non-transactional
+ batch = append(batch, 0xFF, 0xFF)
+
+ // Base Sequence (4 bytes) - -1 for non-transactional
+ batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)
+
+ // Record count (4 bytes) - 0 for empty batch
+ batch = append(batch, 0, 0, 0, 0)
+
+ // Fill in the batch length
+ batchLength := len(batch) - 12 // Exclude base offset and length field itself
+ binary.BigEndian.PutUint32(batch[lengthPos:lengthPos+4], uint32(batchLength))
+
+ // Calculate CRC32 for the batch
+ // Per Kafka spec: CRC covers ONLY from attributes offset (byte 21) onwards
+ // See: DefaultRecordBatch.java computeChecksum() - Crc32C.compute(buffer, ATTRIBUTES_OFFSET, ...)
+ crcData := batch[crcPos+4:] // Skip CRC field itself, include rest
+ crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli))
+ binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
+
+ return batch
+}
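Both builders compute the checksum the same way: CRC-32C (Castagnoli) over everything from the attributes field (byte 21) to the end of the batch, stored at bytes 17-20. A small verification sketch (assumed helper, not part of this patch):

    // Assumed helper: recomputes the CRC the way the builders do and compares it with the stored value.
    func verifyBatchCRCSketch(batch []byte) bool {
        if len(batch) < 21 {
            return false
        }
        stored := binary.BigEndian.Uint32(batch[17:21])
        computed := crc32.Checksum(batch[21:], crc32.MakeTable(crc32.Castagnoli))
        return stored == computed
    }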
+
+// CompressedBatchResult represents a compressed record batch result
+type CompressedBatchResult struct {
+ CompressedData []byte
+ OriginalSize int32
+ CompressedSize int32
+ Codec compression.CompressionCodec
+}
+
+// CreateCompressedBatch creates a compressed record batch (basic support)
+func (f *MultiBatchFetcher) CreateCompressedBatch(baseOffset int64, smqRecords []integration.SMQRecord, codec compression.CompressionCodec) (*CompressedBatchResult, error) {
+ if codec == compression.None {
+ // No compression requested
+ batch := f.constructSingleRecordBatch("", baseOffset, smqRecords)
+ return &CompressedBatchResult{
+ CompressedData: batch,
+ OriginalSize: int32(len(batch)),
+ CompressedSize: int32(len(batch)),
+ Codec: compression.None,
+ }, nil
+ }
+
+ // For Phase 5, implement basic GZIP compression support
+ originalBatch := f.constructSingleRecordBatch("", baseOffset, smqRecords)
+ originalSize := int32(len(originalBatch))
+
+ compressedData, err := f.compressData(originalBatch, codec)
+ if err != nil {
+ // Fall back to uncompressed if compression fails
+ return &CompressedBatchResult{
+ CompressedData: originalBatch,
+ OriginalSize: originalSize,
+ CompressedSize: originalSize,
+ Codec: compression.None,
+ }, nil
+ }
+
+ // Create compressed record batch with proper headers
+ compressedBatch := f.constructCompressedRecordBatch(baseOffset, compressedData, codec, originalSize)
+
+ return &CompressedBatchResult{
+ CompressedData: compressedBatch,
+ OriginalSize: originalSize,
+ CompressedSize: int32(len(compressedBatch)),
+ Codec: codec,
+ }, nil
+}
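CreateCompressedBatch degrades gracefully: if compression fails it returns the uncompressed batch with Codec set to compression.None, so callers can branch on the returned codec. A usage sketch (assumed helper name, not part of this patch):

    // Assumed caller-side helper.
    func buildGzipBatchSketch(f *MultiBatchFetcher, baseOffset int64, recs []integration.SMQRecord) ([]byte, bool, error) {
        res, err := f.CreateCompressedBatch(baseOffset, recs, compression.Gzip)
        if err != nil {
            return nil, false, err
        }
        // Codec == compression.None signals the uncompressed fallback path.
        return res.CompressedData, res.Codec != compression.None, nil
    }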
+
+// constructCompressedRecordBatch creates a record batch with compressed records
+func (f *MultiBatchFetcher) constructCompressedRecordBatch(baseOffset int64, compressedRecords []byte, codec compression.CompressionCodec, originalSize int32) []byte {
+ // Validate size to prevent overflow
+ const maxBatchSize = 1 << 30 // 1 GB limit
+ if len(compressedRecords) > maxBatchSize-100 {
+ glog.Errorf("Compressed records too large: %d bytes", len(compressedRecords))
+ return nil
+ }
+ batch := make([]byte, 0, len(compressedRecords)+100)
+
+ // Record batch header is similar to regular batch
+ baseOffsetBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
+ batch = append(batch, baseOffsetBytes...)
+
+ // Batch length (4 bytes) - will be filled later
+ batchLengthPos := len(batch)
+ batch = append(batch, 0, 0, 0, 0)
+
+ // Partition leader epoch (4 bytes)
+ batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)
+
+ // Magic byte (1 byte) - v2 format
+ batch = append(batch, 2)
+
+ // CRC placeholder (4 bytes)
+ crcPos := len(batch)
+ batch = append(batch, 0, 0, 0, 0)
+
+ // Attributes (2 bytes) - set compression bits
+ var compressionBits uint16
+ switch codec {
+ case compression.Gzip:
+ compressionBits = 1
+ case compression.Snappy:
+ compressionBits = 2
+ case compression.Lz4:
+ compressionBits = 3
+ case compression.Zstd:
+ compressionBits = 4
+ default:
+ compressionBits = 0 // no compression
+ }
+ batch = append(batch, byte(compressionBits>>8), byte(compressionBits))
+
+ // Last offset delta (4 bytes) - should be (logical record count - 1); left at 0 as a placeholder for a single logical record
+ batch = append(batch, 0, 0, 0, 0)
+
+ // Timestamps (16 bytes) - fixed placeholder timestamp for compressed batches
+ timestamp := uint64(1640995200000)
+ timestampBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(timestampBytes, timestamp)
+ batch = append(batch, timestampBytes...) // first timestamp
+ batch = append(batch, timestampBytes...) // max timestamp
+
+ // Producer fields (14 bytes total)
+ batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) // producer ID
+ batch = append(batch, 0xFF, 0xFF) // producer epoch
+ batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF) // base sequence
+
+ // Record count (4 bytes) - for compressed batches, this is the number of logical records
+ batch = append(batch, 0, 0, 0, 1) // Placeholder: treat as 1 logical record
+
+ // Compressed records data
+ batch = append(batch, compressedRecords...)
+
+ // Fill in the batch length
+ batchLength := uint32(len(batch) - batchLengthPos - 4)
+ binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], batchLength)
+
+ // Calculate CRC32 for the batch
+ // Per Kafka spec: CRC covers ONLY from attributes offset (byte 21) onwards
+ // See: DefaultRecordBatch.java computeChecksum() - Crc32C.compute(buffer, ATTRIBUTES_OFFSET, ...)
+ crcData := batch[crcPos+4:] // Skip CRC field itself, include rest
+ crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli))
+ binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
+
+ return batch
+}
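The codec is recorded in the low bits of the 2-byte attributes field written above (0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd). Reading it back, as a sketch (assumed helper, not part of this patch):

    // Assumed helper: extracts the compression codec bits from a built batch.
    func batchCompressionBitsSketch(batch []byte) (uint16, bool) {
        if len(batch) < 23 {
            return 0, false
        }
        attributes := binary.BigEndian.Uint16(batch[21:23])
        return attributes & 0x07, true // low 3 bits carry the compression codec
    }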
+
+// estimateBatchSize estimates the size of a record batch before constructing it
+func (f *MultiBatchFetcher) estimateBatchSize(smqRecords []integration.SMQRecord) int32 {
+ if len(smqRecords) == 0 {
+ return 61 // empty batch header size
+ }
+
+ // Record batch header: 61 bytes (base_offset + batch_length + leader_epoch + magic + crc + attributes +
+ // last_offset_delta + first_ts + max_ts + producer_id + producer_epoch + base_seq + record_count)
+ headerSize := int32(61)
+
+ baseTs := smqRecords[0].GetTimestamp() / 1000000 // milliseconds, to match constructSingleRecordBatch
+ recordsSize := int32(0)
+ for i, rec := range smqRecords {
+ // attributes(1)
+ rb := int32(1)
+
+ // timestamp_delta(varint) - in milliseconds, matching the constructed batch
+ tsDelta := rec.GetTimestamp()/1000000 - baseTs
+ rb += int32(len(encodeVarint(tsDelta)))
+
+ // offset_delta(varint)
+ rb += int32(len(encodeVarint(int64(i))))
+
+ // key length varint + data or -1
+ if k := rec.GetKey(); k != nil {
+ rb += int32(len(encodeVarint(int64(len(k))))) + int32(len(k))
+ } else {
+ rb += int32(len(encodeVarint(-1)))
+ }
+
+ // value length varint + data or -1
+ if v := rec.GetValue(); v != nil {
+ rb += int32(len(encodeVarint(int64(len(v))))) + int32(len(v))
+ } else {
+ rb += int32(len(encodeVarint(-1)))
+ }
+
+ // headers count (varint = 0)
+ rb += int32(len(encodeVarint(0)))
+
+ // prepend record length varint
+ recordsSize += int32(len(encodeVarint(int64(rb)))) + rb
+ }
+
+ return headerSize + recordsSize
+}
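estimateBatchSize mirrors the layout of constructSingleRecordBatch, but the two can still diverge because the builder re-decodes keys and values via decodeRecordValueToKafkaMessage, so the estimate is approximate. A rough comparison sketch (assumed helper, not part of this patch):

    // Assumed helper: compares the pre-construction estimate with the batch actually built.
    func estimateVsActualSketch(f *MultiBatchFetcher, topic string, baseOffset int64, recs []integration.SMQRecord) (estimated, actual int32) {
        estimated = f.estimateBatchSize(recs)
        actual = int32(len(f.constructSingleRecordBatch(topic, baseOffset, recs)))
        return estimated, actual
    }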
+
+// sizeOfVarint returns the number of bytes encodeVarint would use for value
+func sizeOfVarint(value int64) int32 {
+ // ZigZag encode to match encodeVarint
+ u := uint64(value<<1) ^ uint64(value>>63)
+ size := int32(1)
+ for u >= 0x80 {
+ u >>= 7
+ size++
+ }
+ return size
+}
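Assuming encodeVarint uses the same zigzag scheme the comment describes, sizeOfVarint should also agree with the length of a standard-library signed varint. A sanity-check sketch (not part of this patch):

    // Sketch: sizeOfVarint should match the length of encoding/binary's zigzag varint encoding.
    func checkVarintSizeSketch(v int64) bool {
        return sizeOfVarint(v) == int32(len(binary.AppendVarint(nil, v)))
    }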
+
+// compressData compresses data using the specified codec (basic implementation)
+func (f *MultiBatchFetcher) compressData(data []byte, codec compression.CompressionCodec) ([]byte, error) {
+ // For Phase 5, implement basic compression support
+ switch codec {
+ case compression.None:
+ return data, nil
+ case compression.Gzip:
+ // Implement actual GZIP compression
+ var buf bytes.Buffer
+ gzipWriter := gzip.NewWriter(&buf)
+
+ if _, err := gzipWriter.Write(data); err != nil {
+ gzipWriter.Close()
+ return nil, fmt.Errorf("gzip compression write failed: %w", err)
+ }
+
+ if err := gzipWriter.Close(); err != nil {
+ return nil, fmt.Errorf("gzip compression close failed: %w", err)
+ }
+
+ compressed := buf.Bytes()
+
+ return compressed, nil
+ default:
+ return nil, fmt.Errorf("unsupported compression codec: %d", codec)
+ }
+}
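The gzip path can be exercised with a round-trip check; this sketch (assumed helper, not part of this patch, additionally needs the io import) decompresses with the matching stdlib reader:

    // Assumed helper: round-trips a payload through the gzip branch of compressData.
    func gzipRoundTripSketch(f *MultiBatchFetcher, payload []byte) (bool, error) {
        compressed, err := f.compressData(payload, compression.Gzip)
        if err != nil {
            return false, err
        }
        r, err := gzip.NewReader(bytes.NewReader(compressed))
        if err != nil {
            return false, err
        }
        defer r.Close()
        decompressed, err := io.ReadAll(r)
        if err != nil {
            return false, err
        }
        return bytes.Equal(decompressed, payload), nil
    }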