diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/broker_grpc_fetch.go | 6 | ||||
| -rw-r--r-- | weed/mq/kafka/gateway/coordinator_registry.go | 2 | ||||
| -rw-r--r-- | weed/mq/kafka/integration/broker_client_fetch.go | 4 | ||||
| -rw-r--r-- | weed/mq/kafka/protocol/fetch.go | 4 | ||||
| -rw-r--r-- | weed/mq/kafka/protocol/fetch_multibatch.go | 5 | ||||
| -rw-r--r-- | weed/mq/kafka/protocol/fetch_partition_reader.go | 16 | ||||
| -rw-r--r-- | weed/mq/kafka/protocol/produce.go | 6 | ||||
| -rw-r--r-- | weed/mq/offset/benchmark_test.go | 6 |
8 files changed, 4 insertions, 45 deletions
diff --git a/weed/mq/broker/broker_grpc_fetch.go b/weed/mq/broker/broker_grpc_fetch.go index 19024d852..4eb17d4fb 100644 --- a/weed/mq/broker/broker_grpc_fetch.go +++ b/weed/mq/broker/broker_grpc_fetch.go @@ -105,9 +105,6 @@ func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchM requestedOffset := req.StartOffset // Read messages from LogBuffer (stateless read) - glog.Infof("[FetchMessage] About to read from LogBuffer: topic=%s partition=%v offset=%d maxMessages=%d maxBytes=%d", - t.Name, partition, requestedOffset, maxMessages, maxBytes) - logEntries, nextOffset, highWaterMark, endOfPartition, err := localPartition.LogBuffer.ReadMessagesAtOffset( requestedOffset, maxMessages, @@ -122,9 +119,6 @@ func (b *MessageQueueBroker) FetchMessage(ctx context.Context, req *mq_pb.FetchM nextOffset, endOfPartition, localPartition.LogBuffer.GetLogStartOffset()) } - glog.Infof("[FetchMessage] Read completed: topic=%s partition=%v offset=%d -> %d entries, nextOffset=%d, hwm=%d, eop=%v, err=%v", - t.Name, partition, requestedOffset, len(logEntries), nextOffset, highWaterMark, endOfPartition, err) - if err != nil { // Check if this is an "offset out of range" error errMsg := err.Error() diff --git a/weed/mq/kafka/gateway/coordinator_registry.go b/weed/mq/kafka/gateway/coordinator_registry.go index af3330b03..eea1b1907 100644 --- a/weed/mq/kafka/gateway/coordinator_registry.go +++ b/weed/mq/kafka/gateway/coordinator_registry.go @@ -80,7 +80,7 @@ func NewCoordinatorRegistry(gatewayAddress string, masters []pb.ServerAddress, g for _, master := range masters { // Use the same discovery logic as filer_discovery.go grpcAddr := master.ToGrpcAddress() - conn, err := grpc.Dial(grpcAddr, grpcDialOption) + conn, err := grpc.NewClient(grpcAddr, grpcDialOption) if err != nil { continue } diff --git a/weed/mq/kafka/integration/broker_client_fetch.go b/weed/mq/kafka/integration/broker_client_fetch.go index 25af9e809..016f8ccdf 100644 --- a/weed/mq/kafka/integration/broker_client_fetch.go +++ b/weed/mq/kafka/integration/broker_client_fetch.go @@ -80,10 +80,6 @@ func (bc *BrokerClient) FetchMessagesStateless(ctx context.Context, topic string } } - // CRITICAL DEBUGGING: Log what broker returned - glog.Infof("[FETCH-STATELESS-CLIENT] Broker response for %s[%d] offset %d: messages=%d, nextOffset=%d, hwm=%d, logStart=%d, endOfPartition=%v", - topic, partition, startOffset, len(resp.Messages), resp.NextOffset, resp.HighWaterMark, resp.LogStartOffset, resp.EndOfPartition) - // CRITICAL: If broker returns 0 messages but hwm > startOffset, something is wrong if len(resp.Messages) == 0 && resp.HighWaterMark > startOffset { glog.Errorf("[FETCH-STATELESS-CLIENT] CRITICAL BUG: Broker returned 0 messages for %s[%d] offset %d, but HWM=%d (should have %d messages available)", diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 6b38a71e1..58a96f5d8 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -181,7 +181,6 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers } pending := make([]pendingFetch, 0) - persistentFetchStart := time.Now() // Phase 1: Dispatch all fetch requests to partition readers (non-blocking) for _, topic := range fetchRequest.Topics { @@ -285,8 +284,6 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers } done: - _ = time.Since(persistentFetchStart) // persistentFetchDuration - // ==================================================================== // BUILD RESPONSE FROM FETCHED DATA // Now assemble the response in the correct order using fetched results @@ -1132,7 +1129,6 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB return nil } - // For system topics like _schemas, _consumer_offsets, etc., // return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.) // and should NOT be processed as RecordValue protobuf messages. diff --git a/weed/mq/kafka/protocol/fetch_multibatch.go b/weed/mq/kafka/protocol/fetch_multibatch.go index 61cd19f78..192872850 100644 --- a/weed/mq/kafka/protocol/fetch_multibatch.go +++ b/weed/mq/kafka/protocol/fetch_multibatch.go @@ -8,7 +8,6 @@ import ( "fmt" "hash/crc32" "strings" - "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression" @@ -61,7 +60,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName // Assume average message size + batch overhead // Client requested maxBytes, we should use most of it // Start with larger batches to maximize throughput - estimatedMsgSize := int32(1024) // Typical message size with overhead + estimatedMsgSize := int32(1024) // Typical message size with overhead recordsPerBatch := (maxBytes - 200) / estimatedMsgSize // Use available space efficiently if recordsPerBatch < 100 { recordsPerBatch = 100 // Minimum 100 records per batch @@ -116,9 +115,7 @@ func (f *MultiBatchFetcher) FetchMultipleBatches(ctx context.Context, topicName // Fetch records for this batch // Pass context to respect Kafka fetch request's MaxWaitTime - getRecordsStartTime := time.Now() smqRecords, err := f.handler.seaweedMQHandler.GetStoredRecords(ctx, topicName, partitionID, currentOffset, int(recordsToFetch)) - _ = time.Since(getRecordsStartTime) // getRecordsDuration if err != nil || len(smqRecords) == 0 { break diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go index 0117e3809..6583c6489 100644 --- a/weed/mq/kafka/protocol/fetch_partition_reader.go +++ b/weed/mq/kafka/protocol/fetch_partition_reader.go @@ -2,7 +2,6 @@ package protocol import ( "context" - "fmt" "sync" "time" @@ -120,21 +119,9 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition startTime := time.Now() result := &partitionFetchResult{} - // Log request START with full details - glog.Infof("[%s] FETCH_START %s[%d]: offset=%d maxBytes=%d maxWait=%dms correlationID=%d", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, req.maxBytes, req.maxWaitMs, req.correlationID) - defer func() { result.fetchDuration = time.Since(startTime) - // Log request END with results - resultStatus := "EMPTY" - if len(result.recordBatch) > 0 { - resultStatus = fmt.Sprintf("DATA(%dB)", len(result.recordBatch)) - } - glog.Infof("[%s] FETCH_END %s[%d]: offset=%d result=%s hwm=%d duration=%.2fms", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, resultStatus, result.highWaterMark, result.fetchDuration.Seconds()*1000) - // Send result back to client select { case req.resultChan <- result: @@ -189,9 +176,6 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm) result.recordBatch = []byte{} } else { - // Log successful fetch with details - glog.Infof("[%s] FETCH SUCCESS %s[%d]: offset %d->%d (hwm=%d, bytes=%d)", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch)) result.recordBatch = recordBatch pr.bufferMu.Lock() pr.currentOffset = newOffset diff --git a/weed/mq/kafka/protocol/produce.go b/weed/mq/kafka/protocol/produce.go index ab1b1cb21..849d1148d 100644 --- a/weed/mq/kafka/protocol/produce.go +++ b/weed/mq/kafka/protocol/produce.go @@ -590,7 +590,6 @@ func decodeVarint(data []byte) (int64, int) { // handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+) func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { - startTime := time.Now() // For now, use simplified parsing similar to v0/v1 but handle v2+ response format // In v2+, the main differences are: @@ -731,7 +730,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if len(records) == 0 { errorCode = 42 // INVALID_RECORD } else { - var firstOffsetSet bool for idx, kv := range records { offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value) @@ -746,11 +744,8 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if idx == 0 { baseOffset = offsetProduced - firstOffsetSet = true } } - - _ = firstOffsetSet } } else { // Try to extract anyway - this might be a Noop record @@ -815,7 +810,6 @@ func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, if len(response) < 20 { } - _ = time.Since(startTime) // duration return response, nil } diff --git a/weed/mq/offset/benchmark_test.go b/weed/mq/offset/benchmark_test.go index d82729142..0fdacf127 100644 --- a/weed/mq/offset/benchmark_test.go +++ b/weed/mq/offset/benchmark_test.go @@ -227,7 +227,7 @@ func BenchmarkOffsetSubscription(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { subscriptionID := fmt.Sprintf("bench-sub-%d", i) - sub, err := subscriber.CreateSubscription( + _, err := subscriber.CreateSubscription( subscriptionID, "test-namespace", "test-topic", partition, @@ -238,7 +238,6 @@ func BenchmarkOffsetSubscription(b *testing.B) { b.Fatalf("Failed to create subscription: %v", err) } subscriber.CloseSubscription(subscriptionID) - _ = sub } }) @@ -338,7 +337,7 @@ func BenchmarkSMQOffsetIntegration(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { subscriptionID := fmt.Sprintf("integration-sub-%d", i) - sub, err := integration.CreateSubscription( + _, err := integration.CreateSubscription( subscriptionID, "test-namespace", "test-topic", partition, @@ -349,7 +348,6 @@ func BenchmarkSMQOffsetIntegration(b *testing.B) { b.Fatalf("Failed to create subscription: %v", err) } integration.CloseSubscription(subscriptionID) - _ = sub } }) |
