aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/fetch_partition_reader.go
blob: 0117e38095cec404f157a8a2f999ee4b84d8d123 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
package protocol

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// partitionReader maintains a persistent connection to a single topic-partition
// and streams records forward, eliminating repeated offset lookups
// Pre-fetches and buffers records for instant serving
type partitionReader struct {
	topicName     string
	partitionID   int32
	currentOffset int64
	fetchChan     chan *partitionFetchRequest
	closeChan     chan struct{}

	// Pre-fetch buffer support
	recordBuffer chan *bufferedRecords // Buffered pre-fetched records
	bufferMu     sync.Mutex            // Protects offset access

	handler *Handler
	connCtx *ConnectionContext
}

// bufferedRecords represents a batch of pre-fetched records
type bufferedRecords struct {
	recordBatch   []byte
	startOffset   int64
	endOffset     int64
	highWaterMark int64
}

// partitionFetchRequest represents a request to fetch data from this partition
type partitionFetchRequest struct {
	requestedOffset int64
	maxBytes        int32
	maxWaitMs       int32 // MaxWaitTime from Kafka fetch request
	resultChan      chan *partitionFetchResult
	isSchematized   bool
	apiVersion      uint16
	correlationID   int32 // Added for correlation tracking
}

// newPartitionReader creates and starts a new partition reader with pre-fetch buffering
func newPartitionReader(ctx context.Context, handler *Handler, connCtx *ConnectionContext, topicName string, partitionID int32, startOffset int64) *partitionReader {
	pr := &partitionReader{
		topicName:     topicName,
		partitionID:   partitionID,
		currentOffset: startOffset,
		fetchChan:     make(chan *partitionFetchRequest, 200), // Buffer 200 requests to handle Schema Registry's rapid polling in slow CI environments
		closeChan:     make(chan struct{}),
		recordBuffer:  make(chan *bufferedRecords, 5), // Buffer 5 batches of records
		handler:       handler,
		connCtx:       connCtx,
	}

	// Start the pre-fetch goroutine that continuously fetches ahead
	go pr.preFetchLoop(ctx)

	// Start the request handler goroutine
	go pr.handleRequests(ctx)

	glog.V(4).Infof("[%s] Created partition reader for %s[%d] starting at offset %d (sequential with ch=200)",
		connCtx.ConnectionID, topicName, partitionID, startOffset)

	return pr
}

// preFetchLoop is disabled for SMQ backend to prevent subscriber storms
// SMQ reads from disk and creating multiple concurrent subscribers causes
// broker overload and partition shutdowns. Fetch requests are handled
// on-demand in serveFetchRequest instead.
func (pr *partitionReader) preFetchLoop(ctx context.Context) {
	defer func() {
		glog.V(4).Infof("[%s] Pre-fetch loop exiting for %s[%d]",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
		close(pr.recordBuffer)
	}()

	// Wait for shutdown - no continuous pre-fetching to avoid overwhelming the broker
	select {
	case <-ctx.Done():
		return
	case <-pr.closeChan:
		return
	}
}

// handleRequests serves fetch requests SEQUENTIALLY to prevent subscriber storm
// Sequential processing is essential for SMQ backend because:
// 1. GetStoredRecords may create a new subscriber on each call
// 2. Concurrent calls create multiple subscribers for the same partition
// 3. This overwhelms the broker and causes partition shutdowns
func (pr *partitionReader) handleRequests(ctx context.Context) {
	defer func() {
		glog.V(4).Infof("[%s] Request handler exiting for %s[%d]",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case <-pr.closeChan:
			return
		case req := <-pr.fetchChan:
			// Process sequentially to prevent subscriber storm
			pr.serveFetchRequest(ctx, req)
		}
	}
}

// serveFetchRequest fetches data on-demand (no pre-fetching)
func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partitionFetchRequest) {
	startTime := time.Now()
	result := &partitionFetchResult{}

	// Log request START with full details
	glog.Infof("[%s] FETCH_START %s[%d]: offset=%d maxBytes=%d maxWait=%dms correlationID=%d",
		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, req.maxBytes, req.maxWaitMs, req.correlationID)

	defer func() {
		result.fetchDuration = time.Since(startTime)

		// Log request END with results
		resultStatus := "EMPTY"
		if len(result.recordBatch) > 0 {
			resultStatus = fmt.Sprintf("DATA(%dB)", len(result.recordBatch))
		}
		glog.Infof("[%s] FETCH_END %s[%d]: offset=%d result=%s hwm=%d duration=%.2fms",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, resultStatus, result.highWaterMark, result.fetchDuration.Seconds()*1000)

		// Send result back to client
		select {
		case req.resultChan <- result:
			// Successfully sent
		case <-ctx.Done():
			glog.Warningf("[%s] Context cancelled while sending result for %s[%d]",
				pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
		case <-time.After(50 * time.Millisecond):
			glog.Warningf("[%s] Timeout sending result for %s[%d] - CLIENT MAY HAVE DISCONNECTED",
				pr.connCtx.ConnectionID, pr.topicName, pr.partitionID)
		}
	}()

	// Get high water mark
	hwm, hwmErr := pr.handler.seaweedMQHandler.GetLatestOffset(pr.topicName, pr.partitionID)
	if hwmErr != nil {
		glog.Errorf("[%s] CRITICAL: Failed to get HWM for %s[%d]: %v",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwmErr)
		result.recordBatch = []byte{}
		result.highWaterMark = 0
		return
	}
	result.highWaterMark = hwm

	glog.V(2).Infof("[%s] HWM for %s[%d]: %d (requested: %d)",
		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwm, req.requestedOffset)

	// If requested offset >= HWM, return immediately with empty result
	// This prevents overwhelming the broker with futile read attempts when no data is available
	if req.requestedOffset >= hwm {
		result.recordBatch = []byte{}
		glog.V(3).Infof("[%s] Requested offset %d >= HWM %d, returning empty",
			pr.connCtx.ConnectionID, req.requestedOffset, hwm)
		return
	}

	// Update tracking offset to match requested offset
	pr.bufferMu.Lock()
	if req.requestedOffset != pr.currentOffset {
		glog.V(3).Infof("[%s] Updating currentOffset for %s[%d]: %d -> %d",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, pr.currentOffset, req.requestedOffset)
		pr.currentOffset = req.requestedOffset
	}
	pr.bufferMu.Unlock()

	// Fetch on-demand - no pre-fetching to avoid overwhelming the broker
	recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm)

	// Log what we got back - DETAILED for diagnostics
	if len(recordBatch) == 0 {
		glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned EMPTY (offset=%d, hwm=%d)",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
		result.recordBatch = []byte{}
	} else {
		// Log successful fetch with details
		glog.Infof("[%s] FETCH SUCCESS %s[%d]: offset %d->%d (hwm=%d, bytes=%d)",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, newOffset, hwm, len(recordBatch))
		result.recordBatch = recordBatch
		pr.bufferMu.Lock()
		pr.currentOffset = newOffset
		pr.bufferMu.Unlock()
	}
}

// readRecords reads records forward using the multi-batch fetcher
func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, maxBytes int32, maxWaitMs int32, highWaterMark int64) ([]byte, int64) {
	fetchStartTime := time.Now()

	// Create context with timeout based on Kafka fetch request's MaxWaitTime
	// This ensures we wait exactly as long as the client requested
	fetchCtx := ctx
	if maxWaitMs > 0 {
		var cancel context.CancelFunc
		// Use 1.5x the client timeout to account for internal processing overhead
		// This prevents legitimate slow reads from being killed by client timeout
		internalTimeoutMs := int32(float64(maxWaitMs) * 1.5)
		if internalTimeoutMs > 5000 {
			internalTimeoutMs = 5000 // Cap at 5 seconds
		}
		fetchCtx, cancel = context.WithTimeout(ctx, time.Duration(internalTimeoutMs)*time.Millisecond)
		defer cancel()
	}

	// Use multi-batch fetcher for better MaxBytes compliance
	multiFetcher := NewMultiBatchFetcher(pr.handler)
	startTime := time.Now()
	fetchResult, err := multiFetcher.FetchMultipleBatches(
		fetchCtx,
		pr.topicName,
		pr.partitionID,
		fromOffset,
		highWaterMark,
		maxBytes,
	)
	fetchDuration := time.Since(startTime)

	// Log slow fetches (potential hangs)
	if fetchDuration > 2*time.Second {
		glog.Warningf("[%s] SLOW FETCH for %s[%d]: offset=%d took %.2fs (maxWait=%dms, HWM=%d)",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration.Seconds(), maxWaitMs, highWaterMark)
	}

	if err == nil && fetchResult.TotalSize > 0 {
		glog.V(4).Infof("[%s] Multi-batch fetch for %s[%d]: %d batches, %d bytes, offset %d -> %d (duration: %v)",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID,
			fetchResult.BatchCount, fetchResult.TotalSize, fromOffset, fetchResult.NextOffset, fetchDuration)
		return fetchResult.RecordBatches, fetchResult.NextOffset
	}

	// Multi-batch failed - try single batch WITHOUT the timeout constraint
	// to ensure we get at least some data even if multi-batch timed out
	glog.Warningf("[%s] Multi-batch fetch failed for %s[%d] offset=%d after %v, falling back to single-batch (err: %v)",
		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration, err)

	// Use original context for fallback, NOT the timed-out fetchCtx
	// This ensures the fallback has a fresh chance to fetch data
	fallbackStartTime := time.Now()
	smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(ctx, pr.topicName, pr.partitionID, fromOffset, 10)
	fallbackDuration := time.Since(fallbackStartTime)

	if fallbackDuration > 2*time.Second {
		glog.Warningf("[%s] SLOW FALLBACK for %s[%d]: offset=%d took %.2fs",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fallbackDuration.Seconds())
	}

	if err != nil {
		glog.Errorf("[%s] CRITICAL: Both multi-batch AND fallback failed for %s[%d] offset=%d: %v",
			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, err)
		return []byte{}, fromOffset
	}

	if len(smqRecords) > 0 {
		recordBatch := pr.handler.constructRecordBatchFromSMQ(pr.topicName, fromOffset, smqRecords)
		nextOffset := fromOffset + int64(len(smqRecords))
		glog.V(3).Infof("[%s] Fallback succeeded: got %d records for %s[%d] offset %d -> %d (total: %v)",
			pr.connCtx.ConnectionID, len(smqRecords), pr.topicName, pr.partitionID, fromOffset, nextOffset, time.Since(fetchStartTime))
		return recordBatch, nextOffset
	}

	// No records available
	glog.V(3).Infof("[%s] No records available for %s[%d] offset=%d after multi-batch and fallback (total: %v)",
		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, time.Since(fetchStartTime))
	return []byte{}, fromOffset
}

// close signals the reader to shut down
func (pr *partitionReader) close() {
	close(pr.closeChan)
}