Diffstat (limited to 'weed/mq/kafka/protocol/fetch_partition_reader.go')
| -rw-r--r-- | weed/mq/kafka/protocol/fetch_partition_reader.go | 118 |
1 file changed, 91 insertions, 27 deletions
diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go
index 520b524cb..0117e3809 100644
--- a/weed/mq/kafka/protocol/fetch_partition_reader.go
+++ b/weed/mq/kafka/protocol/fetch_partition_reader.go
@@ -2,6 +2,7 @@ package protocol
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -42,6 +43,7 @@ type partitionFetchRequest struct {
 	resultChan    chan *partitionFetchResult
 	isSchematized bool
 	apiVersion    uint16
+	correlationID int32 // Added for correlation tracking
 }
 
 // newPartitionReader creates and starts a new partition reader with pre-fetch buffering
@@ -63,7 +65,7 @@ func newPartitionReader(ctx context.Context, handler *Handler, connCtx *Connecti
 	// Start the request handler goroutine
 	go pr.handleRequests(ctx)
 
-	glog.V(2).Infof("[%s] Created partition reader for %s[%d] starting at offset %d (sequential with ch=200)",
+	glog.V(4).Infof("[%s] Created partition reader for %s[%d] starting at offset %d (sequential with ch=200)",
 		connCtx.ConnectionID, topicName, partitionID, startOffset)
 
 	return pr
@@ -75,7 +77,7 @@ func newPartitionReader(ctx context.Context, handler *Handler, connCtx *Connecti
 // on-demand in serveFetchRequest instead.
 func (pr *partitionReader) preFetchLoop(ctx context.Context) {
 	defer func() {
-		glog.V(2).Infof("[%s] Pre-fetch loop exiting for %s[%d]",
+		glog.V(4).Infof("[%s] Pre-fetch loop exiting for %s[%d]",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
 		close(pr.recordBuffer)
 	}()
@@ -90,13 +92,13 @@ func (pr *partitionReader) preFetchLoop(ctx context.Context) {
 }
 
 // handleRequests serves fetch requests SEQUENTIALLY to prevent subscriber storm
-// CRITICAL: Sequential processing is essential for SMQ backend because:
+// Sequential processing is essential for SMQ backend because:
 // 1. GetStoredRecords may create a new subscriber on each call
 // 2. Concurrent calls create multiple subscribers for the same partition
 // 3. This overwhelms the broker and causes partition shutdowns
 func (pr *partitionReader) handleRequests(ctx context.Context) {
 	defer func() {
-		glog.V(2).Infof("[%s] Request handler exiting for %s[%d]",
+		glog.V(4).Infof("[%s] Request handler exiting for %s[%d]",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
 	}()
 
@@ -117,13 +119,31 @@ func (pr *partitionReader) handleRequests(ctx context.Context) {
 func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partitionFetchRequest) {
 	startTime := time.Now()
 	result := &partitionFetchResult{}
+
+	// Log request START with full details
+	glog.Infof("[%s] FETCH_START %s[%d]: offset=%d maxBytes=%d maxWait=%dms correlationID=%d",
+		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, req.maxBytes, req.maxWaitMs, req.correlationID)
+
 	defer func() {
 		result.fetchDuration = time.Since(startTime)
+
+		// Log request END with results
+		resultStatus := "EMPTY"
+		if len(result.recordBatch) > 0 {
+			resultStatus = fmt.Sprintf("DATA(%dB)", len(result.recordBatch))
+		}
+		glog.Infof("[%s] FETCH_END %s[%d]: offset=%d result=%s hwm=%d duration=%.2fms",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, resultStatus, result.highWaterMark, result.fetchDuration.Seconds()*1000)
+
+		// Send result back to client
 		select {
 		case req.resultChan <- result:
+			// Successfully sent
 		case <-ctx.Done():
+			glog.Warningf("[%s] Context cancelled while sending result for %s[%d]",
+				pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
 		case <-time.After(50 * time.Millisecond):
-			glog.Warningf("[%s] Timeout sending result for %s[%d]",
+			glog.Warningf("[%s] Timeout sending result for %s[%d] - CLIENT MAY HAVE DISCONNECTED",
 				pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
 		}
 	}()
@@ -131,60 +151,76 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
 	// Get high water mark
 	hwm, hwmErr := pr.handler.seaweedMQHandler.GetLatestOffset(pr.topicName, pr.partitionID)
 	if hwmErr != nil {
-		glog.Warningf("[%s] Failed to get high water mark for %s[%d]: %v",
+		glog.Errorf("[%s] CRITICAL: Failed to get HWM for %s[%d]: %v",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwmErr)
 		result.recordBatch = []byte{}
+		result.highWaterMark = 0
 		return
 	}
 	result.highWaterMark = hwm
 
-	// CRITICAL: If requested offset >= HWM, return immediately with empty result
+	glog.V(2).Infof("[%s] HWM for %s[%d]: %d (requested: %d)",
+		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwm, req.requestedOffset)
+
+	// If requested offset >= HWM, return immediately with empty result
 	// This prevents overwhelming the broker with futile read attempts when no data is available
 	if req.requestedOffset >= hwm {
 		result.recordBatch = []byte{}
-		glog.V(3).Infof("[%s] No data available for %s[%d]: offset=%d >= hwm=%d",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+		glog.V(3).Infof("[%s] Requested offset %d >= HWM %d, returning empty",
+			pr.connCtx.ConnectionID, req.requestedOffset, hwm)
 		return
 	}
 
 	// Update tracking offset to match requested offset
 	pr.bufferMu.Lock()
 	if req.requestedOffset != pr.currentOffset {
-		glog.V(2).Infof("[%s] Offset seek for %s[%d]: requested=%d current=%d",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, pr.currentOffset)
+		glog.V(3).Infof("[%s] Updating currentOffset for %s[%d]: %d -> %d",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, pr.currentOffset, req.requestedOffset)
 		pr.currentOffset = req.requestedOffset
 	}
 	pr.bufferMu.Unlock()
 
 	// Fetch on-demand - no pre-fetching to avoid overwhelming the broker
-	// Pass the requested offset and maxWaitMs directly to avoid race conditions
 	recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm)
-	if len(recordBatch) > 0 && newOffset > pr.currentOffset {
+
+	// Log what we got back - DETAILED for diagnostics
+	if len(recordBatch) == 0 {
+		glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned EMPTY (offset=%d, hwm=%d)",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+		result.recordBatch = []byte{}
+	} else {
+		// Log successful fetch with details
+		glog.Infof("[%s] FETCH SUCCESS %s[%d]: offset %d->%d (hwm=%d, bytes=%d)",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch))
 		result.recordBatch = recordBatch
 		pr.bufferMu.Lock()
 		pr.currentOffset = newOffset
 		pr.bufferMu.Unlock()
-		glog.V(2).Infof("[%s] On-demand fetch for %s[%d]: offset %d->%d, %d bytes",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
-			req.requestedOffset, newOffset, len(recordBatch))
-	} else {
-		result.recordBatch = []byte{}
 	}
 }
 
 // readRecords reads records forward using the multi-batch fetcher
 func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, maxBytes int32, maxWaitMs int32, highWaterMark int64) ([]byte, int64) {
+	fetchStartTime := time.Now()
+
 	// Create context with timeout based on Kafka fetch request's MaxWaitTime
 	// This ensures we wait exactly as long as the client requested
 	fetchCtx := ctx
 	if maxWaitMs > 0 {
 		var cancel context.CancelFunc
-		fetchCtx, cancel = context.WithTimeout(ctx, time.Duration(maxWaitMs)*time.Millisecond)
+		// Use 1.5x the client timeout to account for internal processing overhead
+		// This prevents legitimate slow reads from being killed by client timeout
+		internalTimeoutMs := int32(float64(maxWaitMs) * 1.5)
+		if internalTimeoutMs > 5000 {
+			internalTimeoutMs = 5000 // Cap at 5 seconds
+		}
+		fetchCtx, cancel = context.WithTimeout(ctx, time.Duration(internalTimeoutMs)*time.Millisecond)
 		defer cancel()
 	}
 
 	// Use multi-batch fetcher for better MaxBytes compliance
 	multiFetcher := NewMultiBatchFetcher(pr.handler)
+	startTime := time.Now()
 	fetchResult, err := multiFetcher.FetchMultipleBatches(
 		fetchCtx,
 		pr.topicName,
@@ -193,26 +229,54 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
 		highWaterMark,
 		maxBytes,
 	)
+	fetchDuration := time.Since(startTime)
+
+	// Log slow fetches (potential hangs)
+	if fetchDuration > 2*time.Second {
+		glog.Warningf("[%s] SLOW FETCH for %s[%d]: offset=%d took %.2fs (maxWait=%dms, HWM=%d)",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration.Seconds(), maxWaitMs, highWaterMark)
+	}
 
 	if err == nil && fetchResult.TotalSize > 0 {
-		glog.V(2).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d",
+		glog.V(4).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d (duration: %v)",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
-			fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset)
+			fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset, fetchDuration)
 		return fetchResult.RecordBatches, fetchResult.NextOffset
 	}
 
-	// Fallback to single batch (pass context to respect timeout)
-	smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(fetchCtx, pr.topicName, pr.partitionID, fromOffset, 10)
-	if err == nil && len(smqRecords) > 0 {
+	// Multi-batch failed - try single batch WITHOUT the timeout constraint
+	// to ensure we get at least some data even if multi-batch timed out
+	glog.Warningf("[%s] Multi-batch fetch failed for %s[%d] offset=%d after %v, falling back to single-batch (err: %v)",
+		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration, err)
+
+	// Use original context for fallback, NOT the timed-out fetchCtx
+	// This ensures the fallback has a fresh chance to fetch data
+	fallbackStartTime := time.Now()
+	smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(ctx, pr.topicName, pr.partitionID, fromOffset, 10)
+	fallbackDuration := time.Since(fallbackStartTime)
+
+	if fallbackDuration > 2*time.Second {
+		glog.Warningf("[%s] SLOW FALLBACK for %s[%d]: offset=%d took %.2fs",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fallbackDuration.Seconds())
+	}
+
+	if err != nil {
+		glog.Errorf("[%s] CRITICAL: Both multi-batch AND fallback failed for %s[%d] offset=%d: %v",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err)
+		return []byte{}, fromOffset
+	}
+
+	if len(smqRecords) > 0 {
 		recordBatch := pr.handler.constructRecordBatchFromSMQ(pr.topicName, fromOffset, smqRecords)
 		nextOffset := fromOffset + int64(len(smqRecords))
-		glog.V(2).Infof("[%s] Single-batch fetch for %s[%d]: %d records, %d bytes, offset %d -> %d",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
-			len(smqRecords), len(recordBatch), fromOffset, nextOffset)
+		glog.V(3).Infof("[%s] Fallback succeeded: got %d records for %s[%d] offset %d -> %d (total: %v)",
+			pr.connCtx.ConnectionID, len(smqRecords), pr.topicName, pr.partitionID, fromOffset, nextOffset, time.Since(fetchStartTime))
 		return recordBatch, nextOffset
 	}
 
 	// No records available
+	glog.V(3).Infof("[%s] No records available for %s[%d] offset=%d after multi-batch and fallback (total: %v)",
+		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, time.Since(fetchStartTime))
 	return []byte{}, fromOffset
 }
 
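
The most behaviorally significant change is the internal fetch deadline in readRecords: instead of cancelling the broker read exactly at the client's MaxWaitTime, the reader now waits up to 1.5x that value, capped at 5 seconds, so a read that is merely slow is not killed at the same instant the client's own deadline expires. A minimal, self-contained sketch of that derivation (the helper name internalFetchTimeout is hypothetical, not part of the patch):

package main

import (
	"fmt"
	"time"
)

// internalFetchTimeout mirrors the calculation added in readRecords:
// scale the client's MaxWaitTime by 1.5x to absorb internal overhead,
// but never wait longer than 5 seconds.
func internalFetchTimeout(maxWaitMs int32) time.Duration {
	internalTimeoutMs := int32(float64(maxWaitMs) * 1.5)
	if internalTimeoutMs > 5000 {
		internalTimeoutMs = 5000 // cap at 5 seconds
	}
	return time.Duration(internalTimeoutMs) * time.Millisecond
}

func main() {
	for _, maxWaitMs := range []int32{100, 500, 5000} {
		// 100ms -> 150ms, 500ms -> 750ms, 5000ms -> 5s (capped)
		fmt.Printf("maxWait=%dms -> internal fetch timeout=%v\n", maxWaitMs, internalFetchTimeout(maxWaitMs))
	}
}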
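
The deferred result delivery in serveFetchRequest is also a reusable pattern: the send on resultChan races against the connection context and a 50 ms timer, so a consumer that has disconnected cannot wedge the per-partition goroutine. A simplified standalone sketch of the same pattern (the deliver function and its string payload are illustrative, not the handler's actual types):

package main

import (
	"context"
	"fmt"
	"time"
)

// deliver hands a fetch result to the connection writer without blocking
// forever: the send is abandoned when the connection context is cancelled
// or after 50ms, matching the defer block in serveFetchRequest.
func deliver(ctx context.Context, resultChan chan<- string, result string) {
	select {
	case resultChan <- result:
		// successfully handed off to the connection writer
	case <-ctx.Done():
		fmt.Println("context cancelled while sending result")
	case <-time.After(50 * time.Millisecond):
		fmt.Println("timeout sending result - client may have disconnected")
	}
}

func main() {
	ctx := context.Background()

	ch := make(chan string, 1)
	deliver(ctx, ch, "record-batch")
	fmt.Println("delivered:", <-ch)

	// No reader on this unbuffered channel, so deliver gives up after 50ms.
	deliver(ctx, make(chan string), "orphaned-batch")
}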
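
Finally, the fallback path now passes the original connection context to GetStoredRecords rather than the already-consumed fetchCtx, so a multi-batch attempt that burned its whole deadline does not doom the single-batch retry. A condensed sketch of that two-stage pattern, with fetchPrimary and fetchFallback as hypothetical stand-ins for the multi-batch fetcher and GetStoredRecords:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetchWithFallback tries a primary fetch under a bounded deadline, then
// retries once with the caller's original context so the fallback is not
// penalized by a deadline the primary attempt already exhausted.
func fetchWithFallback(
	ctx context.Context,
	timeout time.Duration,
	fetchPrimary func(context.Context) ([]byte, error),
	fetchFallback func(context.Context) ([]byte, error),
) ([]byte, error) {
	primaryCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	if data, err := fetchPrimary(primaryCtx); err == nil && len(data) > 0 {
		return data, nil
	}
	// Fall back using the original ctx, not the (possibly expired) primaryCtx.
	return fetchFallback(ctx)
}

func main() {
	slow := func(c context.Context) ([]byte, error) {
		select {
		case <-time.After(200 * time.Millisecond):
			return []byte("primary"), nil
		case <-c.Done():
			return nil, errors.New("primary timed out")
		}
	}
	quick := func(context.Context) ([]byte, error) { return []byte("fallback"), nil }

	data, err := fetchWithFallback(context.Background(), 50*time.Millisecond, slow, quick)
	fmt.Println(string(data), err) // prints: fallback <nil>
}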
