diff options
Diffstat (limited to 'weed/mq/kafka/protocol')
| -rw-r--r-- | weed/mq/kafka/protocol/fetch.go | 4 | ||||
| -rw-r--r-- | weed/mq/kafka/protocol/fetch_multibatch.go | 5 | ||||
| -rw-r--r-- | weed/mq/kafka/protocol/fetch_partition_reader.go | 16 | ||||
| -rw-r--r-- | weed/mq/kafka/protocol/produce.go | 6 |
4 files changed, 1 insertions, 30 deletions
diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 6b38a71e1..58a96f5d8 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -181,7 +181,6 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers } pending := make([]pendingFetch, 0) - persistentFetchStart := time.Now() // Phase 1: Dispatch all fetch requests to partition readers (non-blocking) for _, topic := range fetchRequest.Topics { @@ -285,8 +284,6 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers } done: - _ = time.Since(persistentFetchStart) // persistentFetchDuration - // ==================================================================== // BUILD RESPONSE FROM FETCHED DATA // Now assemble the response in the correct order using fetched results @@ -1132,7 +1129,6 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB return nil } - // For system topics like _schemas, _consumer_offsets, etc., // return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.) // and should NOT be processed as RecordValue protobuf messages. diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go index 61cd19f78..192872850 100644 --- a/weed/mq/kafka/protocol/fetch_multibatch.go +++ b/weed/mq/kafka/protocol/fetch_multibatch.go @@ -8,7 +8,6 @@ import ( "fmt" "hash/crc32" "strings" - "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression" @@ -61,7 +60,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName // Assume average message size + batch overhead // Client requested maxBytes, we should use most of it // Start with larger batches to maximize throughput - estimatedMsgSize := int32(1024) // Typical message size with overhead + estimatedMsgSize := int32(1024) // Typical message size with overhead recordsPerBatch := (maxBytes - 200) / estimatedMsgSize // Use available space efficiently if recordsPerBatch < 100 { recordsPerBatch = 100 // Minimum 100 records per batch @@ -116,9 +115,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName // Fetch records for this batch // Pass context to respect Kafka fetch request's MaxWaitTime - getRecordsStartTime := time.Now() smqRecords, err := f.handler.seaweedMQHandler.GetStoredRecords(ctx, topicName, partitionID, currentOffset, int(recordsToFetch)) - _ = time.Since(getRecordsStartTime) // getRecordsDuration if err != nil || len(smqRecords) == 0 { break diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go index 0117e3809..6583c6489 100644 --- a/weed/mq/kafka/protocol/fetch_partition_reader.go +++ b/weed/mq/kafka/protocol/fetch_partition_reader.go @@ -2,7 +2,6 @@ package protocol import ( "context" - "fmt" "sync" "time" @@ -120,21 +119,9 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition startTime := time.Now() result := &partitionFetchResult{} - // Log request START with full details - glog.Infof("[%s] FETCH_START %s[%d]: offset=%d maxBytes=%d maxWait=%dms correlationID=%d", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, req.maxBytes, req.maxWaitMs, req.correlationID) - defer func() { result.fetchDuration = time.Since(startTime) - // Log request END with results - resultStatus := "EMPTY" - if len(result.recordBatch) > 0 { - resultStatus = fmt.Sprintf("DATA(%dB)", len(result.recordBatch)) - } - glog.Infof("[%s] FETCH_END %s[%d]: offset=%d result=%s hwm=%d duration=%.2fms", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, resultStatus, result.highWaterMark, result.fetchDuration.Seconds()*1000) - // Send result back to client select { case req.resultChan <- result: @@ -189,9 +176,6 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm) result.recordBatch = []byte{} } else { - // Log successful fetch with details - glog.Infof("[%s] FETCH SUCCESS %s[%d]: offset %d->%d (hwm=%d, bytes=%d)", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch)) result.recordBatch = recordBatch pr.bufferMu.Lock() pr.currentOffset = newOffset diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index ab1b1cb21..849d1148d 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -590,7 +590,6 @@ func decodeVarint(data []byte) (int64, int) { // handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+) func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { - startTime := time.Now() // For now, use simplified parsing similar to v0/v1 but handle v2+ response format // In v2+, the main differences are: @@ -731,7 +730,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if len(records) == 0 { errorCode = 42 // INVALID_RECORD } else { - var firstOffsetSet bool for idx, kv := range records { offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value) @@ -746,11 +744,8 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if idx == 0 { baseOffset = offsetProduced - firstOffsetSet = true } } - - _ = firstOffsetSet } } else { // Try to extract anyway - this might be a Noop record @@ -815,7 +810,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if len(response) < 20 { } - _ = time.Since(startTime) // duration return response, nil } |
