Diffstat (limited to 'weed/mq/kafka/protocol/fetch_multibatch.go')
-rw-r--r--  weed/mq/kafka/protocol/fetch_multibatch.go | 92
1 file changed, 27 insertions(+), 65 deletions(-)
diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go
index 2d157c75a..61cd19f78 100644
--- a/weed/mq/kafka/protocol/fetch_multibatch.go
+++ b/weed/mq/kafka/protocol/fetch_multibatch.go
@@ -57,9 +57,25 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName
totalSize := int32(0)
batchCount := 0
- // Parameters for batch fetching - start smaller to respect maxBytes better
- recordsPerBatch := int32(10) // Start with smaller batch size
- maxBatchesPerFetch := 10 // Limit number of batches to avoid infinite loops
+ // Estimate records per batch from the available maxBytes,
+ // assuming an average message size plus batch overhead.
+ // The client requested maxBytes, so use most of it;
+ // start with larger batches to maximize throughput.
+ estimatedMsgSize := int32(1024) // Typical message size with overhead
+ recordsPerBatch := (maxBytes - 200) / estimatedMsgSize // Use available space efficiently
+ if recordsPerBatch < 100 {
+ recordsPerBatch = 100 // Minimum 100 records per batch
+ }
+ if recordsPerBatch > 10000 {
+ recordsPerBatch = 10000 // Cap at 10k records per batch to avoid huge memory allocations
+ }
+ maxBatchesPerFetch := int((maxBytes - 200) / (estimatedMsgSize * 10)) // Reasonable limit
+ if maxBatchesPerFetch < 5 {
+ maxBatchesPerFetch = 5 // At least 5 batches
+ }
+ if maxBatchesPerFetch > 100 {
+ maxBatchesPerFetch = 100 // At most 100 batches
+ }
for batchCount < maxBatchesPerFetch && currentOffset < highWaterMark {
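
Taken together, the added lines replace the fixed 10-record/10-batch scheme with a clamped linear estimate derived from maxBytes. A minimal sketch of that heuristic, factored into standalone helpers for readability; the constants mirror the patch, but estimateRecordsPerBatch and maxBatchesFor are hypothetical names (the real code runs inline in FetchMultipleBatches):

const estimatedMsgSize = 1024 // assumed average message size including record overhead
const responseOverhead = 200  // bytes reserved off the top for response framing

func estimateRecordsPerBatch(maxBytes int32) int32 {
	n := (maxBytes - responseOverhead) / estimatedMsgSize
	if n < 100 {
		n = 100 // floor: always attempt a usefully sized batch
	}
	if n > 10000 {
		n = 10000 // ceiling: bound per-batch memory allocations
	}
	return n
}

func maxBatchesFor(maxBytes int32) int {
	n := int((maxBytes - responseOverhead) / (estimatedMsgSize * 10))
	if n < 5 {
		n = 5 // at least 5 batches per fetch
	}
	if n > 100 {
		n = 100 // at most 100 batches per fetch
	}
	return n
}

Note that for a small request (say maxBytes of 64 KiB, where the linear estimate is about 63 records) the 100-record floor deliberately overshoots; the per-iteration shrink in the next hunk is what keeps the response under budget.
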
@@ -70,8 +86,13 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName
}
// Adapt records per batch based on remaining space
- if remainingBytes < 1000 {
- recordsPerBatch = 10 // Smaller batches when space is limited
+ // If less space remains, fetch fewer records to avoid exceeding maxBytes
+ currentBatchSize := recordsPerBatch
+ if remainingBytes < recordsPerBatch*estimatedMsgSize {
+ currentBatchSize = remainingBytes / estimatedMsgSize
+ if currentBatchSize < 1 {
+ currentBatchSize = 1
+ }
}
// Calculate how many records to fetch for this batch
@@ -80,7 +101,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName
break
}
- recordsToFetch := recordsPerBatch
+ recordsToFetch := currentBatchSize
if int64(recordsToFetch) > recordsAvailable {
recordsToFetch = int32(recordsAvailable)
}
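
The adaptation step is a small pure function of the remaining byte budget. A sketch under the same assumed estimatedMsgSize; shrinkBatch is a hypothetical name:

func shrinkBatch(recordsPerBatch, remainingBytes, estimatedMsgSize int32) int32 {
	currentBatchSize := recordsPerBatch
	if remainingBytes < recordsPerBatch*estimatedMsgSize {
		// Not enough room for a full batch at the assumed message size:
		// scale down, but always request at least one record so the
		// fetch can make progress.
		currentBatchSize = remainingBytes / estimatedMsgSize
		if currentBatchSize < 1 {
			currentBatchSize = 1
		}
	}
	return currentBatchSize
}

Shrinking a per-iteration copy instead of recordsPerBatch itself means a tight iteration near the byte limit does not pin later iterations to a reduced batch size, which is what the old code did by reassigning recordsPerBatch directly.
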
@@ -577,65 +598,6 @@ func (f *MultiBatchFetcher) constructCompressedRecordBatch(baseOffset int64, com
return batch
}
-// estimateBatchSize estimates the size of a record batch before constructing it
-func (f *MultiBatchFetcher) estimateBatchSize(smqRecords []integration.SMQRecord) int32 {
- if len(smqRecords) == 0 {
- return 61 // empty batch header size
- }
-
- // Record batch header: 61 bytes (base_offset + batch_length + leader_epoch + magic + crc + attributes +
- // last_offset_delta + first_ts + max_ts + producer_id + producer_epoch + base_seq + record_count)
- headerSize := int32(61)
-
- baseTs := smqRecords[0].GetTimestamp()
- recordsSize := int32(0)
- for i, rec := range smqRecords {
- // attributes(1)
- rb := int32(1)
-
- // timestamp_delta(varint)
- tsDelta := rec.GetTimestamp() - baseTs
- rb += int32(len(encodeVarint(tsDelta)))
-
- // offset_delta(varint)
- rb += int32(len(encodeVarint(int64(i))))
-
- // key length varint + data or -1
- if k := rec.GetKey(); k != nil {
- rb += int32(len(encodeVarint(int64(len(k))))) + int32(len(k))
- } else {
- rb += int32(len(encodeVarint(-1)))
- }
-
- // value length varint + data or -1
- if v := rec.GetValue(); v != nil {
- rb += int32(len(encodeVarint(int64(len(v))))) + int32(len(v))
- } else {
- rb += int32(len(encodeVarint(-1)))
- }
-
- // headers count (varint = 0)
- rb += int32(len(encodeVarint(0)))
-
- // prepend record length varint
- recordsSize += int32(len(encodeVarint(int64(rb)))) + rb
- }
-
- return headerSize + recordsSize
-}
-
-// sizeOfVarint returns the number of bytes encodeVarint would use for value
-func sizeOfVarint(value int64) int32 {
- // ZigZag encode to match encodeVarint
- u := uint64(uint64(value<<1) ^ uint64(value>>63))
- size := int32(1)
- for u >= 0x80 {
- u >>= 7
- size++
- }
- return size
-}
-
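
Both deleted helpers drop out with the rework above. Notably, sizeOfVarint computed in O(1) the same byte count that estimateBatchSize obtained by materializing each varint with len(encodeVarint(...)), yet estimateBatchSize never called it. For reference, a self-contained sketch of the ZigZag varint length both relied on; encodeZigZag here is illustrative, not the file's encodeVarint:

package main

import "fmt"

// encodeZigZag emits a Kafka/protobuf-style signed varint:
// ZigZag-map the value (0->0, -1->1, 1->2, ...) then base-128 encode,
// setting the high bit on every byte except the last.
func encodeZigZag(value int64) []byte {
	u := uint64(value<<1) ^ uint64(value>>63)
	var out []byte
	for u >= 0x80 {
		out = append(out, byte(u)|0x80)
		u >>= 7
	}
	return append(out, byte(u))
}

// sizeOfZigZag mirrors the deleted sizeOfVarint: the encoded length
// without allocating the encoding.
func sizeOfZigZag(value int64) int32 {
	u := uint64(value<<1) ^ uint64(value>>63)
	size := int32(1)
	for u >= 0x80 {
		u >>= 7
		size++
	}
	return size
}

func main() {
	for _, v := range []int64{0, -1, 63, 64, 300} {
		fmt.Printf("%5d: %d byte(s), matches encoding: %v\n",
			v, sizeOfZigZag(v), int(sizeOfZigZag(v)) == len(encodeZigZag(v)))
	}
}
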
// compressData compresses data using the specified codec (basic implementation)
func (f *MultiBatchFetcher) compressData(data []byte, codec compression.CompressionCodec) ([]byte, error) {
// For Phase 5, implement basic compression support