Diffstat (limited to 'weed/mq/kafka/integration/broker_client_subscribe.go')
| -rw-r--r-- | weed/mq/kafka/integration/broker_client_subscribe.go | 703 |
1 file changed, 703 insertions, 0 deletions
diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go
new file mode 100644
index 000000000..a0b8504bf
--- /dev/null
+++ b/weed/mq/kafka/integration/broker_client_subscribe.go
@@ -0,0 +1,703 @@
+package integration
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// CreateFreshSubscriber creates a new subscriber session without caching
+// This ensures each fetch gets fresh data from the requested offset
+// consumerGroup and consumerID are passed from Kafka client for proper tracking in SMQ
+func (bc *BrokerClient) CreateFreshSubscriber(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) {
+	// Create a dedicated context for this subscriber
+	subscriberCtx := context.Background()
+
+	stream, err := bc.client.SubscribeMessage(subscriberCtx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
+	}
+
+	// Get the actual partition assignment from the broker
+	actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
+	}
+
+	// Convert Kafka offset to SeaweedMQ OffsetType
+	var offsetType schema_pb.OffsetType
+	var startTimestamp int64
+	var startOffsetValue int64
+
+	// Use EXACT_OFFSET to read from the specific offset
+	offsetType = schema_pb.OffsetType_EXACT_OFFSET
+	startTimestamp = 0
+	startOffsetValue = startOffset
+
+	// Send init message to start subscription with Kafka client's consumer group and ID
+	initReq := &mq_pb.SubscribeMessageRequest{
+		Message: &mq_pb.SubscribeMessageRequest_Init{
+			Init: &mq_pb.SubscribeMessageRequest_InitMessage{
+				ConsumerGroup: consumerGroup,
+				ConsumerId:    consumerID,
+				ClientId:      "kafka-gateway",
+				Topic: &schema_pb.Topic{
+					Namespace: "kafka",
+					Name:      topic,
+				},
+				PartitionOffset: &schema_pb.PartitionOffset{
+					Partition:   actualPartition,
+					StartTsNs:   startTimestamp,
+					StartOffset: startOffsetValue,
+				},
+				OffsetType:        offsetType,
+				SlidingWindowSize: 10,
+			},
+		},
+	}
+
+	if err := stream.Send(initReq); err != nil {
+		return nil, fmt.Errorf("failed to send subscribe init: %v", err)
+	}
+
+	// IMPORTANT: Don't wait for init response here!
+	// The broker may send the first data record as the "init response".
+	// If we call Recv() here, we'll consume that first record and ReadRecords will block
+	// waiting for the second record, causing a 30-second timeout.
+	// Instead, let ReadRecords handle all Recv() calls.
+
+	session := &BrokerSubscriberSession{
+		Stream:        stream,
+		Topic:         topic,
+		Partition:     partition,
+		StartOffset:   startOffset,
+		ConsumerGroup: consumerGroup,
+		ConsumerID:    consumerID,
+	}
+
+	return session, nil
+}
+
+// GetOrCreateSubscriber gets or creates a subscriber for offset tracking
+func (bc *BrokerClient) GetOrCreateSubscriber(topic string, partition int32, startOffset int64, consumerGroup string, consumerID string) (*BrokerSubscriberSession, error) {
+	// Create a temporary session to generate the key
+	tempSession := &BrokerSubscriberSession{
+		Topic:         topic,
+		Partition:     partition,
+		ConsumerGroup: consumerGroup,
+		ConsumerID:    consumerID,
+	}
+	key := tempSession.Key()
+
+	bc.subscribersLock.RLock()
+	if session, exists := bc.subscribers[key]; exists {
+		// Check if we need to recreate the session
+		if session.StartOffset != startOffset {
+			// CRITICAL FIX: Check cache first before recreating
+			// If the requested offset is in cache, we can reuse the session
+			session.mu.Lock()
+			canUseCache := false
+
+			if len(session.consumedRecords) > 0 {
+				cacheStartOffset := session.consumedRecords[0].Offset
+				cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
+				if startOffset >= cacheStartOffset && startOffset <= cacheEndOffset {
+					canUseCache = true
+					glog.V(2).Infof("[FETCH] Session offset mismatch for %s (session=%d, requested=%d), but offset is in cache [%d-%d]",
+						key, session.StartOffset, startOffset, cacheStartOffset, cacheEndOffset)
+				}
+			}
+
+			session.mu.Unlock()
+
+			if canUseCache {
+				// Offset is in cache, reuse session
+				bc.subscribersLock.RUnlock()
+				return session, nil
+			}
+
+			// Not in cache - need to recreate session at the requested offset
+			glog.V(0).Infof("[FETCH] Recreating session for %s: session at %d, requested %d (not in cache)",
+				key, session.StartOffset, startOffset)
+			bc.subscribersLock.RUnlock()
+
+			// Close and delete the old session
+			bc.subscribersLock.Lock()
+			// CRITICAL: Double-check if another thread already recreated the session at the desired offset
+			// This prevents multiple concurrent threads from all trying to recreate the same session
+			if existingSession, exists := bc.subscribers[key]; exists {
+				existingSession.mu.Lock()
+				existingOffset := existingSession.StartOffset
+				existingSession.mu.Unlock()
+
+				// Check if the session was already recreated at (or before) the requested offset
+				if existingOffset <= startOffset {
+					bc.subscribersLock.Unlock()
+					glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, startOffset)
+					// Re-acquire the existing session and continue
+					return existingSession, nil
+				}
+
+				// Session still needs recreation - close it
+				if existingSession.Stream != nil {
+					_ = existingSession.Stream.CloseSend()
+				}
+				if existingSession.Cancel != nil {
+					existingSession.Cancel()
+				}
+				delete(bc.subscribers, key)
+			}
+			bc.subscribersLock.Unlock()
+		} else {
+			// Exact match - reuse
+			bc.subscribersLock.RUnlock()
+			return session, nil
+		}
+	} else {
+		bc.subscribersLock.RUnlock()
+	}
+
+	// Create new subscriber stream
+	bc.subscribersLock.Lock()
+	defer bc.subscribersLock.Unlock()
+
+	if session, exists := bc.subscribers[key]; exists {
+		return session, nil
+	}
+
+	// CRITICAL FIX: Use background context for subscriber to prevent premature cancellation
+	// Subscribers need to continue reading data even when the connection is closing,
+	// otherwise Schema Registry and other clients can't read existing data.
+	// The subscriber will be cleaned up when the stream is explicitly closed.
+	subscriberCtx := context.Background()
+	subscriberCancel := func() {} // No-op cancel
+
+	stream, err := bc.client.SubscribeMessage(subscriberCtx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create subscribe stream: %v", err)
+	}
+
+	// Get the actual partition assignment from the broker instead of using Kafka partition mapping
+	actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get actual partition assignment for subscribe: %v", err)
+	}
+
+	// Convert Kafka offset to appropriate SeaweedMQ OffsetType and parameters
+	var offsetType schema_pb.OffsetType
+	var startTimestamp int64
+	var startOffsetValue int64
+
+	if startOffset == -1 {
+		// Kafka offset -1 typically means "latest"
+		offsetType = schema_pb.OffsetType_RESET_TO_LATEST
+		startTimestamp = 0   // Not used with RESET_TO_LATEST
+		startOffsetValue = 0 // Not used with RESET_TO_LATEST
+		glog.V(1).Infof("Using RESET_TO_LATEST for Kafka offset -1 (read latest)")
+	} else {
+		// CRITICAL FIX: Use EXACT_OFFSET to position subscriber at the exact Kafka offset
+		// This allows the subscriber to read from both buffer and disk at the correct position
+		offsetType = schema_pb.OffsetType_EXACT_OFFSET
+		startTimestamp = 0             // Not used with EXACT_OFFSET
+		startOffsetValue = startOffset // Use the exact Kafka offset
+		glog.V(1).Infof("Using EXACT_OFFSET for Kafka offset %d (direct positioning)", startOffset)
+	}
+
+	glog.V(1).Infof("Creating subscriber for topic=%s partition=%d: Kafka offset %d -> SeaweedMQ %s (timestamp=%d)",
+		topic, partition, startOffset, offsetType, startTimestamp)
+
+	// Send init message using the actual partition structure that the broker allocated
+	if err := stream.Send(&mq_pb.SubscribeMessageRequest{
+		Message: &mq_pb.SubscribeMessageRequest_Init{
+			Init: &mq_pb.SubscribeMessageRequest_InitMessage{
+				ConsumerGroup: consumerGroup,
+				ConsumerId:    consumerID,
+				ClientId:      "kafka-gateway",
+				Topic: &schema_pb.Topic{
+					Namespace: "kafka",
+					Name:      topic,
+				},
+				PartitionOffset: &schema_pb.PartitionOffset{
+					Partition:   actualPartition,
+					StartTsNs:   startTimestamp,
+					StartOffset: startOffsetValue,
+				},
+				OffsetType:        offsetType, // Use the correct offset type
+				SlidingWindowSize: 10,
+			},
+		},
+	}); err != nil {
+		return nil, fmt.Errorf("failed to send subscribe init: %v", err)
+	}
+
+	session := &BrokerSubscriberSession{
+		Topic:         topic,
+		Partition:     partition,
+		Stream:        stream,
+		StartOffset:   startOffset,
+		ConsumerGroup: consumerGroup,
+		ConsumerID:    consumerID,
+		Ctx:           subscriberCtx,
+		Cancel:        subscriberCancel,
+	}
+
+	bc.subscribers[key] = session
+	glog.V(2).Infof("Created subscriber session for %s with context cancellation support", key)
+	return session, nil
+}
+
+// ReadRecordsFromOffset reads records starting from a specific offset.
+// If the offset is in cache, returns cached records; otherwise delegates to ReadRecords.
+// ctx controls the fetch timeout (should match the Kafka fetch request's MaxWaitTime).
+func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *BrokerSubscriberSession, requestedOffset int64, maxRecords int) ([]*SeaweedRecord, error) {
+	if session == nil {
+		return nil, fmt.Errorf("subscriber session cannot be nil")
+	}
+
+	session.mu.Lock()
+
+	glog.V(2).Infof("[FETCH] ReadRecordsFromOffset: topic=%s partition=%d requestedOffset=%d sessionOffset=%d maxRecords=%d",
+		session.Topic, session.Partition, requestedOffset, session.StartOffset, maxRecords)
+
+	// Check cache first
+	if len(session.consumedRecords) > 0 {
+		cacheStartOffset := session.consumedRecords[0].Offset
+		cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
+
+		if requestedOffset >= cacheStartOffset && requestedOffset <= cacheEndOffset {
+			// Found in cache
+			startIdx := int(requestedOffset - cacheStartOffset)
+			endIdx := startIdx + maxRecords
+			if endIdx > len(session.consumedRecords) {
+				endIdx = len(session.consumedRecords)
+			}
+			glog.V(2).Infof("[FETCH] Returning %d cached records for offset %d", endIdx-startIdx, requestedOffset)
+			session.mu.Unlock()
+			return session.consumedRecords[startIdx:endIdx], nil
+		}
+	}
+
+	// CRITICAL FIX for Schema Registry: Keep subscriber alive across multiple fetch requests.
+	// Schema Registry expects to make multiple poll() calls on the same consumer connection.
+	//
+	// Three scenarios:
+	// 1. requestedOffset < session.StartOffset: Need to seek backward (recreate)
+	// 2. requestedOffset == session.StartOffset: Continue reading (use existing)
+	// 3. requestedOffset > session.StartOffset: Continue reading forward (use existing)
+	//
+	// The session will naturally advance as records are consumed, so we should NOT
+	// recreate it just because requestedOffset != session.StartOffset
+
+	if requestedOffset < session.StartOffset {
+		// Need to seek backward - close the old session and create a fresh subscriber.
+		// Restarting an existing stream doesn't work reliably because the broker may still
+		// have old data buffered in the stream pipeline.
+		glog.V(0).Infof("[FETCH] Seeking backward: requested=%d < session=%d, creating fresh subscriber",
+			requestedOffset, session.StartOffset)
+
+		// Extract session details before unlocking
+		topic := session.Topic
+		partition := session.Partition
+		consumerGroup := session.ConsumerGroup
+		consumerID := session.ConsumerID
+		key := session.Key()
+		session.mu.Unlock()
+
+		// Close the old session completely
+		bc.subscribersLock.Lock()
+		// CRITICAL: Double-check if another thread already recreated the session at the desired offset
+		// This prevents multiple concurrent threads from all trying to recreate the same session
+		if existingSession, exists := bc.subscribers[key]; exists {
+			existingSession.mu.Lock()
+			existingOffset := existingSession.StartOffset
+			existingSession.mu.Unlock()
+
+			// Check if the session was already recreated at (or before) the requested offset
+			if existingOffset <= requestedOffset {
+				bc.subscribersLock.Unlock()
+				glog.V(1).Infof("[FETCH] Session already recreated by another thread at offset %d (requested %d)", existingOffset, requestedOffset)
+				// Re-acquire the existing session and continue
+				return bc.ReadRecordsFromOffset(ctx, existingSession, requestedOffset, maxRecords)
+			}
+
+			// Session still needs recreation - close it
+			if existingSession.Stream != nil {
+				_ = existingSession.Stream.CloseSend()
+			}
+			if existingSession.Cancel != nil {
+				existingSession.Cancel()
+			}
+			delete(bc.subscribers, key)
+			glog.V(1).Infof("[FETCH] Closed old subscriber session for backward seek: %s", key)
+		}
+		bc.subscribersLock.Unlock()
+
+		// Create a completely fresh subscriber at the requested offset
+		newSession, err := bc.GetOrCreateSubscriber(topic, partition, requestedOffset, consumerGroup, consumerID)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create fresh subscriber at offset %d: %w", requestedOffset, err)
+		}
+
+		// Read from fresh subscriber
+		return bc.ReadRecords(ctx, newSession, maxRecords)
+	}
+
+	// requestedOffset >= session.StartOffset: Keep reading forward from the existing session.
+	// This handles:
+	// - Exact match (requestedOffset == session.StartOffset)
+	// - Reading ahead (requestedOffset > session.StartOffset, e.g., from cache)
+	glog.V(2).Infof("[FETCH] Using persistent session: requested=%d session=%d (persistent connection)",
+		requestedOffset, session.StartOffset)
+	session.mu.Unlock()
+	return bc.ReadRecords(ctx, session, maxRecords)
+}
+
+// ReadRecords reads available records from the subscriber stream.
+// Uses a timeout-based approach to read multiple records without blocking indefinitely.
+// ctx controls the fetch timeout (should match the Kafka fetch request's MaxWaitTime).
+func (bc *BrokerClient) ReadRecords(ctx context.Context, session *BrokerSubscriberSession, maxRecords int) ([]*SeaweedRecord, error) {
+	if session == nil {
+		return nil, fmt.Errorf("subscriber session cannot be nil")
+	}
+
+	if session.Stream == nil {
+		return nil, fmt.Errorf("subscriber session stream cannot be nil")
+	}
+
+	// CRITICAL: Lock to prevent concurrent reads from the same stream.
+	// Multiple Fetch requests may try to read from the same subscriber concurrently,
+	// causing the broker to return the same offset repeatedly.
+	session.mu.Lock()
+	defer session.mu.Unlock()
+
+	glog.V(2).Infof("[FETCH] ReadRecords: topic=%s partition=%d startOffset=%d maxRecords=%d",
+		session.Topic, session.Partition, session.StartOffset, maxRecords)
+
+	var records []*SeaweedRecord
+	currentOffset := session.StartOffset
+
+	// CRITICAL FIX: Return immediately if maxRecords is 0 or negative
+	if maxRecords <= 0 {
+		return records, nil
+	}
+
+	// CRITICAL FIX: Use cached records if available to avoid a broker tight loop.
+	// If we've already consumed these records, return them from cache.
+	if len(session.consumedRecords) > 0 {
+		cacheStartOffset := session.consumedRecords[0].Offset
+		cacheEndOffset := session.consumedRecords[len(session.consumedRecords)-1].Offset
+
+		if currentOffset >= cacheStartOffset && currentOffset <= cacheEndOffset {
+			// Records are in cache
+			glog.V(2).Infof("[FETCH] Returning cached records: requested offset %d is in cache [%d-%d]",
+				currentOffset, cacheStartOffset, cacheEndOffset)
+
+			// Find starting index in cache
+			startIdx := int(currentOffset - cacheStartOffset)
+			if startIdx < 0 || startIdx >= len(session.consumedRecords) {
+				glog.Errorf("[FETCH] Cache index out of bounds: startIdx=%d, cache size=%d", startIdx, len(session.consumedRecords))
+				return records, nil
+			}
+
+			// Return up to maxRecords from cache
+			endIdx := startIdx + maxRecords
+			if endIdx > len(session.consumedRecords) {
+				endIdx = len(session.consumedRecords)
+			}
+
+			glog.V(2).Infof("[FETCH] Returning %d cached records from index %d to %d", endIdx-startIdx, startIdx, endIdx-1)
+			return session.consumedRecords[startIdx:endIdx], nil
+		}
+	}
+
+	// Read the first record with a timeout (important for empty topics).
+	// CRITICAL: For the SMQ backend with consumer groups, we need an adequate timeout for disk reads.
+	// When a consumer group resumes from a committed offset, the subscriber may need to:
+	// 1. Connect to the broker (network latency)
+	// 2. Seek to the correct offset in the log file (disk I/O)
+	// 3. Read and deserialize the record (disk I/O)
+	// Total latency can be 100-500ms for cold reads from disk.
+	//
+	// CRITICAL: Use the context from the Kafka fetch request.
+	// The context timeout is set by the caller based on the Kafka fetch request's MaxWaitTime.
+	// This ensures we wait exactly as long as the client requested, not more or less.
+	// For in-memory reads (hot path), records arrive in <10ms.
+	// For low-volume topics (like _schemas), the caller sets a longer timeout to keep the subscriber alive.
+	// If no context is provided, use a reasonable default timeout.
+	if ctx == nil {
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
+		defer cancel()
+	}
+
+	type recvResult struct {
+		resp *mq_pb.SubscribeMessageResponse
+		err  error
+	}
+	recvChan := make(chan recvResult, 1)
+
+	// Try to receive first record
+	go func() {
+		resp, err := session.Stream.Recv()
+		select {
+		case recvChan <- recvResult{resp: resp, err: err}:
+		case <-ctx.Done():
+			// Context cancelled, don't send (avoid blocking)
+		}
+	}()
+
+	select {
+	case result := <-recvChan:
+		if result.err != nil {
+			glog.V(2).Infof("[FETCH] Stream.Recv() error on first record: %v", result.err)
+			return records, nil // Return empty - no error for empty topic
+		}
+
+		if dataMsg := result.resp.GetData(); dataMsg != nil {
+			record := &SeaweedRecord{
+				Key:       dataMsg.Key,
+				Value:     dataMsg.Value,
+				Timestamp: dataMsg.TsNs,
+				Offset:    currentOffset,
+			}
+			records = append(records, record)
+			currentOffset++
+			glog.V(4).Infof("[FETCH] Received record: offset=%d, keyLen=%d, valueLen=%d",
+				record.Offset, len(record.Key), len(record.Value))
+		}
+
+	case <-ctx.Done():
+		// Timeout on first record - topic is empty or no data available
+		glog.V(4).Infof("[FETCH] No data available (timeout on first record)")
+		return records, nil
+	}
+
+	// If we got the first record, try to get more with an adaptive timeout.
+	// CRITICAL: Schema Registry catch-up scenario - give a generous timeout for the first batch.
+	// Schema Registry needs to read multiple records quickly when catching up (e.g., offsets 3-6).
+	// The broker may be reading from disk, which introduces 10-20ms delay between records.
+	//
+	// Strategy: Start with a generous timeout (1 second) for the first 5 records to allow the broker
+	// to read from disk, then switch to fast mode (100ms) for streaming in-memory data.
+	consecutiveReads := 0
+
+	for len(records) < maxRecords {
+		// Adaptive timeout based on how many records we've already read
+		var currentTimeout time.Duration
+		if consecutiveReads < 5 {
+			// First 5 records: generous timeout for disk reads + network delays
+			currentTimeout = 1 * time.Second
+		} else {
+			// After 5 records: assume we're streaming from memory, use a faster timeout
+			currentTimeout = 100 * time.Millisecond
+		}
+
+		readStart := time.Now()
+		ctx2, cancel2 := context.WithTimeout(context.Background(), currentTimeout)
+		recvChan2 := make(chan recvResult, 1)
+
+		go func() {
+			resp, err := session.Stream.Recv()
+			select {
+			case recvChan2 <- recvResult{resp: resp, err: err}:
+			case <-ctx2.Done():
+				// Context cancelled
+			}
+		}()
+
+		select {
+		case result := <-recvChan2:
+			cancel2()
+			readDuration := time.Since(readStart)
+
+			if result.err != nil {
+				glog.V(2).Infof("[FETCH] Stream.Recv() error after %d records: %v", len(records), result.err)
+				// Update session offset before returning
+				session.StartOffset = currentOffset
+				return records, nil
+			}
+
+			if dataMsg := result.resp.GetData(); dataMsg != nil {
+				record := &SeaweedRecord{
+					Key:       dataMsg.Key,
+					Value:     dataMsg.Value,
+					Timestamp: dataMsg.TsNs,
+					Offset:    currentOffset,
+				}
+				records = append(records, record)
+				currentOffset++
+				consecutiveReads++ // Track number of successful reads for adaptive timeout
+
+				glog.V(4).Infof("[FETCH] Received record %d: offset=%d, keyLen=%d, valueLen=%d, readTime=%v",
+					len(records), record.Offset, len(record.Key), len(record.Value), readDuration)
+			}
+
+		case <-ctx2.Done():
+			cancel2()
+			// Timeout - return what we have
+			glog.V(4).Infof("[FETCH] Read timeout after %d records (waited %v), returning batch", len(records), time.Since(readStart))
+			// CRITICAL: Update session offset so the next fetch knows where we left off
+			session.StartOffset = currentOffset
+			return records, nil
+		}
+	}
+
+	glog.V(2).Infof("[FETCH] ReadRecords returning %d records (maxRecords reached)", len(records))
+	// Update session offset after successful read
+	session.StartOffset = currentOffset
+
+	// CRITICAL: Cache the consumed records to avoid a broker tight loop.
+	// Append new records to the cache (keep last 1000 records max for better hit rate).
+	session.consumedRecords = append(session.consumedRecords, records...)
+	if len(session.consumedRecords) > 1000 {
+		// Keep only the most recent 1000 records
+		session.consumedRecords = session.consumedRecords[len(session.consumedRecords)-1000:]
+	}
+	glog.V(2).Infof("[FETCH] Updated cache: now contains %d records", len(session.consumedRecords))
+
+	return records, nil
+}
+
+// CloseSubscriber closes and removes a subscriber session
+func (bc *BrokerClient) CloseSubscriber(topic string, partition int32, consumerGroup string, consumerID string) {
+	tempSession := &BrokerSubscriberSession{
+		Topic:         topic,
+		Partition:     partition,
+		ConsumerGroup: consumerGroup,
+		ConsumerID:    consumerID,
+	}
+	key := tempSession.Key()
+
+	bc.subscribersLock.Lock()
+	defer bc.subscribersLock.Unlock()
+
+	if session, exists := bc.subscribers[key]; exists {
+		if session.Stream != nil {
+			_ = session.Stream.CloseSend()
+		}
+		if session.Cancel != nil {
+			session.Cancel()
+		}
+		delete(bc.subscribers, key)
+		glog.V(1).Infof("[FETCH] Closed subscriber for %s", key)
+	}
+}
+
+// NeedsRestart checks if the subscriber needs to restart to read from the given offset.
+// Returns true if:
+// 1. Requested offset is before the current position AND not in cache
+// 2. Stream is closed/invalid
+func (bc *BrokerClient) NeedsRestart(session *BrokerSubscriberSession, requestedOffset int64) bool {
+	session.mu.Lock()
+	defer session.mu.Unlock()
+
+	// Check if stream is still valid
+	if session.Stream == nil || session.Ctx == nil {
+		return true
+	}
+
+	// Check if we can serve from cache
+	if len(session.consumedRecords) > 0 {
+		cacheStart := session.consumedRecords[0].Offset
+		cacheEnd := session.consumedRecords[len(session.consumedRecords)-1].Offset
+		if requestedOffset >= cacheStart && requestedOffset <= cacheEnd {
+			// Can serve from cache, no restart needed
+			return false
+		}
+	}
+
+	// If requested offset is far behind current position, need restart
+	if requestedOffset < session.StartOffset {
+		return true
+	}
+
+	// Check if we're too far ahead (gap in cache)
+	if requestedOffset > session.StartOffset+1000 {
+		// Large gap - might be more efficient to restart
+		return true
+	}
+
+	return false
+}
+
+// RestartSubscriber restarts an existing subscriber from a new offset.
+// This is more efficient than closing and recreating the session.
+func (bc *BrokerClient) RestartSubscriber(session *BrokerSubscriberSession, newOffset int64, consumerGroup string, consumerID string) error {
+	session.mu.Lock()
+	defer session.mu.Unlock()
+
+	glog.V(1).Infof("[FETCH] Restarting subscriber for %s[%d]: from offset %d to %d",
+		session.Topic, session.Partition, session.StartOffset, newOffset)
+
+	// Close existing stream
+	if session.Stream != nil {
+		_ = session.Stream.CloseSend()
+	}
+	if session.Cancel != nil {
+		session.Cancel()
+	}
+
+	// Clear cache since we're seeking to a different position
+	session.consumedRecords = nil
+	session.nextOffsetToRead = newOffset
+
+	// Create new stream from new offset
+	subscriberCtx, cancel := context.WithCancel(context.Background())
+
+	stream, err := bc.client.SubscribeMessage(subscriberCtx)
+	if err != nil {
+		cancel()
+		return fmt.Errorf("failed to create subscribe stream for restart: %v", err)
+	}
+
+	// Get the actual partition assignment
+	actualPartition, err := bc.getActualPartitionAssignment(session.Topic, session.Partition)
+	if err != nil {
+		cancel()
+		_ = stream.CloseSend()
+		return fmt.Errorf("failed to get actual partition assignment for restart: %v", err)
+	}
+
+	// Send init message with new offset
+	initReq := &mq_pb.SubscribeMessageRequest{
+		Message: &mq_pb.SubscribeMessageRequest_Init{
+			Init: &mq_pb.SubscribeMessageRequest_InitMessage{
+				ConsumerGroup: consumerGroup,
+				ConsumerId:    consumerID,
+				ClientId:      "kafka-gateway",
+				Topic: &schema_pb.Topic{
+					Namespace: "kafka",
+					Name:      session.Topic,
+				},
+				PartitionOffset: &schema_pb.PartitionOffset{
+					Partition:   actualPartition,
+					StartTsNs:   0,
+					StartOffset: newOffset,
+				},
+				OffsetType:        schema_pb.OffsetType_EXACT_OFFSET,
+				SlidingWindowSize: 10,
+			},
+		},
+	}
+
+	if err := stream.Send(initReq); err != nil {
+		cancel()
+		_ = stream.CloseSend()
+		return fmt.Errorf("failed to send subscribe init for restart: %v", err)
+	}
+
+	// Update session with new stream and offset
+	session.Stream = stream
+	session.Cancel = cancel
+	session.Ctx = subscriberCtx
+	session.StartOffset = newOffset
+
+	glog.V(1).Infof("[FETCH] Successfully restarted subscriber for %s[%d] at offset %d",
+		session.Topic, session.Partition, newOffset)
+
+	return nil
+}
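
For reference, the intended call pattern from the Kafka fetch path looks roughly like the sketch below. This is a minimal illustration, not code from this change: the gateway-side package name, the fetchPartition helper, and the maxWaitMs/maxRecords parameters are hypothetical; the grounded detail is that the context handed to ReadRecordsFromOffset is expected to carry the fetch request's MaxWaitTime, and that the session is keyed by consumer group and consumer ID.

package gateway // hypothetical caller-side package, for illustration only

import (
	"context"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
)

// fetchPartition sketches how a Kafka Fetch handler might drive the subscriber API:
// reuse the per-(group, consumer, topic, partition) session, bound the read by the
// client's MaxWaitTime, and let ReadRecordsFromOffset decide cache vs. stream.
func fetchPartition(bc *integration.BrokerClient, topic string, partition int32,
	fetchOffset int64, maxWaitMs int32, group, consumerID string) ([]*integration.SeaweedRecord, error) {

	// Reuse (or create) the subscriber session for this consumer and partition.
	session, err := bc.GetOrCreateSubscriber(topic, partition, fetchOffset, group, consumerID)
	if err != nil {
		return nil, err
	}

	// Deadline mirrors the Kafka fetch request's MaxWaitTime, as ReadRecordsFromOffset expects.
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxWaitMs)*time.Millisecond)
	defer cancel()

	// Serves from the session cache when possible, recreates the subscriber for
	// backward seeks, and otherwise keeps reading forward on the existing stream.
	return bc.ReadRecordsFromOffset(ctx, session, fetchOffset, 100)
}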

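The cache lookups in ReadRecordsFromOffset and ReadRecords both rely on the same arithmetic: consumedRecords holds a contiguous run of offsets, so a requested offset maps to slice index requestedOffset - cacheStartOffset. A standalone restatement of that window check is sketched below; the rec type and cacheSlice helper are illustrative only and not part of the patch.

package main

// rec mirrors just the field the cache logic needs from SeaweedRecord.
type rec struct{ Offset int64 }

// cacheSlice returns up to maxRecords cached records starting at requestedOffset,
// and false when the offset falls outside the cached window (caller must hit the stream).
func cacheSlice(records []*rec, requestedOffset int64, maxRecords int) ([]*rec, bool) {
	if len(records) == 0 || maxRecords <= 0 {
		return nil, false
	}
	start, end := records[0].Offset, records[len(records)-1].Offset
	if requestedOffset < start || requestedOffset > end {
		return nil, false // not in the cached window
	}
	startIdx := int(requestedOffset - start) // contiguous offsets -> direct index
	endIdx := startIdx + maxRecords
	if endIdx > len(records) {
		endIdx = len(records)
	}
	return records[startIdx:endIdx], true
}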