aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/produce.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/protocol/produce.go')
-rw-r--r--weed/mq/kafka/protocol/produce.go6
1 files changed, 0 insertions, 6 deletions
diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index ab1b1cb21..849d1148d 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -590,7 +590,6 @@ func decodeVarint(data []byte) (int64, int) {
// handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+)
func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
- startTime := time.Now()
// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
// In v2+, the main differences are:
@@ -731,7 +730,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
if len(records) == 0 {
errorCode = 42 // INVALID_RECORD
} else {
- var firstOffsetSet bool
for idx, kv := range records {
offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
@@ -746,11 +744,8 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
if idx == 0 {
baseOffset = offsetProduced
- firstOffsetSet = true
}
}
-
- _ = firstOffsetSet
}
} else {
// Try to extract anyway - this might be a Noop record
@@ -815,7 +810,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
if len(response) < 20 {
}
- _ = time.Since(startTime) // duration
return response, nil
}