Diffstat (limited to 'weed/mq/kafka/integration/seaweedmq_handler.go')
| -rw-r--r-- | weed/mq/kafka/integration/seaweedmq_handler.go | 526 |
1 file changed, 526 insertions, 0 deletions
diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go
new file mode 100644
index 000000000..7689d0612
--- /dev/null
+++ b/weed/mq/kafka/integration/seaweedmq_handler.go
@@ -0,0 +1,526 @@
+package integration
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+// GetStoredRecords retrieves records from SeaweedMQ using the proper subscriber API.
+// ctx controls the fetch timeout (should match the Kafka fetch request's MaxWaitTime).
+func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]SMQRecord, error) {
+	glog.V(2).Infof("[FETCH] GetStoredRecords: topic=%s partition=%d fromOffset=%d maxRecords=%d", topic, partition, fromOffset, maxRecords)
+
+	// Verify topic exists
+	if !h.TopicExists(topic) {
+		return nil, fmt.Errorf("topic %s does not exist", topic)
+	}
+
+	// CRITICAL: Use a per-connection BrokerClient to prevent gRPC stream interference.
+	// Each Kafka connection has its own isolated BrokerClient instance.
+	var brokerClient *BrokerClient
+	consumerGroup := "kafka-fetch-consumer" // default
+	// CRITICAL FIX: Use a stable consumer ID per topic-partition, NOT one including a timestamp.
+	// A timestamp would create a new session on every fetch, causing subscriber churn.
+	consumerID := fmt.Sprintf("kafka-fetch-%s-%d", topic, partition) // default, stable per topic-partition
+
+	// Get the per-connection broker client from the connection context
+	if h.protocolHandler != nil {
+		connCtx := h.protocolHandler.GetConnectionContext()
+		if connCtx != nil {
+			// Extract the per-connection broker client
+			if connCtx.BrokerClient != nil {
+				if bc, ok := connCtx.BrokerClient.(*BrokerClient); ok {
+					brokerClient = bc
+					glog.V(2).Infof("[FETCH] Using per-connection BrokerClient for topic=%s partition=%d", topic, partition)
+				}
+			}
+
+			// Extract the consumer group and client ID
+			if connCtx.ConsumerGroup != "" {
+				consumerGroup = connCtx.ConsumerGroup
+				glog.V(2).Infof("[FETCH] Using actual consumer group from context: %s", consumerGroup)
+			}
+			if connCtx.MemberID != "" {
+				// Use the member ID as the base, but still include topic-partition for uniqueness
+				consumerID = fmt.Sprintf("%s-%s-%d", connCtx.MemberID, topic, partition)
+				glog.V(2).Infof("[FETCH] Using actual member ID from context: %s", consumerID)
+			} else if connCtx.ClientID != "" {
+				// Fall back to the client ID if the member ID is not set (clients not using consumer groups).
+				// Include topic-partition to ensure each partition consumer is unique.
+				consumerID = fmt.Sprintf("%s-%s-%d", connCtx.ClientID, topic, partition)
+				glog.V(2).Infof("[FETCH] Using client ID from context: %s", consumerID)
+			}
+		}
+	}
+
+	// Fall back to the shared broker client if the per-connection client is not available
+	if brokerClient == nil {
+		glog.Warningf("[FETCH] No per-connection BrokerClient, falling back to shared client")
+		brokerClient = h.brokerClient
+		if brokerClient == nil {
+			return nil, fmt.Errorf("no broker client available")
+		}
+	}
+
+	// CRITICAL FIX: Reuse the existing subscriber if the offset matches, to avoid a concurrent
+	// subscriber storm. Too many concurrent subscribers at the same offset cause the broker
+	// to return the same data repeatedly, creating an infinite loop.
+	glog.V(2).Infof("[FETCH] Getting or creating subscriber for topic=%s partition=%d fromOffset=%d", topic, partition, fromOffset)
+
+	// GetOrCreateSubscriber handles offset mismatches internally:
+	// if the cached subscriber is at a different offset, it is recreated automatically.
+	brokerSubscriber, err := brokerClient.GetOrCreateSubscriber(topic, partition, fromOffset, consumerGroup, consumerID)
+	if err != nil {
+		glog.Errorf("[FETCH] Failed to get/create subscriber: %v", err)
+		return nil, fmt.Errorf("failed to get/create subscriber: %v", err)
+	}
+	glog.V(2).Infof("[FETCH] Subscriber ready at offset %d", brokerSubscriber.StartOffset)
+
+	// NOTE: We DON'T close the subscriber here because we are reusing it across Fetch requests.
+	// The subscriber is closed when the connection closes or when a different offset is requested.
+
+	// Read records using the subscriber.
+	// CRITICAL: Pass the requested fromOffset so ReadRecordsFromOffset can check the cache
+	// correctly; if the session has advanced past fromOffset, it returns cached data.
+	// Pass ctx to respect the Kafka fetch request's MaxWaitTime.
+	glog.V(2).Infof("[FETCH] Calling ReadRecordsFromOffset for topic=%s partition=%d fromOffset=%d maxRecords=%d", topic, partition, fromOffset, maxRecords)
+	seaweedRecords, err := brokerClient.ReadRecordsFromOffset(ctx, brokerSubscriber, fromOffset, maxRecords)
+	if err != nil {
+		glog.Errorf("[FETCH] ReadRecordsFromOffset failed: %v", err)
+		return nil, fmt.Errorf("failed to read records: %v", err)
+	}
+
+	// CRITICAL FIX: If the read returns 0 records but the HWM indicates data exists on disk,
+	// force a disk read. This handles a subscriber that advanced past data already on disk.
+	// Only do this ONCE per fetch request to avoid subscriber churn.
+	if len(seaweedRecords) == 0 {
+		hwm, hwmErr := brokerClient.GetHighWaterMark(topic, partition)
+		if hwmErr == nil && fromOffset < hwm {
+			// Restart the existing subscriber at the requested offset for the disk read;
+			// this is more efficient than closing and recreating it.
+			consumerGroup := "kafka-gateway"
+			consumerID := fmt.Sprintf("kafka-gateway-%s-%d", topic, partition)
+
+			if err := brokerClient.RestartSubscriber(brokerSubscriber, fromOffset, consumerGroup, consumerID); err != nil {
+				return nil, fmt.Errorf("failed to restart subscriber: %v", err)
+			}
+
+			// Try reading again from the restarted subscriber (this will do a disk read)
+			seaweedRecords, err = brokerClient.ReadRecordsFromOffset(ctx, brokerSubscriber, fromOffset, maxRecords)
+			if err != nil {
+				return nil, fmt.Errorf("failed to read after restart: %v", err)
+			}
+		}
+	}
+
+	glog.V(2).Infof("[FETCH] ReadRecordsFromOffset returned %d records", len(seaweedRecords))
+
+	// This approach is correct for the Kafka protocol:
+	// - clients continuously poll with Fetch requests;
+	// - if no data is available, we return empty and the client will retry;
+	// - eventually the data is read from disk and returned.
+	// We only recreate the subscriber on an offset mismatch, which is handled earlier in this function.
+
+	// Convert SeaweedMQ records to the SMQRecord interface with proper Kafka offsets
+	smqRecords := make([]SMQRecord, 0, len(seaweedRecords))
+	for i, seaweedRecord := range seaweedRecords {
+		// CRITICAL FIX: Use the actual offset from SeaweedMQ; SeaweedRecord.Offset
+		// now contains the correct offset from the subscriber.
+		kafkaOffset := seaweedRecord.Offset
+
+		// CRITICAL: Skip records before the requested offset.
+		// This can happen when the subscriber cache returns old data.
+		if kafkaOffset < fromOffset {
+			glog.V(2).Infof("[FETCH] Skipping record %d with offset %d (requested fromOffset=%d)", i, kafkaOffset, fromOffset)
+			continue
+		}
+
+		smqRecord := &SeaweedSMQRecord{
+			key:       seaweedRecord.Key,
+			value:     seaweedRecord.Value,
+			timestamp: seaweedRecord.Timestamp,
+			offset:    kafkaOffset,
+		}
+		smqRecords = append(smqRecords, smqRecord)
+
+		glog.V(4).Infof("[FETCH] Record %d: offset=%d, keyLen=%d, valueLen=%d", i, kafkaOffset, len(seaweedRecord.Key), len(seaweedRecord.Value))
+	}
+
+	glog.V(2).Infof("[FETCH] Successfully read %d records from SMQ", len(smqRecords))
+	return smqRecords, nil
+}
+
+// GetEarliestOffset returns the earliest available offset for a topic partition.
+// ALWAYS queries the SMQ broker directly - no ledger involved.
+func (h *SeaweedMQHandler) GetEarliestOffset(topic string, partition int32) (int64, error) {
+	// Check if the topic exists
+	if !h.TopicExists(topic) {
+		return 0, nil // Empty topic starts at offset 0
+	}
+
+	// ALWAYS query the SMQ broker directly for the earliest offset
+	if h.brokerClient != nil {
+		earliestOffset, err := h.brokerClient.GetEarliestOffset(topic, partition)
+		if err != nil {
+			return 0, err
+		}
+		return earliestOffset, nil
+	}
+
+	// No broker client - this shouldn't happen in production
+	return 0, fmt.Errorf("broker client not available")
+}
+
+// GetLatestOffset returns the latest available offset for a topic partition.
+// ALWAYS queries the SMQ broker directly - no ledger involved.
+func (h *SeaweedMQHandler) GetLatestOffset(topic string, partition int32) (int64, error) {
+	// Check if the topic exists
+	if !h.TopicExists(topic) {
+		return 0, nil // Empty topic
+	}
+
+	// Check the cache first
+	cacheKey := fmt.Sprintf("%s:%d", topic, partition)
+	h.hwmCacheMu.RLock()
+	if entry, exists := h.hwmCache[cacheKey]; exists {
+		if time.Now().Before(entry.expiresAt) {
+			// Cache hit - return the cached value
+			h.hwmCacheMu.RUnlock()
+			return entry.value, nil
+		}
+	}
+	h.hwmCacheMu.RUnlock()
+
+	// Cache miss or expired - query the SMQ broker
+	if h.brokerClient != nil {
+		latestOffset, err := h.brokerClient.GetHighWaterMark(topic, partition)
+		if err != nil {
+			return 0, err
+		}
+
+		// Update the cache
+		h.hwmCacheMu.Lock()
+		h.hwmCache[cacheKey] = &hwmCacheEntry{
+			value:     latestOffset,
+			expiresAt: time.Now().Add(h.hwmCacheTTL),
+		}
+		h.hwmCacheMu.Unlock()
+
+		return latestOffset, nil
+	}
+
+	// No broker client - this shouldn't happen in production
+	return 0, fmt.Errorf("broker client not available")
+}
+
+// WithFilerClient executes a function with a filer client
+func (h *SeaweedMQHandler) WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error {
+	if h.brokerClient == nil {
+		return fmt.Errorf("no broker client available")
+	}
+	return h.brokerClient.WithFilerClient(streamingMode, fn)
+}
+
+// GetFilerAddress returns the filer address used by this handler
+func (h *SeaweedMQHandler) GetFilerAddress() string {
+	if h.brokerClient != nil {
+		return h.brokerClient.GetFilerAddress()
+	}
+	return ""
+}
+
+// ProduceRecord publishes a record to SeaweedMQ and lets SMQ generate the offset
+func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
+	// Verify topic exists
+	if !h.TopicExists(topic) {
+		return 0, fmt.Errorf("topic %s does not exist", topic)
+	}
+
+	// Get the current timestamp
+	timestamp := time.Now().UnixNano()
+
+	// Publish to SeaweedMQ and let SMQ generate the offset
+	var smqOffset int64
+	var publishErr error
+	if h.brokerClient == nil {
+		publishErr = fmt.Errorf("no broker client available")
+	} else {
+		smqOffset, publishErr = h.brokerClient.PublishRecord(topic, partition, key, value, timestamp)
+	}
+
+	if publishErr != nil {
+		return 0, fmt.Errorf("failed to publish to SeaweedMQ: %v", publishErr)
+	}
+
+	// SMQ generated and returned the offset - use it directly as the Kafka offset
+
+	// Invalidate the HWM cache for this partition to ensure fresh reads.
+	// This is critical for read-your-own-write scenarios (e.g., Schema Registry).
+	cacheKey := fmt.Sprintf("%s:%d", topic, partition)
+	h.hwmCacheMu.Lock()
+	delete(h.hwmCache, cacheKey)
+	h.hwmCacheMu.Unlock()
+
+	return smqOffset, nil
+}
+
+// ProduceRecordValue produces a record in RecordValue format to SeaweedMQ.
+// ALWAYS uses the broker's assigned offset - no ledger involved.
+func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
+	// Verify topic exists
+	if !h.TopicExists(topic) {
+		return 0, fmt.Errorf("topic %s does not exist", topic)
+	}
+
+	// Get the current timestamp
+	timestamp := time.Now().UnixNano()
+
+	// Publish the RecordValue to SeaweedMQ and get the broker-assigned offset
+	var smqOffset int64
+	var publishErr error
+	if h.brokerClient == nil {
+		publishErr = fmt.Errorf("no broker client available")
+	} else {
+		smqOffset, publishErr = h.brokerClient.PublishRecordValue(topic, partition, key, recordValueBytes, timestamp)
+	}
+
+	if publishErr != nil {
+		return 0, fmt.Errorf("failed to publish RecordValue to SeaweedMQ: %v", publishErr)
+	}
+
+	// The SMQ broker has assigned the offset - use it directly as the Kafka offset
+
+	// Invalidate the HWM cache for this partition to ensure fresh reads.
+	// This is critical for read-your-own-write scenarios (e.g., Schema Registry).
+	cacheKey := fmt.Sprintf("%s:%d", topic, partition)
+	h.hwmCacheMu.Lock()
+	delete(h.hwmCache, cacheKey)
+	h.hwmCacheMu.Unlock()
+
+	return smqOffset, nil
+}
+
+// Ledger methods removed - the SMQ broker handles all offset management directly
+
+// FetchRecords is DEPRECATED - only used in old tests
+func (h *SeaweedMQHandler) FetchRecords(topic string, partition int32, fetchOffset int64, maxBytes int32) ([]byte, error) {
+	// Verify topic exists
+	if !h.TopicExists(topic) {
+		return nil, fmt.Errorf("topic %s does not exist", topic)
+	}
+
+	// DEPRECATED: this function is only used in old tests.
+	// Get the HWM directly from the broker.
+	highWaterMark, err := h.GetLatestOffset(topic, partition)
+	if err != nil {
+		return nil, err
+	}
+
+	// If the fetch offset is at or beyond the high water mark, there are no records to return
+	if fetchOffset >= highWaterMark {
+		return []byte{}, nil
+	}
+
+	// Get or create a subscriber session for this topic/partition
+	var seaweedRecords []*SeaweedRecord
+
+	// Calculate how many records to fetch
+	recordsToFetch := int(highWaterMark - fetchOffset)
+	if recordsToFetch > 100 {
+		recordsToFetch = 100 // Limit batch size
+	}
+
+	// Read records using the broker client
+	if h.brokerClient == nil {
+		return nil, fmt.Errorf("no broker client available")
+	}
+	// Use a default consumer group/ID since this is a deprecated function
+	brokerSubscriber, subErr := h.brokerClient.GetOrCreateSubscriber(topic, partition, fetchOffset, "deprecated-consumer-group", "deprecated-consumer")
+	if subErr != nil {
+		return nil, fmt.Errorf("failed to get broker subscriber: %v", subErr)
+	}
+	// This is a deprecated function, so a background context is fine
+	seaweedRecords, err = h.brokerClient.ReadRecords(context.Background(), brokerSubscriber, recordsToFetch)
+	if err != nil {
+		// If no records are available, return an empty batch instead of an error
+		return []byte{}, nil
+	}
+
+	// Map SeaweedMQ records to Kafka offsets
+	kafkaRecords, err := h.mapSeaweedToKafkaOffsets(topic, partition, seaweedRecords, fetchOffset)
+	if err != nil {
+		return nil, fmt.Errorf("failed to map offsets: %v", err)
+	}
+
+	// Convert the mapped records to the Kafka record batch format
+	return h.convertSeaweedToKafkaRecordBatch(kafkaRecords, fetchOffset, maxBytes)
+}
+
+// mapSeaweedToKafkaOffsets maps SeaweedMQ records to proper Kafka offsets
+func (h *SeaweedMQHandler) mapSeaweedToKafkaOffsets(topic string, partition int32, seaweedRecords []*SeaweedRecord, startOffset int64) ([]*SeaweedRecord, error) {
+	if len(seaweedRecords) == 0 {
+		return seaweedRecords, nil
+	}
+
+	// DEPRECATED: this function is only used in old tests; just map offsets sequentially
+	mappedRecords := make([]*SeaweedRecord, 0, len(seaweedRecords))
+
+	for i, seaweedRecord := range seaweedRecords {
+		currentKafkaOffset := startOffset + int64(i)
+
+		// Copy the record with the proper Kafka offset assigned
+		mappedRecord := &SeaweedRecord{
+			Key:       seaweedRecord.Key,
+			Value:     seaweedRecord.Value,
+			Timestamp: seaweedRecord.Timestamp,
+			Offset:    currentKafkaOffset,
+		}
+
+		mappedRecords = append(mappedRecords, mappedRecord)
+	}
+
+	return mappedRecords, nil
+}
+
+// convertSeaweedToKafkaRecordBatch converts SeaweedMQ records to the Kafka record batch format
+func (h *SeaweedMQHandler) convertSeaweedToKafkaRecordBatch(seaweedRecords []*SeaweedRecord, fetchOffset int64, maxBytes int32) ([]byte, error) {
+	if len(seaweedRecords) == 0 {
+		return []byte{}, nil
+	}
+
+	batch := make([]byte, 0, 512)
+
+	// Record batch header
+	baseOffsetBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(baseOffsetBytes, uint64(fetchOffset))
+	batch = append(batch, baseOffsetBytes...) // base offset
+
+	// Batch length (placeholder, filled in at the end)
+	batchLengthPos := len(batch)
+	batch = append(batch, 0, 0, 0, 0)
+
+	batch = append(batch, 0, 0, 0, 0) // partition leader epoch
+	batch = append(batch, 2)          // magic byte (version 2)
+
+	// CRC placeholder
+	batch = append(batch, 0, 0, 0, 0)
+
+	// Batch attributes
+	batch = append(batch, 0, 0)
+
+	// Last offset delta
+	lastOffsetDelta := uint32(len(seaweedRecords) - 1)
+	lastOffsetDeltaBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(lastOffsetDeltaBytes, lastOffsetDelta)
+	batch = append(batch, lastOffsetDeltaBytes...)
+
+	// Timestamps - use the actual timestamps from the SeaweedMQ records
+	// (the empty-slice case returned early above)
+	firstTimestamp := seaweedRecords[0].Timestamp
+	maxTimestamp := firstTimestamp
+	for _, record := range seaweedRecords {
+		if record.Timestamp > maxTimestamp {
+			maxTimestamp = record.Timestamp
+		}
+	}
+
+	firstTimestampBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(firstTimestampBytes, uint64(firstTimestamp))
+	batch = append(batch, firstTimestampBytes...)
+
+	maxTimestampBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(maxTimestampBytes, uint64(maxTimestamp))
+	batch = append(batch, maxTimestampBytes...)
+
+	// Producer info (simplified)
+	batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) // producer ID (-1)
+	batch = append(batch, 0xFF, 0xFF)                                     // producer epoch (-1)
+	batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)                         // base sequence (-1)
+
+	// Record count
+	recordCountBytes := make([]byte, 4)
+	binary.BigEndian.PutUint32(recordCountBytes, uint32(len(seaweedRecords)))
+	batch = append(batch, recordCountBytes...)
+
+	// Add the actual records from SeaweedMQ
+	for i, seaweedRecord := range seaweedRecords {
+		record := h.convertSingleSeaweedRecord(seaweedRecord, int64(i), firstTimestamp)
+		recordLength := byte(len(record))
+		batch = append(batch, recordLength)
+		batch = append(batch, record...)
+
+		// Check if we are approaching the maxBytes limit;
+		// leave room for the remaining headers and stop adding records
+		if int32(len(batch)) > maxBytes*3/4 {
+			break
+		}
+	}
+
+	// Fill in the batch length
+	batchLength := uint32(len(batch) - batchLengthPos - 4)
+	binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], batchLength)
+
+	return batch, nil
+}
+
+// convertSingleSeaweedRecord converts a single SeaweedMQ record to the Kafka record format
+func (h *SeaweedMQHandler) convertSingleSeaweedRecord(seaweedRecord *SeaweedRecord, index, firstTimestamp int64) []byte {
+	record := make([]byte, 0, 64)
+
+	// Record attributes
+	record = append(record, 0)
+
+	// Timestamp delta relative to the batch's first timestamp
+	// (simplified varint encoding: a single byte)
+	timestampDelta := seaweedRecord.Timestamp - firstTimestamp
+	if timestampDelta < 0 {
+		timestampDelta = 0
+	}
+	record = append(record, byte(timestampDelta&0xFF))
+
+	// Offset delta (simplified varint encoding)
+	record = append(record, byte(index))
+
+	// Key length and key
+	if len(seaweedRecord.Key) > 0 {
+		record = append(record, byte(len(seaweedRecord.Key)))
+		record = append(record, seaweedRecord.Key...)
+	} else {
+		// Null key
+		record = append(record, 0xFF)
+	}
+
+	// Value length and value
+	if len(seaweedRecord.Value) > 0 {
+		record = append(record, byte(len(seaweedRecord.Value)))
+		record = append(record, seaweedRecord.Value...)
+	} else {
+		// Empty value
+		record = append(record, 0)
+	}
+
+	// Headers count (0)
+	record = append(record, 0)
+
+	return record
+}
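
The HWM cache in `GetLatestOffset` relies on `h.hwmCache`, `h.hwmCacheMu`, and `h.hwmCacheTTL`, none of which are defined in this diff. A minimal sketch of the declarations the code appears to assume (the real definitions live elsewhere in the handler and may differ):

```go
package integration

import (
	"sync"
	"time"
)

// hwmCacheEntry is a sketch of the entry type GetLatestOffset assumes:
// a cached high water mark plus an absolute expiry deadline.
type hwmCacheEntry struct {
	value     int64     // cached high water mark
	expiresAt time.Time // entry is stale once time.Now() passes this
}

// Fields the caching code presumes on SeaweedMQHandler (hypothetical
// reconstruction; other fields elided).
type seaweedMQHandlerCacheFields struct {
	hwmCacheMu  sync.RWMutex
	hwmCache    map[string]*hwmCacheEntry // keyed by "topic:partition"
	hwmCacheTTL time.Duration             // e.g. a few hundred milliseconds
}
```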
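The deprecated batch builder writes the timestamp delta, offset delta, and key/value lengths as single raw bytes ("simplified varint"). Kafka record format v2 actually uses zigzag-encoded varints, which only coincide with raw bytes for small non-negative values. Go's standard library already implements the same zigzag-plus-LEB128 scheme, so a spec-conformant encoder is a one-liner:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeKafkaVarint encodes a signed value the way Kafka record format v2
// expects: zigzag-mapped, then LEB128. binary.PutVarint implements exactly
// this scheme.
func encodeKafkaVarint(v int64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(buf, v)
	return buf[:n]
}

func main() {
	// -1 (a null key length in Kafka) encodes as the single byte 0x01,
	// not the 0xFF the simplified code above writes.
	fmt.Printf("% x\n", encodeKafkaVarint(-1))  // 01
	fmt.Printf("% x\n", encodeKafkaVarint(300)) // d8 04
}
```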
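The builder also leaves the batch CRC as four zero bytes, which strict clients will reject. Kafka v2 batches carry a CRC-32C (Castagnoli) checksum computed over everything after the CRC field (attributes onward). A sketch of filling it in, assuming a batch laid out exactly as above, where the CRC sits right after the magic byte at offset 8 + 4 + 4 + 1 = 17:

```go
package main

import (
	"encoding/binary"
	"hash/crc32"
)

// fillBatchCRC computes the CRC-32C over the batch body (attributes onward)
// and writes it into the 4-byte CRC field at offset 17.
func fillBatchCRC(batch []byte) {
	const crcPos = 17 // base offset (8) + batch length (4) + leader epoch (4) + magic (1)
	sum := crc32.Checksum(batch[crcPos+4:], crc32.MakeTable(crc32.Castagnoli))
	binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], sum)
}
```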

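Per the doc comment on `GetStoredRecords`, the caller is expected to bound the context by the Kafka fetch request's MaxWaitTime. A hypothetical caller sketch (the `GetOffset` accessor on `SMQRecord` is assumed, not shown in this diff):

```go
package integration

import (
	"context"
	"time"
)

// fetchOnce bounds a single fetch by the Kafka request's MaxWaitTime, as the
// GetStoredRecords contract prescribes, and returns the next offset to poll.
func fetchOnce(h *SeaweedMQHandler, topic string, partition int32, fromOffset int64, maxWait time.Duration) (int64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), maxWait)
	defer cancel()

	records, err := h.GetStoredRecords(ctx, topic, partition, fromOffset, 500)
	if err != nil {
		return fromOffset, err
	}
	next := fromOffset
	for _, r := range records {
		next = r.GetOffset() + 1 // advance past the last delivered record (accessor name assumed)
	}
	return next, nil
}
```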