diff options
Diffstat (limited to 'weed/mq/kafka/protocol/fetch_multibatch.go')
| -rw-r--r-- | weed/mq/kafka/protocol/fetch_multibatch.go | 92 |
1 file changed, 27 insertions, 65 deletions
diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go index 2d157c75a..61cd19f78 100644 --- a/weed/mq/kafka/protocol/fetch_multibatch.go +++ b/weed/mq/kafka/protocol/fetch_multibatch.go @@ -57,9 +57,25 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName totalSize := int32(0) batchCount := 0 - // Parameters for batch fetching - start smaller to respect maxBytes better - recordsPerBatch := int32(10) // Start with smaller batch size - maxBatchesPerFetch := 10 // Limit number of batches to avoid infinite loops + // Estimate records per batch based on maxBytes available + // Assume average message size + batch overhead + // Client requested maxBytes, we should use most of it + // Start with larger batches to maximize throughput + estimatedMsgSize := int32(1024) // Typical message size with overhead + recordsPerBatch := (maxBytes - 200) / estimatedMsgSize // Use available space efficiently + if recordsPerBatch < 100 { + recordsPerBatch = 100 // Minimum 100 records per batch + } + if recordsPerBatch > 10000 { + recordsPerBatch = 10000 // Cap at 10k records per batch to avoid huge memory allocations + } + maxBatchesPerFetch := int((maxBytes - 200) / (estimatedMsgSize * 10)) // Reasonable limit + if maxBatchesPerFetch < 5 { + maxBatchesPerFetch = 5 // At least 5 batches + } + if maxBatchesPerFetch > 100 { + maxBatchesPerFetch = 100 // At most 100 batches + } for batchCount < maxBatchesPerFetch && currentOffset < highWaterMark { @@ -70,8 +86,13 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName } // Adapt records per batch based on remaining space - if remainingBytes < 1000 { - recordsPerBatch = 10 // Smaller batches when space is limited + // If we have less space remaining, fetch fewer records to avoid going over + currentBatchSize := recordsPerBatch + if remainingBytes < recordsPerBatch*estimatedMsgSize { + currentBatchSize = remainingBytes / estimatedMsgSize + if 
currentBatchSize < 1 { + currentBatchSize = 1 + } } // Calculate how many records to fetch for this batch @@ -80,7 +101,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName break } - recordsToFetch := recordsPerBatch + recordsToFetch := currentBatchSize if int64(recordsToFetch) > recordsAvailable { recordsToFetch = int32(recordsAvailable) } @@ -577,65 +598,6 @@ func (f *MultiBatchFetcher) constructCompressedRecordBatch(baseOffset int64, com return batch } -// estimateBatchSize estimates the size of a record batch before constructing it -func (f *MultiBatchFetcher) estimateBatchSize(smqRecords []integration.SMQRecord) int32 { - if len(smqRecords) == 0 { - return 61 // empty batch header size - } - - // Record batch header: 61 bytes (base_offset + batch_length + leader_epoch + magic + crc + attributes + - // last_offset_delta + first_ts + max_ts + producer_id + producer_epoch + base_seq + record_count) - headerSize := int32(61) - - baseTs := smqRecords[0].GetTimestamp() - recordsSize := int32(0) - for i, rec := range smqRecords { - // attributes(1) - rb := int32(1) - - // timestamp_delta(varint) - tsDelta := rec.GetTimestamp() - baseTs - rb += int32(len(encodeVarint(tsDelta))) - - // offset_delta(varint) - rb += int32(len(encodeVarint(int64(i)))) - - // key length varint + data or -1 - if k := rec.GetKey(); k != nil { - rb += int32(len(encodeVarint(int64(len(k))))) + int32(len(k)) - } else { - rb += int32(len(encodeVarint(-1))) - } - - // value length varint + data or -1 - if v := rec.GetValue(); v != nil { - rb += int32(len(encodeVarint(int64(len(v))))) + int32(len(v)) - } else { - rb += int32(len(encodeVarint(-1))) - } - - // headers count (varint = 0) - rb += int32(len(encodeVarint(0))) - - // prepend record length varint - recordsSize += int32(len(encodeVarint(int64(rb)))) + rb - } - - return headerSize + recordsSize -} - -// sizeOfVarint returns the number of bytes encodeVarint would use for value -func sizeOfVarint(value int64) 
int32 { - // ZigZag encode to match encodeVarint - u := uint64(uint64(value<<1) ^ uint64(value>>63)) - size := int32(1) - for u >= 0x80 { - u >>= 7 - size++ - } - return size -} - // compressData compresses data using the specified codec (basic implementation) func (f *MultiBatchFetcher) compressData(data []byte, codec compression.CompressionCodec) ([]byte, error) { // For Phase 5, implement basic compression support |
