Diffstat (limited to 'weed/mq/kafka/integration/broker_client_subscribe.go')
-rw-r--r--   weed/mq/kafka/integration/broker_client_subscribe.go   703
 1 file changed, 703 insertions(+), 0 deletions(-)
diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go
new file mode 100644
index 000000000..a0b8504bf
--- /dev/null
+++ b/weed/mq/kafka/integration/broker_client_subscribe.go
@@ -0,0 +1,703 @@
+package integration
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
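+// Overview: fetch handlers obtain a BrokerSubscriberSession via GetOrCreateSubscriber (or
+// CreateFreshSubscriber for an uncached session), then call ReadRecordsFromOffset, which serves
+// requests from the session's record cache when possible and otherwise delegates to ReadRecords
+// to pull records from the broker stream with adaptive timeouts. NeedsRestart and RestartSubscriber
+// reposition an existing session when the requested offset cannot be served from the current
+// stream or cache.
+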
+// CreateFreshSubscriber creates a new subscriber session without caching
+// This ensures each fetch gets fresh data from the requested offset
+// consumerGroup and consumerID are passed from Kafka client for proper tracking in SMQ
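+//
+// Illustrative call (the literals and surrounding variables here are example assumptions, not
+// values used by this file):
+//
+//	session, err := bc.CreateFreshSubscriber("my-topic", 0, 42, "my-group", "consumer-1")
+//	if err != nil {
+//	    return nil, err
+//	}
+//	records, err := bc.ReadRecords(ctx, session, 100) // up to 100 records starting at offset 42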
+func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) {
+ // Create a dedicated context for this subscriber
+ subscriberCtx := context.Background()
+
+ stream, err := bc.client.SubscribeMessage(subscriberCtx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
+ }
+
+ // Get the actual partition assignment from the broker
+ actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
+ }
+
+ // Always use EXACT_OFFSET so the subscriber is positioned at the exact requested Kafka offset
+ offsetType := schema_pb.OffsetType_EXACT_OFFSET
+ startTimestamp := int64(0) // not used with EXACT_OFFSET
+ startOffsetValue := startOffset
+
+ // Send init message to start subscription with Kafka client's consumer group and ID
+ initReq := &mq_pb.SubscribeMessageRequest{
+ Message: &mq_pb.SubscribeMessageRequest_Init{
+ Init: &mq_pb.SubscribeMessageRequest_InitMessage{
+ ConsumerGroup: consumerGroup,
+ ConsumerId: consumerID,
+ ClientId: "kafka-gateway",
+ Topic: &schema_pb.Topic{
+ Namespace: "kafka",
+ Name: topic,
+ },
+ PartitionOffset: &schema_pb.PartitionOffset{
+ Partition: actualPartition,
+ StartTsNs: startTimestamp,
+ StartOffset: startOffsetValue,
+ },
+ OffsetType: offsetType,
+ SlidingWindowSize: 10,
+ },
+ },
+ }
+
+ if err := stream.Send(initReq); err != nil {
+ return nil, fmt.Errorf("failed to send subscribe init: %v", err)
+ }
+
+ // IMPORTANT: Don't wait for init response here!
+ // The broker may send the first data record as the "init response"
+ // If we call Recv() here, we'll consume that first record and ReadRecords will block
+ // waiting for the second record, causing a 30-second timeout.
+ // Instead, let ReadRecords handle all Recv() calls.
+
+ session := &BrokerSubscriberSession{
+ Stream: stream,
+ Topic: topic,
+ Partition: partition,
+ StartOffset: startOffset,
+ ConsumerGroup: consumerGroup,
+ ConsumerID: consumerID,
+ }
+
+ return session, nil
+}
+
+// GetOrCreateSubscriber returns the cached subscriber session for this topic/partition/consumer key.
+// The session is reused when the requested offset matches it or falls inside its record cache;
+// otherwise the old stream is closed and a new one is created at startOffset.
+func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) {
+ // Create a temporary session to generate the key
+ tempSession := &BrokerSubscriberSession{
+ Topic: topic,
+ Partition: partition,
+ ConsumerGroup: consumerGroup,
+ ConsumerID: consumerID,
+ }
+ key := tempSession.Key()
+
+ bc.subscribersLock.RLock()
+ if session, exists := bc.subscribers[key]; exists {
+ // Check if we need to recreate the session
+ if session.StartOffset != startOffset {
+ // CRITICAL FIX: Check cache first before recreating
+ // If the requested offset is in cache, we can reuse the session
+ session.mu.Lock()
+ canUseCache := false
+
+ if len(session.consumedRecords) > 0 {
+ cacheStartOffset := session.consumedRecords[0].Offset
+ cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
+ if startOffset >= cacheStartOffset && startOffset <= cacheEndOffset {
+ canUseCache = true
+ glog.V(2).Infof("[FETCH] Session offset mismatch for %s (session=%d, requested=%d), but offset is in cache [%d-%d]",
+ key, session.StartOffset, startOffset, cacheStartOffset, cacheEndOffset)
+ }
+ }
+
+ session.mu.Unlock()
+
+ if canUseCache {
+ // Offset is in cache, reuse session
+ bc.subscribersLock.RUnlock()
+ return session, nil
+ }
+
+ // Not in cache - need to recreate session at the requested offset
+ glog.V(0).Infof("[FETCH] Recreating session for %s: session at %d, requested %d (not in cache)",
+ key, session.StartOffset, startOffset)
+ bc.subscribersLock.RUnlock()
+
+ // Close and delete the old session
+ bc.subscribersLock.Lock()
+ // CRITICAL: Double-check if another thread already recreated the session at the desired offset
+ // This prevents multiple concurrent threads from all trying to recreate the same session
+ if existingSession, exists := bc.subscribers[key]; exists {
+ existingSession.mu.Lock()
+ existingOffset := existingSession.StartOffset
+ existingSession.mu.Unlock()
+
+ // Check if the session was already recreated at (or before) the requested offset
+ if existingOffset <= startOffset {
+ bc.subscribersLock.Unlock()
+ glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, startOffset)
+ // Re-acquire the existing session and continue
+ return existingSession, nil
+ }
+
+ // Session still needs recreation - close it
+ if existingSession.Stream != nil {
+ _ = existingSession.Stream.CloseSend()
+ }
+ if existingSession.Cancel != nil {
+ existingSession.Cancel()
+ }
+ delete(bc.subscribers, key)
+ }
+ bc.subscribersLock.Unlock()
+ } else {
+ // Exact match - reuse
+ bc.subscribersLock.RUnlock()
+ return session, nil
+ }
+ } else {
+ bc.subscribersLock.RUnlock()
+ }
+
+ // Create new subscriber stream
+ bc.subscribersLock.Lock()
+ defer bc.subscribersLock.Unlock()
+
+ if session, exists := bc.subscribers[key]; exists {
+ return session, nil
+ }
+
+ // CRITICAL FIX: Use background context for subscriber to prevent premature cancellation
+ // Subscribers need to continue reading data even when the connection is closing,
+ // otherwise Schema Registry and other clients can't read existing data.
+ // The subscriber will be cleaned up when the stream is explicitly closed.
+ subscriberCtx := context.Background()
+ subscriberCancel := func() {} // No-op cancel
+
+ stream, err := bc.client.SubscribeMessage(subscriberCtx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
+ }
+
+ // Get the actual partition assignment from the broker instead of using Kafka partition mapping
+ actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
+ }
+
+ // Convert Kafka offset to appropriate SeaweedMQ OffsetType and parameters
+ var offsetType schema_pb.OffsetType
+ var startTimestamp int64
+ var startOffsetValue int64
+
+ if startOffset == -1 {
+ // Kafka offset -1 typically means "latest"
+ offsetType = schema_pb.OffsetType_RESET_TO_LATEST
+ startTimestamp = 0 // Not used with RESET_TO_LATEST
+ startOffsetValue = 0 // Not used with RESET_TO_LATEST
+ glog.V(1).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)")
+ } else {
+ // CRITICAL FIX: Use EXACT_OFFSET to position subscriber at the exact Kafka offset
+ // This allows the subscriber to read from both buffer and disk at the correct position
+ offsetType = schema_pb.OffsetType_EXACT_OFFSET
+ startTimestamp = 0 // Not used with EXACT_OFFSET
+ startOffsetValue = startOffset // Use the exact Kafka offset
+ glog.V(1).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset)
+ }
+
+ glog.V(1).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s (timestamp=%d)",
+ topic, partition, startOffset, offsetType, startTimestamp)
+
+ // Send init message using the actual partition structure that the broker allocated
+ if err := stream.Send(&mq_pb.SubscribeMessageRequest{
+ Message: &mq_pb.SubscribeMessageRequest_Init{
+ Init: &mq_pb.SubscribeMessageRequest_InitMessage{
+ ConsumerGroup: consumerGroup,
+ ConsumerId: consumerID,
+ ClientId: "kafka-gateway",
+ Topic: &schema_pb.Topic{
+ Namespace: "kafka",
+ Name: topic,
+ },
+ PartitionOffset: &schema_pb.PartitionOffset{
+ Partition: actualPartition,
+ StartTsNs: startTimestamp,
+ StartOffset: startOffsetValue,
+ },
+ OffsetType: offsetType, // Use the correct offset type
+ SlidingWindowSize: 10,
+ },
+ },
+ }); err != nil {
+ return nil, fmt.Errorf("failed to send subscribe init: %v", err)
+ }
+
+ session := &BrokerSubscriberSession{
+ Topic: topic,
+ Partition: partition,
+ Stream: stream,
+ StartOffset: startOffset,
+ ConsumerGroup: consumerGroup,
+ ConsumerID: consumerID,
+ Ctx: subscriberCtx,
+ Cancel: subscriberCancel,
+ }
+
+ bc.subscribers[key] = session
+ glog.V(2).Infof("Created subscriber session for %s with context cancellation support", key)
+ return session, nil
+}
+
+// ReadRecordsFromOffset reads records starting from a specific offset
+// If the offset is in cache, returns cached records; otherwise delegates to ReadRecords
+// ctx controls the fetch timeout (should match Kafka fetch request's MaxWaitTime)
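+//
+// Illustrative caller pattern for deriving ctx from the Kafka fetch request (the variable names
+// below, e.g. maxWaitMs and fetchOffset, are assumptions, not part of this package):
+//
+//	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxWaitMs)*time.Millisecond)
+//	defer cancel()
+//	records, err := bc.ReadRecordsFromOffset(ctx, session, fetchOffset, maxRecords)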
+func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *BrokerSubscriberSession, requestedOffset int64, maxRecords int) ([]*SeaweedRecord, error) {
+ if session == nil {
+ return nil, fmt.Errorf("subscriber session cannot be nil")
+ }
+
+ session.mu.Lock()
+
+ glog.V(2).Infof("[FETCH] ReadRecordsFromOffset: topic=%s partition=%d requestedOffset=%d sessionOffset=%d maxRecords=%d",
+ session.Topic, session.Partition, requestedOffset, session.StartOffset, maxRecords)
+
+ // Check cache first
+ if len(session.consumedRecords) > 0 {
+ cacheStartOffset := session.consumedRecords[0].Offset
+ cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
+
+ if requestedOffset >= cacheStartOffset && requestedOffset <= cacheEndOffset {
+ // Found in cache
+ startIdx := int(requestedOffset - cacheStartOffset)
+ endIdx := startIdx + maxRecords
+ if endIdx > len(session.consumedRecords) {
+ endIdx = len(session.consumedRecords)
+ }
+ glog.V(2).Infof("[FETCH] Returning %d cached records for offset %d", endIdx-startIdx, requestedOffset)
+ session.mu.Unlock()
+ return session.consumedRecords[startIdx:endIdx], nil
+ }
+ }
+
+ // CRITICAL FIX for Schema Registry: Keep subscriber alive across multiple fetch requests
+ // Schema Registry expects to make multiple poll() calls on the same consumer connection
+ //
+ // Three scenarios:
+ // 1. requestedOffset < session.StartOffset: Need to seek backward (recreate)
+ // 2. requestedOffset == session.StartOffset: Continue reading (use existing)
+ // 3. requestedOffset > session.StartOffset: Continue reading forward (use existing)
+ //
+ // The session will naturally advance as records are consumed, so we should NOT
+ // recreate it just because requestedOffset != session.StartOffset
+
+ if requestedOffset < session.StartOffset {
+ // Need to seek backward - close old session and create a fresh subscriber
+ // Restarting an existing stream doesn't work reliably because the broker may still
+ // have old data buffered in the stream pipeline
+ glog.V(0).Infof("[FETCH] Seeking backward: requested=%d < session=%d, creating fresh subscriber",
+ requestedOffset, session.StartOffset)
+
+ // Extract session details before unlocking
+ topic := session.Topic
+ partition := session.Partition
+ consumerGroup := session.ConsumerGroup
+ consumerID := session.ConsumerID
+ key := session.Key()
+ session.mu.Unlock()
+
+ // Close the old session completely
+ bc.subscribersLock.Lock()
+ // CRITICAL: Double-check if another thread already recreated the session at the desired offset
+ // This prevents multiple concurrent threads from all trying to recreate the same session
+ if existingSession, exists := bc.subscribers[key]; exists {
+ existingSession.mu.Lock()
+ existingOffset := existingSession.StartOffset
+ existingSession.mu.Unlock()
+
+ // Check if the session was already recreated at (or before) the requested offset
+ if existingOffset <= requestedOffset {
+ bc.subscribersLock.Unlock()
+ glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, requestedOffset)
+ // Re-acquire the existing session and continue
+ return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
+ }
+
+ // Session still needs recreation - close it
+ if existingSession.Stream != nil {
+ _ = existingSession.Stream.CloseSend()
+ }
+ if existingSession.Cancel != nil {
+ existingSession.Cancel()
+ }
+ delete(bc.subscribers, key)
+ glog.V(1).Infof("[FETCH] Closed old subscriber session for backward seek: %s", key)
+ }
+ bc.subscribersLock.Unlock()
+
+ // Create a completely fresh subscriber at the requested offset
+ newSession, err := bc.GetOrCreateSubscriber(topic, partition, requestedOffset, consumerGroup, consumerID)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create fresh subscriber at offset %d: %w", requestedOffset, err)
+ }
+
+ // Read from fresh subscriber
+ return bc.ReadRecords(ctx, newSession, maxRecords)
+ }
+
+ // requestedOffset >= session.StartOffset: Keep reading forward from existing session
+ // This handles:
+ // - Exact match (requestedOffset == session.StartOffset)
+ // - Reading ahead (requestedOffset > session.StartOffset, e.g., from cache)
+ glog.V(2).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)",
+ requestedOffset, session.StartOffset)
+ session.mu.Unlock()
+ return bc.ReadRecords(ctx, session, maxRecords)
+}
+
+// ReadRecords reads available records from the subscriber stream
+// Uses a timeout-based approach to read multiple records without blocking indefinitely
+// ctx controls the fetch timeout (should match Kafka fetch request's MaxWaitTime)
+func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscriberSession, maxRecords int) ([]*SeaweedRecord, error) {
+ if session == nil {
+ return nil, fmt.Errorf("subscriber session cannot be nil")
+ }
+
+ if session.Stream == nil {
+ return nil, fmt.Errorf("subscriber session stream cannot be nil")
+ }
+
+ // CRITICAL: Lock to prevent concurrent reads from the same stream
+ // Multiple Fetch requests may try to read from the same subscriber concurrently,
+ // causing the broker to return the same offset repeatedly
+ session.mu.Lock()
+ defer session.mu.Unlock()
+
+ glog.V(2).Infof("[FETCH] ReadRecords: topic=%s partition=%d startOffset=%d maxRecords=%d",
+ session.Topic, session.Partition, session.StartOffset, maxRecords)
+
+ var records []*SeaweedRecord
+ currentOffset := session.StartOffset
+
+ // CRITICAL FIX: Return immediately if maxRecords is 0 or negative
+ if maxRecords <= 0 {
+ return records, nil
+ }
+
+ // CRITICAL FIX: Use cached records if available to avoid broker tight loop
+ // If we've already consumed these records, return them from cache
+ if len(session.consumedRecords) > 0 {
+ cacheStartOffset := session.consumedRecords[0].Offset
+ cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
+
+ if currentOffset >= cacheStartOffset && currentOffset <= cacheEndOffset {
+ // Records are in cache
+ glog.V(2).Infof("[FETCH] Returning cached records: requested offset %d is in cache [%d-%d]",
+ currentOffset, cacheStartOffset, cacheEndOffset)
+
+ // Find starting index in cache
+ startIdx := int(currentOffset - cacheStartOffset)
+ if startIdx < 0 || startIdx >= len(session.consumedRecords) {
+ glog.Errorf("[FETCH] Cache index out of bounds: startIdx=%d, cache size=%d", startIdx, len(session.consumedRecords))
+ return records, nil
+ }
+
+ // Return up to maxRecords from cache
+ endIdx := startIdx + maxRecords
+ if endIdx > len(session.consumedRecords) {
+ endIdx = len(session.consumedRecords)
+ }
+
+ glog.V(2).Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1)
+ return session.consumedRecords[startIdx:endIdx], nil
+ }
+ }
+
+ // Read first record with timeout (important for empty topics)
+ // CRITICAL: For SMQ backend with consumer groups, we need adequate timeout for disk reads
+ // When a consumer group resumes from a committed offset, the subscriber may need to:
+ // 1. Connect to the broker (network latency)
+ // 2. Seek to the correct offset in the log file (disk I/O)
+ // 3. Read and deserialize the record (disk I/O)
+ // Total latency can be 100-500ms for cold reads from disk
+ //
+ // CRITICAL: Use the context from the Kafka fetch request
+ // The context timeout is set by the caller based on the Kafka fetch request's MaxWaitTime
+ // This ensures we wait exactly as long as the client requested, not more or less
+ // For in-memory reads (hot path), records arrive in <10ms
+ // For low-volume topics (like _schemas), the caller sets longer timeout to keep subscriber alive
+ // If no context provided, use a reasonable default timeout
+ if ctx == nil {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ }
+
+ type recvResult struct {
+ resp *mq_pb.SubscribeMessageResponse
+ err error
+ }
+ recvChan := make(chan recvResult, 1)
+
+ // Try to receive first record
+ go func() {
+ resp, err := session.Stream.Recv()
+ select {
+ case recvChan <- recvResult{resp: resp, err: err}:
+ case <-ctx.Done():
+ // Context cancelled, don't send (avoid blocking)
+ }
+ }()
+
+ select {
+ case result := <-recvChan:
+ if result.err != nil {
+ glog.V(2).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err)
+ return records, nil // Return empty - no error for empty topic
+ }
+
+ if dataMsg := result.resp.GetData(); dataMsg != nil {
+ record := &SeaweedRecord{
+ Key: dataMsg.Key,
+ Value: dataMsg.Value,
+ Timestamp: dataMsg.TsNs,
+ Offset: currentOffset,
+ }
+ records = append(records, record)
+ currentOffset++
+ glog.V(4).Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d",
+ record.Offset, len(record.Key), len(record.Value))
+ }
+
+ case <-ctx.Done():
+ // Timeout on first record - topic is empty or no data available
+ glog.V(4).Infof("[FETCH] No data available (timeout on first record)")
+ return records, nil
+ }
+
+ // If we got the first record, try to get more with adaptive timeout
+ // CRITICAL: Schema Registry catch-up scenario - give generous timeout for the first batch
+ // Schema Registry needs to read multiple records quickly when catching up (e.g., offsets 3-6)
+ // The broker may be reading from disk, which introduces 10-20ms delay between records
+ //
+ // Strategy: Start with generous timeout (1 second) for first 5 records to allow broker
+ // to read from disk, then switch to fast mode (100ms) for streaming in-memory data
+ consecutiveReads := 0
+
+ for len(records) < maxRecords {
+ // Adaptive timeout based on how many records we've already read
+ var currentTimeout time.Duration
+ if consecutiveReads < 5 {
+ // First 5 records: generous timeout for disk reads + network delays
+ currentTimeout = 1 * time.Second
+ } else {
+ // After 5 records: assume we're streaming from memory, use faster timeout
+ currentTimeout = 100 * time.Millisecond
+ }
+
+ readStart := time.Now()
+ ctx2, cancel2 := context.WithTimeout(context.Background(), currentTimeout)
+ recvChan2 := make(chan recvResult, 1)
+
+ go func() {
+ resp, err := session.Stream.Recv()
+ select {
+ case recvChan2 <- recvResult{resp: resp, err: err}:
+ case <-ctx2.Done():
+ // Context cancelled
+ }
+ }()
+
+ select {
+ case result := <-recvChan2:
+ cancel2()
+ readDuration := time.Since(readStart)
+
+ if result.err != nil {
+ glog.V(2).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
+ // Update session offset before returning
+ session.StartOffset = currentOffset
+ return records, nil
+ }
+
+ if dataMsg := result.resp.GetData(); dataMsg != nil {
+ record := &SeaweedRecord{
+ Key: dataMsg.Key,
+ Value: dataMsg.Value,
+ Timestamp: dataMsg.TsNs,
+ Offset: currentOffset,
+ }
+ records = append(records, record)
+ currentOffset++
+ consecutiveReads++ // Track number of successful reads for adaptive timeout
+
+ glog.V(4).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v",
+ len(records), record.Offset, len(record.Key), len(record.Value), readDuration)
+ }
+
+ case <-ctx2.Done():
+ cancel2()
+ // Timeout - return what we have
+ glog.V(4).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart))
+ // CRITICAL: Update session offset so next fetch knows where we left off
+ session.StartOffset = currentOffset
+ return records, nil
+ }
+ }
+
+ glog.V(2).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records))
+ // Update session offset after successful read
+ session.StartOffset = currentOffset
+
+ // CRITICAL: Cache the consumed records to avoid broker tight loop
+ // Append new records to cache (keep last 1000 records max for better hit rate)
+ session.consumedRecords = append(session.consumedRecords, records...)
+ if len(session.consumedRecords) > 1000 {
+ // Keep only the most recent 1000 records
+ session.consumedRecords = session.consumedRecords[len(session.consumedRecords)-1000:]
+ }
+ glog.V(2).Infof("[FETCH] Updated cache: now contains %d records", len(session.consumedRecords))
+
+ return records, nil
+}
+
+// CloseSubscriber closes and removes a subscriber session
+func (bc *BrokerClient) CloseSubscriber(topic string, partition int32, consumerGroup string, consumerID string) {
+ tempSession := &BrokerSubscriberSession{
+ Topic: topic,
+ Partition: partition,
+ ConsumerGroup: consumerGroup,
+ ConsumerID: consumerID,
+ }
+ key := tempSession.Key()
+
+ bc.subscribersLock.Lock()
+ defer bc.subscribersLock.Unlock()
+
+ if session, exists := bc.subscribers[key]; exists {
+ if session.Stream != nil {
+ _ = session.Stream.CloseSend()
+ }
+ if session.Cancel != nil {
+ session.Cancel()
+ }
+ delete(bc.subscribers, key)
+ glog.V(1).Infof("[FETCH] Closed subscriber for %s", key)
+ }
+}
+
+// NeedsRestart reports whether the subscriber must be restarted to read from the given offset.
+// Returns true if:
+// 1. The stream or its context is nil (closed/invalid)
+// 2. The requested offset is before the current position and not covered by the record cache
+// 3. The requested offset is more than 1000 records ahead of the current position (large gap;
+//    restarting is likely cheaper than streaming through it)
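+//
+// Typical use together with RestartSubscriber (illustrative sketch; the surrounding fetch handler
+// and its variables are assumptions):
+//
+//	if bc.NeedsRestart(session, requestedOffset) {
+//	    if err := bc.RestartSubscriber(session, requestedOffset, group, consumerID); err != nil {
+//	        return nil, err
+//	    }
+//	}
+//	records, err := bc.ReadRecords(ctx, session, maxRecords)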
+func (bc *BrokerClient) NeedsRestart(session *BrokerSubscriberSession, requestedOffset int64) bool {
+ session.mu.Lock()
+ defer session.mu.Unlock()
+
+ // Check if stream is still valid
+ if session.Stream == nil || session.Ctx == nil {
+ return true
+ }
+
+ // Check if we can serve from cache
+ if len(session.consumedRecords) > 0 {
+ cacheStart := session.consumedRecords[0].Offset
+ cacheEnd := session.consumedRecords[len(session.consumedRecords)-1].Offset
+ if requestedOffset >= cacheStart && requestedOffset <= cacheEnd {
+ // Can serve from cache, no restart needed
+ return false
+ }
+ }
+
+ // If requested offset is far behind current position, need restart
+ if requestedOffset < session.StartOffset {
+ return true
+ }
+
+ // Check if we're too far ahead (gap in cache)
+ if requestedOffset > session.StartOffset+1000 {
+ // Large gap - might be more efficient to restart
+ return true
+ }
+
+ return false
+}
+
+// RestartSubscriber restarts an existing subscriber from a new offset
+// This is more efficient than closing and recreating the session
+func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newOffset int64, consumerGroup string, consumerID string) error {
+ session.mu.Lock()
+ defer session.mu.Unlock()
+
+ glog.V(1).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d",
+ session.Topic, session.Partition, session.StartOffset, newOffset)
+
+ // Close existing stream
+ if session.Stream != nil {
+ _ = session.Stream.CloseSend()
+ }
+ if session.Cancel != nil {
+ session.Cancel()
+ }
+
+ // Clear cache since we're seeking to a different position
+ session.consumedRecords = nil
+ session.nextOffsetToRead = newOffset
+
+ // Create new stream from new offset
+ subscriberCtx, cancel := context.WithCancel(context.Background())
+
+ stream, err := bc.client.SubscribeMessage(subscriberCtx)
+ if err != nil {
+ cancel()
+ return fmt.Errorf("failed to create subscribe stream for restart: %v", err)
+ }
+
+ // Get the actual partition assignment
+ actualPartition, err := bc.getActualPartitionAssignment(session.Topic, session.Partition)
+ if err != nil {
+ cancel()
+ _ = stream.CloseSend()
+ return fmt.Errorf("failed to get actual partition assignment for restart: %v", err)
+ }
+
+ // Send init message with new offset
+ initReq := &mq_pb.SubscribeMessageRequest{
+ Message: &mq_pb.SubscribeMessageRequest_Init{
+ Init: &mq_pb.SubscribeMessageRequest_InitMessage{
+ ConsumerGroup: consumerGroup,
+ ConsumerId: consumerID,
+ ClientId: "kafka-gateway",
+ Topic: &schema_pb.Topic{
+ Namespace: "kafka",
+ Name: session.Topic,
+ },
+ PartitionOffset: &schema_pb.PartitionOffset{
+ Partition: actualPartition,
+ StartTsNs: 0,
+ StartOffset: newOffset,
+ },
+ OffsetType: schema_pb.OffsetType_EXACT_OFFSET,
+ SlidingWindowSize: 10,
+ },
+ },
+ }
+
+ if err := stream.Send(initReq); err != nil {
+ cancel()
+ _ = stream.CloseSend()
+ return fmt.Errorf("failed to send subscribe init for restart: %v", err)
+ }
+
+ // Update session with new stream and offset
+ session.Stream = stream
+ session.Cancel = cancel
+ session.Ctx = subscriberCtx
+ session.StartOffset = newOffset
+
+ glog.V(1).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d",
+ session.Topic, session.Partition, newOffset)
+
+ return nil
+}