Diffstat (limited to 'weed/mq/kafka/protocol/fetch_partition_reader.go')
-rw-r--r--  weed/mq/kafka/protocol/fetch_partition_reader.go  118
1 file changed, 91 insertions(+), 27 deletions(-)
diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go
index 520b524cb..0117e3809 100644
--- a/weed/mq/kafka/protocol/fetch_partition_reader.go
+++ b/weed/mq/kafka/protocol/fetch_partition_reader.go
@@ -2,6 +2,7 @@ package protocol
import (
"context"
+ "fmt"
"sync"
"time"
@@ -42,6 +43,7 @@ type partitionFetchRequest struct {
resultChan chan *partitionFetchResult
isSchematized bool
apiVersion uint16
+ correlationID int32 // ties log lines back to the originating Kafka fetch request
}
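The hunk above shows only the tail of the request struct, but the req.requestedOffset, req.maxBytes, and req.maxWaitMs usages in the hunks below imply its full shape. A reconstructed sketch (field names taken from those usages; ordering and comments are assumptions, not confirmed by this diff):

	// partitionFetchRequest as implied by the fields referenced in this diff.
	// partitionFetchResult is defined elsewhere in the same file.
	type partitionFetchRequest struct {
		requestedOffset int64                      // first offset the consumer asked for
		maxBytes        int32                      // Kafka fetch MaxBytes budget for this partition
		maxWaitMs       int32                      // Kafka fetch MaxWaitTime, in milliseconds
		resultChan      chan *partitionFetchResult // reader replies on this channel
		isSchematized   bool
		apiVersion      uint16
		correlationID   int32 // ties log lines back to the originating Kafka fetch request
	}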
// newPartitionReader creates and starts a new partition reader with pre-fetch buffering
@@ -63,7 +65,7 @@ func newPartitionReader(ctx context.Context, handler *Handler, connCtx *Connecti
// Start the request handler goroutine
go pr.handleRequests(ctx)
- glog.V(2).Infof("[%s] Created partition reader for %s[%d] starting at offset %d (sequential with ch=200)",
+ glog.V(4).Infof("[%s] Created partition reader for %s[%d] starting at offset %d (sequential with ch=200)",
connCtx.ConnectionID, topicName, partitionID, startOffset)
return pr
@@ -75,7 +77,7 @@ func newPartitionReader(ctx context.Context, handler *Handler, connCtx *Connecti
// on-demand in serveFetchRequest instead.
func (pr *partitionReader) preFetchLoop(ctx context.Context) {
defer func() {
- glog.V(2).Infof("[%s] Pre-fetch loop exiting for %s[%d]",
+ glog.V(4).Infof("[%s] Pre-fetch loop exiting for %s[%d]",
pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
close(pr.recordBuffer)
}()
@@ -90,13 +92,13 @@ func (pr *partitionReader) preFetchLoop(ctx context.Context) {
}
// handleRequests serves fetch requests SEQUENTIALLY to prevent subscriber storm
-// CRITICAL: Sequential processing is essential for SMQ backend because:
+// Sequential processing is essential for SMQ backend because:
// 1. GetStoredRecords may create a new subscriber on each call
// 2. Concurrent calls create multiple subscribers for the same partition
// 3. This overwhelms the broker and causes partition shutdowns
func (pr *partitionReader) handleRequests(ctx context.Context) {
defer func() {
- glog.V(2).Infof("[%s] Request handler exiting for %s[%d]",
+ glog.V(4).Infof("[%s] Request handler exiting for %s[%d]",
pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
}()
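The loop body of handleRequests sits outside the visible hunks. A minimal sketch of the sequential drain the comment above describes, assuming the reader receives work on a channel field (requestChan is a hypothetical name):

	for {
		select {
		case <-ctx.Done():
			return
		case req, ok := <-pr.requestChan: // requestChan is an assumed field name
			if !ok {
				return
			}
			// Blocking call: the next request is not picked up until this one
			// finishes, so GetStoredRecords never runs concurrently for this
			// partition and the broker sees at most one subscriber for it.
			pr.serveFetchRequest(ctx, req)
		}
	}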
@@ -117,13 +119,31 @@ func (pr *partitionReader) handleRequests(ctx context.Context) {
func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partitionFetchRequest) {
startTime := time.Now()
result := &partitionFetchResult{}
+
+ // Log request START with full details
+ glog.Infof("[%s] FETCH_START %s[%d]: offset=%d maxBytes=%d maxWait=%dms correlationID=%d",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, req.maxBytes, req.maxWaitMs, req.correlationID)
+
defer func() {
result.fetchDuration = time.Since(startTime)
+
+ // Log request END with results
+ resultStatus := "EMPTY"
+ if len(result.recordBatch) > 0 {
+ resultStatus = fmt.Sprintf("DATA(%dB)", len(result.recordBatch))
+ }
+ glog.Infof("[%s] FETCH_END %s[%d]: offset=%d result=%s hwm=%d duration=%.2fms",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, resultStatus, result.highWaterMark, result.fetchDuration.Seconds()*1000)
+
+ // Send the result back to the waiting fetch handler, which replies to the client
select {
case req.resultChan <- result:
+ // Successfully sent
case <-ctx.Done():
+ glog.Warningf("[%s] Context cancelled while sending result for %s[%d]",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
case <-time.After(50 * time.Millisecond):
- glog.Warningf("[%s] Timeout sending result for %s[%d]",
+ glog.Warningf("[%s] Timeout sending result for %s[%d] - CLIENT MAY HAVE DISCONNECTED",
pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
}
}()
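The receiving end of resultChan is not part of this diff. A sketch of what the requesting side plausibly looks like, assuming an unbuffered resultChan (which is what makes the 50 ms send timeout above necessary); requestChan and appendPartitionData are hypothetical names, and the local variables stand in for values parsed from the Kafka fetch request:

	req := &partitionFetchRequest{
		requestedOffset: fetchOffset,
		maxBytes:        partitionMaxBytes,
		maxWaitMs:       maxWaitMs,
		apiVersion:      apiVersion,
		correlationID:   correlationID,
		resultChan:      make(chan *partitionFetchResult), // unbuffered: reader blocks until we receive
	}
	pr.requestChan <- req

	select {
	case res := <-req.resultChan:
		appendPartitionData(res.recordBatch, res.highWaterMark) // hypothetical helper
	case <-time.After(time.Duration(maxWaitMs) * time.Millisecond):
		// We gave up first; the reader's 50 ms send timeout keeps it from
		// blocking forever on the abandoned channel.
	}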
@@ -131,60 +151,76 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
// Get high water mark
hwm, hwmErr := pr.handler.seaweedMQHandler.GetLatestOffset(pr.topicName, pr.partitionID)
if hwmErr != nil {
- glog.Warningf("[%s] Failed to get high water mark for %s[%d]: %v",
+ glog.Errorf("[%s] CRITICAL: Failed to get HWM for %s[%d]: %v",
pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwmErr)
result.recordBatch = []byte{}
+ result.highWaterMark = 0
return
}
result.highWaterMark = hwm
- // CRITICAL: If requested offset >= HWM, return immediately with empty result
+ glog.V(2).Infof("[%s] HWM for %s[%d]: %d (requested: %d)",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwm, req.requestedOffset)
+
+ // If requested offset >= HWM, return immediately with empty result
// This prevents overwhelming the broker with futile read attempts when no data is available
if req.requestedOffset >= hwm {
result.recordBatch = []byte{}
- glog.V(3).Infof("[%s] No data available for %s[%d]: offset=%d >= hwm=%d",
- pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+ glog.V(3).Infof("[%s] Requested offset %d >= HWM %d, returning empty",
+ pr.connCtx.ConnectionID, req.requestedOffset, hwm)
return
}
// Update tracking offset to match requested offset
pr.bufferMu.Lock()
if req.requestedOffset != pr.currentOffset {
- glog.V(2).Infof("[%s] Offset seek for %s[%d]: requested=%d current=%d",
- pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, pr.currentOffset)
+ glog.V(3).Infof("[%s] Updating currentOffset for %s[%d]: %d -> %d",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, pr.currentOffset, req.requestedOffset)
pr.currentOffset = req.requestedOffset
}
pr.bufferMu.Unlock()
// Fetch on-demand - no pre-fetching to avoid overwhelming the broker
- // Pass the requested offset and maxWaitMs directly to avoid race conditions
recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm)
- if len(recordBatch) > 0 && newOffset > pr.currentOffset {
+
+ // Log the outcome in detail for diagnostics
+ if len(recordBatch) == 0 {
+ glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned EMPTY (offset=%d, hwm=%d)",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+ result.recordBatch = []byte{}
+ } else {
+ // Log successful fetch with details
+ glog.Infof("[%s] FETCH SUCCESS %s[%d]: offset %d->%d (hwm=%d, bytes=%d)",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch))
result.recordBatch = recordBatch
pr.bufferMu.Lock()
pr.currentOffset = newOffset
pr.bufferMu.Unlock()
- glog.V(2).Infof("[%s] On-demand fetch for %s[%d]: offset %d->%d, %d bytes",
- pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
- req.requestedOffset, newOffset, len(recordBatch))
- } else {
- result.recordBatch = []byte{}
}
}
// readRecords reads records forward using the multi-batch fetcher
func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, maxBytes int32, maxWaitMs int32, highWaterMark int64) ([]byte, int64) {
+ fetchStartTime := time.Now()
+
// Create context with timeout based on Kafka fetch request's MaxWaitTime
// This ensures we wait exactly as long as the client requested
fetchCtx := ctx
if maxWaitMs > 0 {
var cancel context.CancelFunc
- fetchCtx, cancel = context.WithTimeout(ctx, time.Duration(maxWaitMs)*time.Millisecond)
+ // Use 1.5x the client's MaxWaitTime to allow for internal processing overhead,
+ // so a legitimately slow read is not cut off right at the client's deadline
+ internalTimeoutMs := int32(float64(maxWaitMs) * 1.5)
+ if internalTimeoutMs > 5000 {
+ internalTimeoutMs = 5000 // Cap at 5 seconds
+ }
+ fetchCtx, cancel = context.WithTimeout(ctx, time.Duration(internalTimeoutMs)*time.Millisecond)
defer cancel()
}
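The derived deadline is min(1.5 × maxWaitMs, 5000 ms). The same rule as a standalone helper, with a couple of worked values (a sketch mirroring the clamp just above, not code from this commit):

	// internalTimeout: 1.5x the client's MaxWaitTime, capped at 5s so a very
	// large client timeout cannot pin the reader goroutine.
	func internalTimeout(maxWaitMs int32) time.Duration {
		ms := int32(float64(maxWaitMs) * 1.5)
		if ms > 5000 {
			ms = 5000
		}
		return time.Duration(ms) * time.Millisecond
	}

	// internalTimeout(500)  == 750ms
	// internalTimeout(4000) == 5s (1.5x would be 6s, capped)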
// Use multi-batch fetcher for better MaxBytes compliance
multiFetcher := NewMultiBatchFetcher(pr.handler)
+ startTime := time.Now()
fetchResult, err := multiFetcher.FetchMultipleBatches(
fetchCtx,
pr.topicName,
@@ -193,26 +229,54 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
highWaterMark,
maxBytes,
)
+ fetchDuration := time.Since(startTime)
+
+ // Log slow fetches (potential hangs)
+ if fetchDuration > 2*time.Second {
+ glog.Warningf("[%s] SLOW FETCH for %s[%d]: offset=%d took %.2fs (maxWait=%dms, HWM=%d)",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration.Seconds(), maxWaitMs, highWaterMark)
+ }
if err == nil && fetchResult.TotalSize > 0 {
- glog.V(2).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d",
+ glog.V(4).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d (duration: %v)",
pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
- fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset)
+ fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset, fetchDuration)
return fetchResult.RecordBatches, fetchResult.NextOffset
}
- // Fallback to single batch (pass context to respect timeout)
- smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(fetchCtx, pr.topicName, pr.partitionID, fromOffset, 10)
- if err == nil && len(smqRecords) > 0 {
+ // Multi-batch failed - try single batch WITHOUT the timeout constraint
+ // to ensure we get at least some data even if multi-batch timed out
+ glog.Warningf("[%s] Multi-batch fetch failed for %s[%d] offset=%d after %v, falling back to single-batch (err: %v)",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration, err)
+
+ // Use original context for fallback, NOT the timed-out fetchCtx
+ // This ensures the fallback has a fresh chance to fetch data
+ fallbackStartTime := time.Now()
+ smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(ctx, pr.topicName, pr.partitionID, fromOffset, 10)
+ fallbackDuration := time.Since(fallbackStartTime)
+
+ if fallbackDuration > 2*time.Second {
+ glog.Warningf("[%s] SLOW FALLBACK for %s[%d]: offset=%d took %.2fs",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fallbackDuration.Seconds())
+ }
+
+ if err != nil {
+ glog.Errorf("[%s] CRITICAL: Both multi-batch AND fallback failed for %s[%d] offset=%d: %v",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err)
+ return []byte{}, fromOffset
+ }
+
+ if len(smqRecords) > 0 {
recordBatch := pr.handler.constructRecordBatchFromSMQ(pr.topicName, fromOffset, smqRecords)
nextOffset := fromOffset + int64(len(smqRecords))
- glog.V(2).Infof("[%s] Single-batch fetch for %s[%d]: %d records, %d bytes, offset %d -> %d",
- pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
- len(smqRecords), len(recordBatch), fromOffset, nextOffset)
+ glog.V(3).Infof("[%s] Fallback succeeded: got %d records for %s[%d] offset %d -> %d (total: %v)",
+ pr.connCtx.ConnectionID, len(smqRecords), pr.topicName, pr.partitionID, fromOffset, nextOffset, time.Since(fetchStartTime))
return recordBatch, nextOffset
}
// No records available
+ glog.V(3).Infof("[%s] No records available for %s[%d] offset=%d after multi-batch and fallback (total: %v)",
+ pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, time.Since(fetchStartTime))
return []byte{}, fromOffset
}
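Taken together, readRecords now hides both failure modes from its caller: errors and timeouts surface as an empty batch with an unchanged offset, so progress is detected by comparing offsets rather than checking an error value. A condensed view of that contract as serveFetchRequest consumes it (names from the hunks above):

	batch, next := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm)
	if len(batch) > 0 {
		// Progress: advance the tracking offset past what we are returning.
		pr.bufferMu.Lock()
		pr.currentOffset = next
		pr.bufferMu.Unlock()
	} else {
		// next == req.requestedOffset here; the consumer simply polls again.
	}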