about summary refs log tree commit diff
path: root/weed/mq/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka')
-rw-r--r--  weed/mq/kafka/gateway/coordinator_registry.go    2
-rw-r--r--  weed/mq/kafka/integration/broker_client_fetch.go 4
-rw-r--r--  weed/mq/kafka/protocol/fetch.go                  4
-rw-r--r--  weed/mq/kafka/protocol/fetch_multibatch.go       5
-rw-r--r--  weed/mq/kafka/protocol/fetch_partition_reader.go 16
-rw-r--r--  weed/mq/kafka/protocol/produce.go                6
6 files changed, 2 insertions, 35 deletions
diff --git a/weed/mq/kafka/gateway/coordinator_registry.go b/weed/mq/kafka/gateway/coordinator_registry.go
index af3330b03..eea1b1907 100644
--- a/weed/mq/kafka/gateway/coordinator_registry.go
+++ b/weed/mq/kafka/gateway/coordinator_registry.go
@@ -80,7 +80,7 @@ func NewCoordinatorRegistry(gatewayAddress string, masters []pb.ServerAddress, g
for _, master := range masters {
// Use the same discovery logic as filer_discovery.go
grpcAddr := master.ToGrpcAddress()
- conn, err := grpc.Dial(grpcAddr, grpcDialOption)
+ conn, err := grpc.NewClient(grpcAddr, grpcDialOption)
if err != nil {
continue
}
diff --git a/weed/mq/kafka/integration/broker_client_fetch.go b/weed/mq/kafka/integration/broker_client_fetch.go
index 25af9e809..016f8ccdf 100644
--- a/weed/mq/kafka/integration/broker_client_fetch.go
+++ b/weed/mq/kafka/integration/broker_client_fetch.go
@@ -80,10 +80,6 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string
}
}
- // CRITICAL DEBUGGING: Log what broker returned
- glog.Infof("[FETCH-STATELESS-CLIENT] Broker response for %s[%d] offset %d: messages=%d, nextOffset=%d, hwm=%d, logStart=%d, endOfPartition=%v",
- topic, partition, startOffset, len(resp.Messages), resp.NextOffset, resp.HighWaterMark, resp.LogStartOffset, resp.EndOfPartition)
-
// CRITICAL: If broker returns 0 messages but hwm > startOffset, something is wrong
if len(resp.Messages) == 0 && resp.HighWaterMark > startOffset {
glog.Errorf("[FETCH-STATELESS-CLIENT] CRITICAL BUG: Broker returned 0 messages for %s[%d] offset %d, but HWM=%d (should have %d messages available)",
diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go
index 6b38a71e1..58a96f5d8 100644
--- a/weed/mq/kafka/protocol/fetch.go
+++ b/weed/mq/kafka/protocol/fetch.go
@@ -181,7 +181,6 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
}
pending := make([]pendingFetch, 0)
- persistentFetchStart := time.Now()
// Phase 1: Dispatch all fetch requests to partition readers (non-blocking)
for _, topic := range fetchRequest.Topics {
@@ -285,8 +284,6 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
}
done:
- _ = time.Since(persistentFetchStart) // persistentFetchDuration
-
// ====================================================================
// BUILD RESPONSE FROM FETCHED DATA
// Now assemble the response in the correct order using fetched results
@@ -1132,7 +1129,6 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
return nil
}
-
// For system topics like _schemas, _consumer_offsets, etc.,
// return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.)
// and should NOT be processed as RecordValue protobuf messages.
diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go
index 61cd19f78..192872850 100644
--- a/weed/mq/kafka/protocol/fetch_multibatch.go
+++ b/weed/mq/kafka/protocol/fetch_multibatch.go
@@ -8,7 +8,6 @@ import (
"fmt"
"hash/crc32"
"strings"
- "time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
@@ -61,7 +60,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName
// Assume average message size + batch overhead
// Client requested maxBytes, we should use most of it
// Start with larger batches to maximize throughput
- estimatedMsgSize := int32(1024) // Typical message size with overhead
+ estimatedMsgSize := int32(1024) // Typical message size with overhead
recordsPerBatch := (maxBytes - 200) / estimatedMsgSize // Use available space efficiently
if recordsPerBatch < 100 {
recordsPerBatch = 100 // Minimum 100 records per batch
@@ -116,9 +115,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName
// Fetch records for this batch
// Pass context to respect Kafka fetch request's MaxWaitTime
- getRecordsStartTime := time.Now()
smqRecords, err := f.handler.seaweedMQHandler.GetStoredRecords(ctx, topicName, partitionID, currentOffset, int(recordsToFetch))
- _ = time.Since(getRecordsStartTime) // getRecordsDuration
if err != nil || len(smqRecords) == 0 {
break
diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go
index 0117e3809..6583c6489 100644
--- a/weed/mq/kafka/protocol/fetch_partition_reader.go
+++ b/weed/mq/kafka/protocol/fetch_partition_reader.go
@@ -2,7 +2,6 @@ package protocol
import (
"context"
- "fmt"
"sync"
"time"
@@ -120,21 +119,9 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
startTime := time.Now()
result := &partitionFetchResult{}
- // Log request START with full details
- glog.Infof("[%s] FETCH_START %s[%d]: offset=%d maxBytes=%d maxWait=%dms correlationID=%d",
- pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, req.maxBytes, req.maxWaitMs, req.correlationID)
-
defer func() {
result.fetchDuration = time.Since(startTime)
- // Log request END with results
- resultStatus := "EMPTY"
- if len(result.recordBatch) > 0 {
- resultStatus = fmt.Sprintf("DATA(%dB)", len(result.recordBatch))
- }
- glog.Infof("[%s] FETCH_END %s[%d]: offset=%d result=%s hwm=%d duration=%.2fms",
- pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, resultStatus, result.highWaterMark, result.fetchDuration.Seconds()*1000)
-
// Send result back to client
select {
case req.resultChan <- result:
@@ -189,9 +176,6 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
result.recordBatch = []byte{}
} else {
- // Log successful fetch with details
- glog.Infof("[%s] FETCH SUCCESS %s[%d]: offset %d->%d (hwm=%d, bytes=%d)",
- pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch))
result.recordBatch = recordBatch
pr.bufferMu.Lock()
pr.currentOffset = newOffset
diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go
index ab1b1cb21..849d1148d 100644
--- a/weed/mq/kafka/protocol/produce.go
+++ b/weed/mq/kafka/protocol/produce.go
@@ -590,7 +590,6 @@ func decodeVarint(data []byte) (int64, int) {
// handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+)
func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
- startTime := time.Now()
// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
// In v2+, the main differences are:
@@ -731,7 +730,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
if len(records) == 0 {
errorCode = 42 // INVALID_RECORD
} else {
- var firstOffsetSet bool
for idx, kv := range records {
offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
@@ -746,11 +744,8 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
if idx == 0 {
baseOffset = offsetProduced
- firstOffsetSet = true
}
}
-
- _ = firstOffsetSet
}
} else {
// Try to extract anyway - this might be a Noop record
@@ -815,7 +810,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32,
if len(response) < 20 {
}
- _ = time.Since(startTime) // duration
return response, nil
}