path: root/weed/mq/kafka/protocol/fetch.go
Diffstat (limited to 'weed/mq/kafka/protocol/fetch.go')
-rw-r--r--  weed/mq/kafka/protocol/fetch.go | 651
1 file changed, 95 insertions(+), 556 deletions(-)
diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go
index edc07d57a..6b38a71e1 100644
--- a/weed/mq/kafka/protocol/fetch.go
+++ b/weed/mq/kafka/protocol/fetch.go
@@ -7,6 +7,7 @@ import (
"hash/crc32"
"strings"
"time"
+ "unicode/utf8"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
@@ -97,11 +98,16 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
// Continue with polling
}
if hasDataAvailable() {
+ // Data became available during polling - return immediately with NO throttle
+ // Throttle time should only be used for quota enforcement, not for long-poll timing
+ throttleTimeMs = 0
break pollLoop
}
}
- elapsed := time.Since(start)
- throttleTimeMs = int32(elapsed / time.Millisecond)
+ // If we got here without breaking early, we hit the timeout
+ // Long-poll timeout is NOT throttling - throttle time should only be used for quota/rate limiting
+ // Do NOT set throttle time based on long-poll duration
+ throttleTimeMs = 0
}
// Build the response
@@ -155,7 +161,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
return nil, fmt.Errorf("connection context not available")
}
- glog.V(2).Infof("[%s] FETCH CORR=%d: Processing %d topics with %d total partitions",
+ glog.V(4).Infof("[%s] FETCH CORR=%d: Processing %d topics with %d total partitions",
connContext.ConnectionID, correlationID, len(fetchRequest.Topics),
func() int {
count := 0
@@ -166,7 +172,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
}())
// Collect results from persistent readers
- // CRITICAL: Dispatch all requests concurrently, then wait for all results in parallel
+ // Dispatch all requests concurrently, then wait for all results in parallel
// to avoid sequential timeout accumulation
type pendingFetch struct {
topicName string
@@ -242,9 +248,19 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
}
// Phase 2: Wait for all results with adequate timeout for CI environments
- // CRITICAL: We MUST return a result for every requested partition or Sarama will error
+ // We MUST return a result for every requested partition or Sarama will error
results := make([]*partitionFetchResult, len(pending))
- deadline := time.After(500 * time.Millisecond) // 500ms for all partitions (increased for CI disk I/O)
+ // Use 95% of client's MaxWaitTime to ensure we return BEFORE client timeout
+ // This maximizes data collection time while leaving a safety buffer for:
+ // - Response serialization, network transmission, client processing
+ // For 500ms client timeout: 475ms internal fetch, 25ms buffer
+ // For 100ms client timeout: 95ms internal fetch, 5ms buffer
+ effectiveDeadlineMs := time.Duration(maxWaitMs) * 95 / 100
+ deadline := time.After(effectiveDeadlineMs * time.Millisecond)
+ if maxWaitMs < 20 {
+ // For very short timeouts (< 20ms), use full timeout to maximize data collection
+ deadline = time.After(time.Duration(maxWaitMs) * time.Millisecond)
+ }
// Collect results one by one with shared deadline
for i, pf := range pending {
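A more explicit way to express the same 95% budget, purely as an illustration (fetchDeadline is a made-up helper name; maxWaitMs is the client's MaxWaitTime in milliseconds), keeps the unit conversion separate from the scaling:

package kafkafetch // illustrative package name

import "time"

// fetchDeadline returns the internal collection budget for a client-supplied
// max wait, leaving roughly 5% headroom for response serialization and
// network transmission. Very short waits (< 20ms) keep the full budget,
// mirroring the special case above.
func fetchDeadline(maxWaitMs int32) time.Duration {
	full := time.Duration(maxWaitMs) * time.Millisecond
	if maxWaitMs < 20 {
		return full
	}
	return full * 95 / 100
}

// usage: deadline := time.After(fetchDeadline(maxWaitMs))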
@@ -256,7 +272,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
for j := i; j < len(pending); j++ {
results[j] = &partitionFetchResult{}
}
- glog.V(1).Infof("[%s] Fetch deadline expired, returning empty for %d remaining partitions",
+ glog.V(3).Infof("[%s] Fetch deadline expired, returning empty for %d remaining partitions",
connContext.ConnectionID, len(pending)-i)
goto done
case <-ctx.Done():
@@ -276,7 +292,7 @@ done:
// Now assemble the response in the correct order using fetched results
// ====================================================================
- // CRITICAL: Verify we have results for all requested partitions
+ // Verify we have results for all requested partitions
// Sarama requires a response block for EVERY requested partition to avoid ErrIncompleteResponse
expectedResultCount := 0
for _, topic := range fetchRequest.Topics {
@@ -861,373 +877,12 @@ func encodeVarint(value int64) []byte {
return buf
}
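The body of encodeVarint sits outside this hunk; Kafka's record format v2 uses protobuf-style zigzag varints for lengths and deltas (a null key, for example, is encoded as varint -1), so a stand-alone encoder would look roughly like the sketch below. This is an assumption about the shape of the helper, not necessarily its exact body:

package kafkafetch // illustrative

// zigzagVarint encodes a signed value the way record format v2 expects:
// zigzag-map to unsigned (so -1 becomes 1, 1 becomes 2, ...), then emit
// base-128 groups with a continuation bit on all but the last byte.
func zigzagVarint(value int64) []byte {
	u := uint64((value << 1) ^ (value >> 63))
	var buf []byte
	for u >= 0x80 {
		buf = append(buf, byte(u)|0x80)
		u >>= 7
	}
	return append(buf, byte(u))
}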
-// reconstructSchematizedMessage reconstructs a schematized message from SMQ RecordValue
-func (h *Handler) reconstructSchematizedMessage(recordValue *schema_pb.RecordValue, metadata map[string]string) ([]byte, error) {
- // Only reconstruct if schema management is enabled
- if !h.IsSchemaEnabled() {
- return nil, fmt.Errorf("schema management not enabled")
- }
-
- // Extract schema information from metadata
- schemaIDStr, exists := metadata["schema_id"]
- if !exists {
- return nil, fmt.Errorf("no schema ID in metadata")
- }
-
- var schemaID uint32
- if _, err := fmt.Sscanf(schemaIDStr, "%d", &schemaID); err != nil {
- return nil, fmt.Errorf("invalid schema ID: %w", err)
- }
-
- formatStr, exists := metadata["schema_format"]
- if !exists {
- return nil, fmt.Errorf("no schema format in metadata")
- }
-
- var format schema.Format
- switch formatStr {
- case "AVRO":
- format = schema.FormatAvro
- case "PROTOBUF":
- format = schema.FormatProtobuf
- case "JSON_SCHEMA":
- format = schema.FormatJSONSchema
- default:
- return nil, fmt.Errorf("unsupported schema format: %s", formatStr)
- }
-
- // Use schema manager to encode back to original format
- return h.schemaManager.EncodeMessage(recordValue, schemaID, format)
-}
-
// SchematizedRecord holds both key and value for schematized messages
type SchematizedRecord struct {
Key []byte
Value []byte
}
-// fetchSchematizedRecords fetches and reconstructs schematized records from SeaweedMQ
-func (h *Handler) fetchSchematizedRecords(topicName string, partitionID int32, offset int64, maxBytes int32) ([]*SchematizedRecord, error) {
- glog.Infof("fetchSchematizedRecords: topic=%s partition=%d offset=%d maxBytes=%d", topicName, partitionID, offset, maxBytes)
-
- // Only proceed when schema feature is toggled on
- if !h.useSchema {
- glog.Infof("fetchSchematizedRecords EARLY RETURN: useSchema=false")
- return []*SchematizedRecord{}, nil
- }
-
- // Check if SeaweedMQ handler is available when schema feature is in use
- if h.seaweedMQHandler == nil {
- glog.Infof("fetchSchematizedRecords ERROR: seaweedMQHandler is nil")
- return nil, fmt.Errorf("SeaweedMQ handler not available")
- }
-
- // If schema management isn't fully configured, return empty instead of error
- if !h.IsSchemaEnabled() {
- glog.Infof("fetchSchematizedRecords EARLY RETURN: IsSchemaEnabled()=false")
- return []*SchematizedRecord{}, nil
- }
-
- // Fetch stored records from SeaweedMQ
- maxRecords := 100 // Reasonable batch size limit
- glog.Infof("fetchSchematizedRecords: calling GetStoredRecords maxRecords=%d", maxRecords)
- smqRecords, err := h.seaweedMQHandler.GetStoredRecords(context.Background(), topicName, partitionID, offset, maxRecords)
- if err != nil {
- glog.Infof("fetchSchematizedRecords ERROR: GetStoredRecords failed: %v", err)
- return nil, fmt.Errorf("failed to fetch SMQ records: %w", err)
- }
-
- glog.Infof("fetchSchematizedRecords: GetStoredRecords returned %d records", len(smqRecords))
- if len(smqRecords) == 0 {
- return []*SchematizedRecord{}, nil
- }
-
- var reconstructedRecords []*SchematizedRecord
- totalBytes := int32(0)
-
- for _, smqRecord := range smqRecords {
- // Check if we've exceeded maxBytes limit
- if maxBytes > 0 && totalBytes >= maxBytes {
- break
- }
-
- // Try to reconstruct the schematized message value
- reconstructedValue, err := h.reconstructSchematizedMessageFromSMQ(smqRecord)
- if err != nil {
- // Log error but continue with other messages
- Error("Failed to reconstruct schematized message at offset %d: %v", smqRecord.GetOffset(), err)
- continue
- }
-
- if reconstructedValue != nil {
- // Create SchematizedRecord with both key and reconstructed value
- record := &SchematizedRecord{
- Key: smqRecord.GetKey(), // Preserve the original key
- Value: reconstructedValue, // Use the reconstructed value
- }
- reconstructedRecords = append(reconstructedRecords, record)
- totalBytes += int32(len(record.Key) + len(record.Value))
- }
- }
-
- return reconstructedRecords, nil
-}
-
-// reconstructSchematizedMessageFromSMQ reconstructs a schematized message from an SMQRecord
-func (h *Handler) reconstructSchematizedMessageFromSMQ(smqRecord integration.SMQRecord) ([]byte, error) {
- // Get the stored value (should be a serialized RecordValue)
- valueBytes := smqRecord.GetValue()
- if len(valueBytes) == 0 {
- return nil, fmt.Errorf("empty value in SMQ record")
- }
-
- // Try to unmarshal as RecordValue
- recordValue := &schema_pb.RecordValue{}
- if err := proto.Unmarshal(valueBytes, recordValue); err != nil {
- // If it's not a RecordValue, it might be a regular Kafka message
- // Return it as-is (non-schematized)
- return valueBytes, nil
- }
-
- // Extract schema metadata from the RecordValue fields
- metadata := h.extractSchemaMetadataFromRecord(recordValue)
- if len(metadata) == 0 {
- // No schema metadata found, treat as regular message
- return valueBytes, nil
- }
-
- // Remove Kafka metadata fields to get the original message content
- originalRecord := h.removeKafkaMetadataFields(recordValue)
-
- // Reconstruct the original Confluent envelope
- return h.reconstructSchematizedMessage(originalRecord, metadata)
-}
-
-// extractSchemaMetadataFromRecord extracts schema metadata from RecordValue fields
-func (h *Handler) extractSchemaMetadataFromRecord(recordValue *schema_pb.RecordValue) map[string]string {
- metadata := make(map[string]string)
-
- // Look for schema metadata fields in the record
- if schemaIDField := recordValue.Fields["_schema_id"]; schemaIDField != nil {
- if schemaIDValue := schemaIDField.GetStringValue(); schemaIDValue != "" {
- metadata["schema_id"] = schemaIDValue
- }
- }
-
- if schemaFormatField := recordValue.Fields["_schema_format"]; schemaFormatField != nil {
- if schemaFormatValue := schemaFormatField.GetStringValue(); schemaFormatValue != "" {
- metadata["schema_format"] = schemaFormatValue
- }
- }
-
- if schemaSubjectField := recordValue.Fields["_schema_subject"]; schemaSubjectField != nil {
- if schemaSubjectValue := schemaSubjectField.GetStringValue(); schemaSubjectValue != "" {
- metadata["schema_subject"] = schemaSubjectValue
- }
- }
-
- if schemaVersionField := recordValue.Fields["_schema_version"]; schemaVersionField != nil {
- if schemaVersionValue := schemaVersionField.GetStringValue(); schemaVersionValue != "" {
- metadata["schema_version"] = schemaVersionValue
- }
- }
-
- return metadata
-}
-
-// removeKafkaMetadataFields removes Kafka and schema metadata fields from RecordValue
-func (h *Handler) removeKafkaMetadataFields(recordValue *schema_pb.RecordValue) *schema_pb.RecordValue {
- originalRecord := &schema_pb.RecordValue{
- Fields: make(map[string]*schema_pb.Value),
- }
-
- // Copy all fields except metadata fields
- for key, value := range recordValue.Fields {
- if !h.isMetadataField(key) {
- originalRecord.Fields[key] = value
- }
- }
-
- return originalRecord
-}
-
-// isMetadataField checks if a field is a metadata field that should be excluded from the original message
-func (h *Handler) isMetadataField(fieldName string) bool {
- return fieldName == "_kafka_offset" ||
- fieldName == "_kafka_partition" ||
- fieldName == "_kafka_timestamp" ||
- fieldName == "_schema_id" ||
- fieldName == "_schema_format" ||
- fieldName == "_schema_subject" ||
- fieldName == "_schema_version"
-}
-
-// createSchematizedRecordBatch creates a Kafka record batch from reconstructed schematized messages
-func (h *Handler) createSchematizedRecordBatch(records []*SchematizedRecord, baseOffset int64) []byte {
- if len(records) == 0 {
- // Return empty record batch
- return h.createEmptyRecordBatch(baseOffset)
- }
-
- // Create individual record entries for the batch
- var recordsData []byte
- currentTimestamp := time.Now().UnixMilli()
-
- for i, record := range records {
- // Create a record entry (Kafka record format v2) with both key and value
- recordEntry := h.createRecordEntry(record.Key, record.Value, int32(i), currentTimestamp)
- recordsData = append(recordsData, recordEntry...)
- }
-
- // Apply compression if the data is large enough to benefit
- enableCompression := len(recordsData) > 100
- var compressionType compression.CompressionCodec = compression.None
- var finalRecordsData []byte
-
- if enableCompression {
- compressed, err := compression.Compress(compression.Gzip, recordsData)
- if err == nil && len(compressed) < len(recordsData) {
- finalRecordsData = compressed
- compressionType = compression.Gzip
- } else {
- finalRecordsData = recordsData
- }
- } else {
- finalRecordsData = recordsData
- }
-
- // Create the record batch with proper compression and CRC
- batch, err := h.createRecordBatchWithCompressionAndCRC(baseOffset, finalRecordsData, compressionType, int32(len(records)), currentTimestamp)
- if err != nil {
- // Fallback to simple batch creation
- return h.createRecordBatchWithPayload(baseOffset, int32(len(records)), finalRecordsData)
- }
-
- return batch
-}
-
-// createRecordEntry creates a single record entry in Kafka record format v2
-func (h *Handler) createRecordEntry(messageKey []byte, messageData []byte, offsetDelta int32, timestamp int64) []byte {
- // Record format v2:
- // - length (varint)
- // - attributes (int8)
- // - timestamp delta (varint)
- // - offset delta (varint)
- // - key length (varint) + key
- // - value length (varint) + value
- // - headers count (varint) + headers
-
- var record []byte
-
- // Attributes (1 byte) - no special attributes
- record = append(record, 0)
-
- // Timestamp delta (varint) - 0 for now (all messages have same timestamp)
- record = append(record, encodeVarint(0)...)
-
- // Offset delta (varint)
- record = append(record, encodeVarint(int64(offsetDelta))...)
-
- // Key length (varint) + key
- if messageKey == nil || len(messageKey) == 0 {
- record = append(record, encodeVarint(-1)...) // -1 indicates null key
- } else {
- record = append(record, encodeVarint(int64(len(messageKey)))...)
- record = append(record, messageKey...)
- }
-
- // Value length (varint) + value
- record = append(record, encodeVarint(int64(len(messageData)))...)
- record = append(record, messageData...)
-
- // Headers count (varint) - no headers
- record = append(record, encodeVarint(0)...)
-
- // Prepend the total record length (varint)
- recordLength := encodeVarint(int64(len(record)))
- return append(recordLength, record...)
-}
-
-// createRecordBatchWithCompressionAndCRC creates a Kafka record batch with proper compression and CRC
-func (h *Handler) createRecordBatchWithCompressionAndCRC(baseOffset int64, recordsData []byte, compressionType compression.CompressionCodec, recordCount int32, baseTimestampMs int64) ([]byte, error) {
- // Create record batch header
- // Validate size to prevent overflow
- const maxBatchSize = 1 << 30 // 1 GB limit
- if len(recordsData) > maxBatchSize-61 {
- return nil, fmt.Errorf("records data too large: %d bytes", len(recordsData))
- }
- batch := make([]byte, 0, len(recordsData)+61) // 61 bytes for header
-
- // Base offset (8 bytes)
- baseOffsetBytes := make([]byte, 8)
- binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
- batch = append(batch, baseOffsetBytes...)
-
- // Batch length placeholder (4 bytes) - will be filled later
- batchLengthPos := len(batch)
- batch = append(batch, 0, 0, 0, 0)
-
- // Partition leader epoch (4 bytes)
- batch = append(batch, 0, 0, 0, 0)
-
- // Magic byte (1 byte) - version 2
- batch = append(batch, 2)
-
- // CRC placeholder (4 bytes) - will be calculated later
- crcPos := len(batch)
- batch = append(batch, 0, 0, 0, 0)
-
- // Attributes (2 bytes) - compression type and other flags
- attributes := int16(compressionType) // Set compression type in lower 3 bits
- attributesBytes := make([]byte, 2)
- binary.BigEndian.PutUint16(attributesBytes, uint16(attributes))
- batch = append(batch, attributesBytes...)
-
- // Last offset delta (4 bytes)
- lastOffsetDelta := uint32(recordCount - 1)
- lastOffsetDeltaBytes := make([]byte, 4)
- binary.BigEndian.PutUint32(lastOffsetDeltaBytes, lastOffsetDelta)
- batch = append(batch, lastOffsetDeltaBytes...)
-
- // First timestamp (8 bytes) - use the same timestamp used to build record entries
- firstTimestampBytes := make([]byte, 8)
- binary.BigEndian.PutUint64(firstTimestampBytes, uint64(baseTimestampMs))
- batch = append(batch, firstTimestampBytes...)
-
- // Max timestamp (8 bytes) - same as first for simplicity
- batch = append(batch, firstTimestampBytes...)
-
- // Producer ID (8 bytes) - -1 for non-transactional
- batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)
-
- // Producer epoch (2 bytes) - -1 for non-transactional
- batch = append(batch, 0xFF, 0xFF)
-
- // Base sequence (4 bytes) - -1 for non-transactional
- batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)
-
- // Record count (4 bytes)
- recordCountBytes := make([]byte, 4)
- binary.BigEndian.PutUint32(recordCountBytes, uint32(recordCount))
- batch = append(batch, recordCountBytes...)
-
- // Records payload (compressed or uncompressed)
- batch = append(batch, recordsData...)
-
- // Calculate and set batch length (excluding base offset and batch length fields)
- batchLength := len(batch) - 12 // 8 bytes base offset + 4 bytes batch length
- binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], uint32(batchLength))
-
- // Calculate and set CRC32 over attributes..end (exclude CRC field itself)
- // Kafka uses Castagnoli (CRC-32C) algorithm. CRC covers ONLY from attributes offset (byte 21) onwards.
- // See: DefaultRecordBatch.java computeChecksum() - Crc32C.compute(buffer, ATTRIBUTES_OFFSET, ...)
- crcData := batch[crcPos+4:] // Skip CRC field itself (bytes 17..20) and include the rest
- crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli))
- binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
-
- return batch, nil
-}
-
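The CRC layout described in the removed helper (CRC-32C computed from the attributes field onward, with the checksum stored at bytes 17..20 of the batch) can be verified from the reading side with a small check. This is an illustrative sketch, not code from this change:

package kafkafetch // illustrative

import (
	"encoding/binary"
	"hash/crc32"
)

// batchCRCValid checks a record batch v2 checksum: the stored CRC sits at
// bytes 17..20 (after the 8-byte base offset, 4-byte batch length, 4-byte
// leader epoch, and 1-byte magic), and CRC-32C (Castagnoli) covers bytes
// 21..end, i.e. everything from attributes onward.
func batchCRCValid(batch []byte) bool {
	if len(batch) < 21 {
		return false
	}
	stored := binary.BigEndian.Uint32(batch[17:21])
	computed := crc32.Checksum(batch[21:], crc32.MakeTable(crc32.Castagnoli))
	return stored == computed
}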
// createEmptyRecordBatch creates an empty Kafka record batch using the new parser
func (h *Handler) createEmptyRecordBatch(baseOffset int64) []byte {
// Use the new record batch creation function with no compression
@@ -1297,47 +952,6 @@ func (h *Handler) createEmptyRecordBatchManual(baseOffset int64) []byte {
return batch
}
-// createRecordBatchWithPayload creates a record batch with the given payload
-func (h *Handler) createRecordBatchWithPayload(baseOffset int64, recordCount int32, payload []byte) []byte {
- // For Phase 7, create a simplified record batch
- // In Phase 8, this will implement proper Kafka record batch format v2
-
- batch := h.createEmptyRecordBatch(baseOffset)
-
- // Update record count
- recordCountOffset := len(batch) - 4
- binary.BigEndian.PutUint32(batch[recordCountOffset:recordCountOffset+4], uint32(recordCount))
-
- // Append payload (simplified - real implementation would format individual records)
- batch = append(batch, payload...)
-
- // Update batch length
- batchLength := len(batch) - 12
- binary.BigEndian.PutUint32(batch[8:12], uint32(batchLength))
-
- return batch
-}
-
-// handleSchematizedFetch handles fetch requests for topics with schematized messages
-func (h *Handler) handleSchematizedFetch(topicName string, partitionID int32, offset int64, maxBytes int32) ([]byte, error) {
- // Check if this topic uses schema management
- if !h.IsSchemaEnabled() {
- // Fall back to regular fetch handling
- return nil, fmt.Errorf("schema management not enabled")
- }
-
- // Fetch schematized records from SeaweedMQ
- records, err := h.fetchSchematizedRecords(topicName, partitionID, offset, maxBytes)
- if err != nil {
- return nil, fmt.Errorf("failed to fetch schematized records: %w", err)
- }
-
- // Create record batch from reconstructed records
- recordBatch := h.createSchematizedRecordBatch(records, offset)
-
- return recordBatch, nil
-}
-
// isSchematizedTopic checks if a topic uses schema management
func (h *Handler) isSchematizedTopic(topicName string) bool {
// System topics (_schemas, __consumer_offsets, etc.) should NEVER use schema encoding
@@ -1518,13 +1132,21 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
return nil
}
- // CRITICAL FIX: For system topics like _schemas, _consumer_offsets, etc.,
+
+ // For system topics like _schemas, _consumer_offsets, etc.,
// return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.)
// and should NOT be processed as RecordValue protobuf messages.
if strings.HasPrefix(topicName, "_") {
return recordValueBytes
}
+ // CRITICAL: If schema management is not enabled, we should NEVER try to parse as RecordValue
+ // All messages are stored as raw bytes when schema management is disabled
+ // Attempting to parse them as RecordValue will cause corruption due to protobuf's lenient parsing
+ if !h.IsSchemaEnabled() {
+ return recordValueBytes
+ }
+
// Try to unmarshal as RecordValue
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(recordValueBytes, recordValue); err != nil {
@@ -1533,6 +1155,14 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
return recordValueBytes
}
+ // Validate that the unmarshaled RecordValue is actually a valid RecordValue
+ // Protobuf unmarshal is lenient and can succeed with garbage data for random bytes
+ // We need to check if this looks like a real RecordValue or just random bytes
+ if !h.isValidRecordValue(recordValue, recordValueBytes) {
+ // Not a valid RecordValue - return raw bytes as-is
+ return recordValueBytes
+ }
+
// If schema management is enabled, re-encode the RecordValue to Confluent format
if h.IsSchemaEnabled() {
if encodedMsg, err := h.encodeRecordValueToConfluentFormat(topicName, recordValue); err == nil {
@@ -1545,6 +1175,60 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
return h.recordValueToJSON(recordValue)
}
+// isValidRecordValue checks if a RecordValue looks like a real RecordValue or garbage from random bytes
+// This performs a roundtrip test: marshal the RecordValue and check if it produces similar output
+func (h *Handler) isValidRecordValue(recordValue *schema_pb.RecordValue, originalBytes []byte) bool {
+ // Empty or nil Fields means not a valid RecordValue
+ if recordValue == nil || recordValue.Fields == nil || len(recordValue.Fields) == 0 {
+ return false
+ }
+
+ // Check if field names are valid UTF-8 strings (not binary garbage)
+ // Real RecordValue messages have proper field names like "name", "age", etc.
+ // Random bytes parsed as protobuf often create non-UTF8 or very short field names
+ for fieldName, fieldValue := range recordValue.Fields {
+ // Field name should be valid UTF-8
+ if !utf8.ValidString(fieldName) {
+ return false
+ }
+
+ // Field name should have reasonable length (at least 1 char, at most 1000)
+ if len(fieldName) == 0 || len(fieldName) > 1000 {
+ return false
+ }
+
+ // Field value should not be nil
+ if fieldValue == nil || fieldValue.Kind == nil {
+ return false
+ }
+ }
+
+ // Roundtrip check: If this is a real RecordValue, marshaling it back should produce
+ // similar-sized output. Random bytes that accidentally parse as protobuf will typically
+ // produce very different output when marshaled back.
+ remarshaled, err := proto.Marshal(recordValue)
+ if err != nil {
+ return false
+ }
+
+ // Check if the sizes are reasonably similar (within 50% tolerance)
+ // Real RecordValue will have similar size, random bytes will be very different
+ originalSize := len(originalBytes)
+ remarshaledSize := len(remarshaled)
+ if originalSize == 0 {
+ return false
+ }
+
+ // Calculate size ratio - should be close to 1.0 for real RecordValue
+ ratio := float64(remarshaledSize) / float64(originalSize)
+ if ratio < 0.5 || ratio > 2.0 {
+ // Size differs too much - this is likely random bytes parsed as protobuf
+ return false
+ }
+
+ return true
+}
+
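The schema-disabled guard and isValidRecordValue work as a pair: proto.Unmarshal alone cannot distinguish a genuine RecordValue from raw bytes that merely happen to decode. A hypothetical convenience wrapper (looksLikeRecordValue is not part of this change) makes the intended contract explicit:

// looksLikeRecordValue combines the lenient Unmarshal with the stricter
// validation above; only the two together identify a genuine RecordValue.
func (h *Handler) looksLikeRecordValue(raw []byte) bool {
	rv := &schema_pb.RecordValue{}
	if err := proto.Unmarshal(raw, rv); err != nil {
		return false // not even parseable as protobuf
	}
	// A successful Unmarshal is not sufficient, so apply the UTF-8
	// field-name and roundtrip-size checks as well.
	return h.isValidRecordValue(rv, raw)
}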
// encodeRecordValueToConfluentFormat re-encodes a RecordValue back to Confluent format
func (h *Handler) encodeRecordValueToConfluentFormat(topicName string, recordValue *schema_pb.RecordValue) ([]byte, error) {
if recordValue == nil {
@@ -1583,62 +1267,6 @@ func (h *Handler) getTopicSchemaConfig(topicName string) (*TopicSchemaConfig, er
return config, nil
}
-// decodeRecordValueToKafkaKey decodes a key RecordValue back to the original Kafka key bytes
-func (h *Handler) decodeRecordValueToKafkaKey(topicName string, keyRecordValueBytes []byte) []byte {
- if keyRecordValueBytes == nil {
- return nil
- }
-
- // Try to get topic schema config
- schemaConfig, err := h.getTopicSchemaConfig(topicName)
- if err != nil || !schemaConfig.HasKeySchema {
- // No key schema config available, return raw bytes
- return keyRecordValueBytes
- }
-
- // Try to unmarshal as RecordValue
- recordValue := &schema_pb.RecordValue{}
- if err := proto.Unmarshal(keyRecordValueBytes, recordValue); err != nil {
- // If it's not a RecordValue, return the raw bytes
- return keyRecordValueBytes
- }
-
- // If key schema management is enabled, re-encode the RecordValue to Confluent format
- if h.IsSchemaEnabled() {
- if encodedKey, err := h.encodeKeyRecordValueToConfluentFormat(topicName, recordValue); err == nil {
- return encodedKey
- }
- }
-
- // Fallback: convert RecordValue to JSON
- return h.recordValueToJSON(recordValue)
-}
-
-// encodeKeyRecordValueToConfluentFormat re-encodes a key RecordValue back to Confluent format
-func (h *Handler) encodeKeyRecordValueToConfluentFormat(topicName string, recordValue *schema_pb.RecordValue) ([]byte, error) {
- if recordValue == nil {
- return nil, fmt.Errorf("key RecordValue is nil")
- }
-
- // Get schema configuration from topic config
- schemaConfig, err := h.getTopicSchemaConfig(topicName)
- if err != nil {
- return nil, fmt.Errorf("failed to get topic schema config: %w", err)
- }
-
- if !schemaConfig.HasKeySchema {
- return nil, fmt.Errorf("no key schema configured for topic: %s", topicName)
- }
-
- // Use schema manager to encode RecordValue back to original format
- encodedBytes, err := h.schemaManager.EncodeMessage(recordValue, schemaConfig.KeySchemaID, schemaConfig.KeySchemaFormat)
- if err != nil {
- return nil, fmt.Errorf("failed to encode key RecordValue: %w", err)
- }
-
- return encodedBytes, nil
-}
-
// recordValueToJSON converts a RecordValue to JSON bytes (fallback)
func (h *Handler) recordValueToJSON(recordValue *schema_pb.RecordValue) []byte {
if recordValue == nil || recordValue.Fields == nil {
@@ -1675,92 +1303,3 @@ func (h *Handler) recordValueToJSON(recordValue *schema_pb.RecordValue) []byte {
return []byte(jsonStr)
}
-
-// fetchPartitionData fetches data for a single partition (called concurrently)
-func (h *Handler) fetchPartitionData(
- ctx context.Context,
- topicName string,
- partition FetchPartition,
- apiVersion uint16,
- isSchematizedTopic bool,
-) *partitionFetchResult {
- startTime := time.Now()
- result := &partitionFetchResult{}
-
- // Get the actual high water mark from SeaweedMQ
- highWaterMark, err := h.seaweedMQHandler.GetLatestOffset(topicName, partition.PartitionID)
- if err != nil {
- highWaterMark = 0
- }
- result.highWaterMark = highWaterMark
-
- // Check if topic exists
- if !h.seaweedMQHandler.TopicExists(topicName) {
- if isSystemTopic(topicName) {
- // Auto-create system topics
- if err := h.createTopicWithSchemaSupport(topicName, 1); err != nil {
- result.errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
- result.fetchDuration = time.Since(startTime)
- return result
- }
- } else {
- result.errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
- result.fetchDuration = time.Since(startTime)
- return result
- }
- }
-
- // Normalize special fetch offsets
- effectiveFetchOffset := partition.FetchOffset
- if effectiveFetchOffset < 0 {
- if effectiveFetchOffset == -2 {
- effectiveFetchOffset = 0
- } else if effectiveFetchOffset == -1 {
- effectiveFetchOffset = highWaterMark
- }
- }
-
- // Fetch records if available
- var recordBatch []byte
- if highWaterMark > effectiveFetchOffset {
- // Use multi-batch fetcher (pass context to respect timeout)
- multiFetcher := NewMultiBatchFetcher(h)
- fetchResult, err := multiFetcher.FetchMultipleBatches(
- ctx,
- topicName,
- partition.PartitionID,
- effectiveFetchOffset,
- highWaterMark,
- partition.MaxBytes,
- )
-
- if err == nil && fetchResult.TotalSize > 0 {
- recordBatch = fetchResult.RecordBatches
- } else {
- // Fallback to single batch (pass context to respect timeout)
- smqRecords, err := h.seaweedMQHandler.GetStoredRecords(ctx, topicName, partition.PartitionID, effectiveFetchOffset, 10)
- if err == nil && len(smqRecords) > 0 {
- recordBatch = h.constructRecordBatchFromSMQ(topicName, effectiveFetchOffset, smqRecords)
- } else {
- recordBatch = []byte{}
- }
- }
- } else {
- recordBatch = []byte{}
- }
-
- // Try schematized records if needed and recordBatch is empty
- if isSchematizedTopic && len(recordBatch) == 0 {
- schematizedRecords, err := h.fetchSchematizedRecords(topicName, partition.PartitionID, effectiveFetchOffset, partition.MaxBytes)
- if err == nil && len(schematizedRecords) > 0 {
- schematizedBatch := h.createSchematizedRecordBatch(schematizedRecords, effectiveFetchOffset)
- if len(schematizedBatch) > 0 {
- recordBatch = schematizedBatch
- }
- }
- }
-
- result.recordBatch = recordBatch
- result.fetchDuration = time.Since(startTime)
- return result
-}