Diffstat (limited to 'weed/mq/kafka/protocol/fetch_partition_reader.go')
| -rw-r--r-- | weed/mq/kafka/protocol/fetch_partition_reader.go | 222 |
1 file changed, 222 insertions, 0 deletions
diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go
new file mode 100644
index 000000000..520b524cb
--- /dev/null
+++ b/weed/mq/kafka/protocol/fetch_partition_reader.go
@@ -0,0 +1,222 @@
+package protocol
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// partitionReader maintains a persistent connection to a single topic-partition
+// and streams records forward, eliminating repeated offset lookups
+// Pre-fetches and buffers records for instant serving
+type partitionReader struct {
+	topicName     string
+	partitionID   int32
+	currentOffset int64
+	fetchChan     chan *partitionFetchRequest
+	closeChan     chan struct{}
+
+	// Pre-fetch buffer support
+	recordBuffer chan *bufferedRecords // Buffered pre-fetched records
+	bufferMu     sync.Mutex            // Protects offset access
+
+	handler *Handler
+	connCtx *ConnectionContext
+}
+
+// bufferedRecords represents a batch of pre-fetched records
+type bufferedRecords struct {
+	recordBatch   []byte
+	startOffset   int64
+	endOffset     int64
+	highWaterMark int64
+}
+
+// partitionFetchRequest represents a request to fetch data from this partition
+type partitionFetchRequest struct {
+	requestedOffset int64
+	maxBytes        int32
+	maxWaitMs       int32 // MaxWaitTime from Kafka fetch request
+	resultChan      chan *partitionFetchResult
+	isSchematized   bool
+	apiVersion      uint16
+}
+
+// newPartitionReader creates and starts a new partition reader with pre-fetch buffering
+func newPartitionReader(ctx context.Context, handler *Handler, connCtx *ConnectionContext, topicName string, partitionID int32, startOffset int64) *partitionReader {
+	pr := &partitionReader{
+		topicName:     topicName,
+		partitionID:   partitionID,
+		currentOffset: startOffset,
+		fetchChan:     make(chan *partitionFetchRequest, 200), // Buffer 200 requests to handle Schema Registry's rapid polling in slow CI environments
+		closeChan:     make(chan struct{}),
+		recordBuffer:  make(chan *bufferedRecords, 5), // Buffer 5 batches of records
+		handler:       handler,
+		connCtx:       connCtx,
+	}
+
+	// Start the pre-fetch goroutine that continuously fetches ahead
+	go pr.preFetchLoop(ctx)
+
+	// Start the request handler goroutine
+	go pr.handleRequests(ctx)
+
+	glog.V(2).Infof("[%s] Created partition reader for %s[%d] starting at offset %d (sequential with ch=200)",
+		connCtx.ConnectionID, topicName, partitionID, startOffset)
+
+	return pr
+}
+
+// preFetchLoop is disabled for SMQ backend to prevent subscriber storms
+// SMQ reads from disk and creating multiple concurrent subscribers causes
+// broker overload and partition shutdowns. Fetch requests are handled
+// on-demand in serveFetchRequest instead.
+func (pr *partitionReader) preFetchLoop(ctx context.Context) {
+	defer func() {
+		glog.V(2).Infof("[%s] Pre-fetch loop exiting for %s[%d]",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
+		close(pr.recordBuffer)
+	}()
+
+	// Wait for shutdown - no continuous pre-fetching to avoid overwhelming the broker
+	select {
+	case <-ctx.Done():
+		return
+	case <-pr.closeChan:
+		return
+	}
+}
+
+// handleRequests serves fetch requests SEQUENTIALLY to prevent subscriber storm
+// CRITICAL: Sequential processing is essential for SMQ backend because:
+// 1. GetStoredRecords may create a new subscriber on each call
+// 2. Concurrent calls create multiple subscribers for the same partition
+// 3. This overwhelms the broker and causes partition shutdowns
+func (pr *partitionReader) handleRequests(ctx context.Context) {
+	defer func() {
+		glog.V(2).Infof("[%s] Request handler exiting for %s[%d]",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
+	}()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-pr.closeChan:
+			return
+		case req := <-pr.fetchChan:
+			// Process sequentially to prevent subscriber storm
+			pr.serveFetchRequest(ctx, req)
+		}
+	}
+}
+
+// serveFetchRequest fetches data on-demand (no pre-fetching)
+func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partitionFetchRequest) {
+	startTime := time.Now()
+	result := &partitionFetchResult{}
+	defer func() {
+		result.fetchDuration = time.Since(startTime)
+		select {
+		case req.resultChan <- result:
+		case <-ctx.Done():
+		case <-time.After(50 * time.Millisecond):
+			glog.Warningf("[%s] Timeout sending result for %s[%d]",
+				pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
+		}
+	}()
+
+	// Get high water mark
+	hwm, hwmErr := pr.handler.seaweedMQHandler.GetLatestOffset(pr.topicName, pr.partitionID)
+	if hwmErr != nil {
+		glog.Warningf("[%s] Failed to get high water mark for %s[%d]: %v",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwmErr)
+		result.recordBatch = []byte{}
+		return
+	}
+	result.highWaterMark = hwm
+
+	// CRITICAL: If requested offset >= HWM, return immediately with empty result
+	// This prevents overwhelming the broker with futile read attempts when no data is available
+	if req.requestedOffset >= hwm {
+		result.recordBatch = []byte{}
+		glog.V(3).Infof("[%s] No data available for %s[%d]: offset=%d >= hwm=%d",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+		return
+	}
+
+	// Update tracking offset to match requested offset
+	pr.bufferMu.Lock()
+	if req.requestedOffset != pr.currentOffset {
+		glog.V(2).Infof("[%s] Offset seek for %s[%d]: requested=%d current=%d",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, pr.currentOffset)
+		pr.currentOffset = req.requestedOffset
+	}
+	pr.bufferMu.Unlock()
+
+	// Fetch on-demand - no pre-fetching to avoid overwhelming the broker
+	// Pass the requested offset and maxWaitMs directly to avoid race conditions
+	recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm)
+	if len(recordBatch) > 0 && newOffset > pr.currentOffset {
+		result.recordBatch = recordBatch
+		pr.bufferMu.Lock()
+		pr.currentOffset = newOffset
+		pr.bufferMu.Unlock()
+		glog.V(2).Infof("[%s] On-demand fetch for %s[%d]: offset %d->%d, %d bytes",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
+			req.requestedOffset, newOffset, len(recordBatch))
+	} else {
+		result.recordBatch = []byte{}
+	}
+}
+
+// readRecords reads records forward using the multi-batch fetcher
+func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, maxBytes int32, maxWaitMs int32, highWaterMark int64) ([]byte, int64) {
+	// Create context with timeout based on Kafka fetch request's MaxWaitTime
+	// This ensures we wait exactly as long as the client requested
+	fetchCtx := ctx
+	if maxWaitMs > 0 {
+		var cancel context.CancelFunc
+		fetchCtx, cancel = context.WithTimeout(ctx, time.Duration(maxWaitMs)*time.Millisecond)
+		defer cancel()
+	}
+
+	// Use multi-batch fetcher for better MaxBytes compliance
+	multiFetcher := NewMultiBatchFetcher(pr.handler)
+	fetchResult, err := multiFetcher.FetchMultipleBatches(
+		fetchCtx,
+		pr.topicName,
+		pr.partitionID,
+		fromOffset,
+		highWaterMark,
+		maxBytes,
+	)
+
+	if err == nil && fetchResult.TotalSize > 0 {
+		glog.V(2).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
+			fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset)
+		return fetchResult.RecordBatches, fetchResult.NextOffset
+	}
+
+	// Fallback to single batch (pass context to respect timeout)
+	smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(fetchCtx, pr.topicName, pr.partitionID, fromOffset, 10)
+	if err == nil && len(smqRecords) > 0 {
+		recordBatch := pr.handler.constructRecordBatchFromSMQ(pr.topicName, fromOffset, smqRecords)
+		nextOffset := fromOffset + int64(len(smqRecords))
+		glog.V(2).Infof("[%s] Single-batch fetch for %s[%d]: %d records, %d bytes, offset %d -> %d",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
+			len(smqRecords), len(recordBatch), fromOffset, nextOffset)
+		return recordBatch, nextOffset
+	}
+
+	// No records available
+	return []byte{}, fromOffset
+}
+
+// close signals the reader to shut down
+func (pr *partitionReader) close() {
+	close(pr.closeChan)
+}
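
For context, here is a minimal caller-side sketch of how a fetch could be dispatched to a partitionReader and awaited: the request goes onto the buffered fetchChan, handleRequests drains it sequentially, and serveFetchRequest publishes a partitionFetchResult on resultChan. The helper name fetchViaReader and its placement in the same protocol package are illustrative assumptions and not part of this change; only the request fields, channels, and result type come from the code above.

// Hypothetical caller-side helper (not in this diff), assumed to live in the
// same protocol package so the unexported types are visible.
func fetchViaReader(ctx context.Context, pr *partitionReader, offset int64, maxBytes, maxWaitMs int32) *partitionFetchResult {
	req := &partitionFetchRequest{
		requestedOffset: offset,
		maxBytes:        maxBytes,
		maxWaitMs:       maxWaitMs,
		resultChan:      make(chan *partitionFetchResult, 1), // buffered so the reader's 50ms result-send timeout rarely fires
	}

	// Hand the request to the reader's sequential loop (fetchChan is buffered to 200).
	select {
	case pr.fetchChan <- req:
	case <-ctx.Done():
		return nil
	}

	// Wait for serveFetchRequest to publish the result; recordBatch is empty when offset >= HWM.
	select {
	case res := <-req.resultChan:
		return res
	case <-ctx.Done():
		return nil
	}
}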
