Diffstat (limited to 'weed/mq/kafka/protocol/fetch.go')
| -rw-r--r-- | weed/mq/kafka/protocol/fetch.go | 651 |
1 file changed, 95 insertions, 556 deletions
diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go
index edc07d57a..6b38a71e1 100644
--- a/weed/mq/kafka/protocol/fetch.go
+++ b/weed/mq/kafka/protocol/fetch.go
@@ -7,6 +7,7 @@ import (
     "hash/crc32"
     "strings"
     "time"
+    "unicode/utf8"

     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
@@ -97,11 +98,16 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
                 // Continue with polling
             }
             if hasDataAvailable() {
+                // Data became available during polling - return immediately with NO throttle
+                // Throttle time should only be used for quota enforcement, not for long-poll timing
+                throttleTimeMs = 0
                 break pollLoop
             }
         }
-        elapsed := time.Since(start)
-        throttleTimeMs = int32(elapsed / time.Millisecond)
+        // If we got here without breaking early, we hit the timeout
+        // Long-poll timeout is NOT throttling - throttle time should only be used for quota/rate limiting
+        // Do NOT set throttle time based on long-poll duration
+        throttleTimeMs = 0
     }

     // Build the response
@@ -155,7 +161,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
         return nil, fmt.Errorf("connection context not available")
     }

-    glog.V(2).Infof("[%s] FETCH CORR=%d: Processing %d topics with %d total partitions",
+    glog.V(4).Infof("[%s] FETCH CORR=%d: Processing %d topics with %d total partitions",
         connContext.ConnectionID, correlationID, len(fetchRequest.Topics), func() int {
             count := 0
@@ -166,7 +172,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
         }())

     // Collect results from persistent readers
-    // CRITICAL: Dispatch all requests concurrently, then wait for all results in parallel
+    // Dispatch all requests concurrently, then wait for all results in parallel
    // to avoid sequential timeout accumulation
    type pendingFetch struct {
        topicName string
@@ -242,9 +248,19 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
    }

    // Phase 2: Wait for all results with adequate timeout for CI environments
-    // CRITICAL: We MUST return a result for every requested partition or Sarama will error
+    // We MUST return a result for every requested partition or Sarama will error
    results := make([]*partitionFetchResult, len(pending))
-    deadline := time.After(500 * time.Millisecond) // 500ms for all partitions (increased for CI disk I/O)
+    // Use 95% of client's MaxWaitTime to ensure we return BEFORE client timeout
+    // This maximizes data collection time while leaving a safety buffer for:
+    // - Response serialization, network transmission, client processing
+    // For 500ms client timeout: 475ms internal fetch, 25ms buffer
+    // For 100ms client timeout: 95ms internal fetch, 5ms buffer
+    effectiveDeadlineMs := time.Duration(maxWaitMs) * 95 / 100
+    deadline := time.After(effectiveDeadlineMs * time.Millisecond)
+    if maxWaitMs < 20 {
+        // For very short timeouts (< 20ms), use full timeout to maximize data collection
+        deadline = time.After(time.Duration(maxWaitMs) * time.Millisecond)
+    }

    // Collect results one by one with shared deadline
    for i, pf := range pending {
@@ -256,7 +272,7 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
            for j := i; j < len(pending); j++ {
                results[j] = &partitionFetchResult{}
            }
-            glog.V(1).Infof("[%s] Fetch deadline expired, returning empty for %d remaining partitions",
+            glog.V(3).Infof("[%s] Fetch deadline expired, returning empty for %d remaining partitions",
                connContext.ConnectionID, len(pending)-i)
            goto done
        case <-ctx.Done():
@@ -276,7 +292,7 @@ done:
    // Now assemble the response in the correct order using fetched results
    // ====================================================================

-    // CRITICAL: Verify we have results for all requested partitions
+    // Verify we have results for all requested partitions
    // Sarama requires a response block for EVERY requested partition to avoid ErrIncompleteResponse
    expectedResultCount := 0
    for _, topic := range fetchRequest.Topics {
@@ -861,373 +877,12 @@ func encodeVarint(value int64) []byte {
    return buf
 }

-// reconstructSchematizedMessage reconstructs a schematized message from SMQ RecordValue
-func (h *Handler) reconstructSchematizedMessage(recordValue *schema_pb.RecordValue, metadata map[string]string) ([]byte, error) {
-    // Only reconstruct if schema management is enabled
-    if !h.IsSchemaEnabled() {
-        return nil, fmt.Errorf("schema management not enabled")
-    }
-
-    // Extract schema information from metadata
-    schemaIDStr, exists := metadata["schema_id"]
-    if !exists {
-        return nil, fmt.Errorf("no schema ID in metadata")
-    }
-
-    var schemaID uint32
-    if _, err := fmt.Sscanf(schemaIDStr, "%d", &schemaID); err != nil {
-        return nil, fmt.Errorf("invalid schema ID: %w", err)
-    }
-
-    formatStr, exists := metadata["schema_format"]
-    if !exists {
-        return nil, fmt.Errorf("no schema format in metadata")
-    }
-
-    var format schema.Format
-    switch formatStr {
-    case "AVRO":
-        format = schema.FormatAvro
-    case "PROTOBUF":
-        format = schema.FormatProtobuf
-    case "JSON_SCHEMA":
-        format = schema.FormatJSONSchema
-    default:
-        return nil, fmt.Errorf("unsupported schema format: %s", formatStr)
-    }
-
-    // Use schema manager to encode back to original format
-    return h.schemaManager.EncodeMessage(recordValue, schemaID, format)
-}
-
 // SchematizedRecord holds both key and value for schematized messages
 type SchematizedRecord struct {
     Key   []byte
     Value []byte
 }

-// fetchSchematizedRecords fetches and reconstructs schematized records from SeaweedMQ
-func (h *Handler) fetchSchematizedRecords(topicName string, partitionID int32, offset int64, maxBytes int32) ([]*SchematizedRecord, error) {
-    glog.Infof("fetchSchematizedRecords: topic=%s partition=%d offset=%d maxBytes=%d", topicName, partitionID, offset, maxBytes)
-
-    // Only proceed when schema feature is toggled on
-    if !h.useSchema {
-        glog.Infof("fetchSchematizedRecords EARLY RETURN: useSchema=false")
-        return []*SchematizedRecord{}, nil
-    }
-
-    // Check if SeaweedMQ handler is available when schema feature is in use
-    if h.seaweedMQHandler == nil {
-        glog.Infof("fetchSchematizedRecords ERROR: seaweedMQHandler is nil")
-        return nil, fmt.Errorf("SeaweedMQ handler not available")
-    }
-
-    // If schema management isn't fully configured, return empty instead of error
-    if !h.IsSchemaEnabled() {
-        glog.Infof("fetchSchematizedRecords EARLY RETURN: IsSchemaEnabled()=false")
-        return []*SchematizedRecord{}, nil
-    }
-
-    // Fetch stored records from SeaweedMQ
-    maxRecords := 100 // Reasonable batch size limit
-    glog.Infof("fetchSchematizedRecords: calling GetStoredRecords maxRecords=%d", maxRecords)
-    smqRecords, err := h.seaweedMQHandler.GetStoredRecords(context.Background(), topicName, partitionID, offset, maxRecords)
-    if err != nil {
-        glog.Infof("fetchSchematizedRecords ERROR: GetStoredRecords failed: %v", err)
-        return nil, fmt.Errorf("failed to fetch SMQ records: %w", err)
-    }
-
-    glog.Infof("fetchSchematizedRecords: GetStoredRecords returned %d records", len(smqRecords))
-    if len(smqRecords) == 0 {
-        return []*SchematizedRecord{}, nil
-    }
-
-    var reconstructedRecords []*SchematizedRecord
-    totalBytes := int32(0)
-
-    for _, smqRecord := range smqRecords {
-        // Check if we've exceeded maxBytes limit
-        if maxBytes > 0 && totalBytes >= maxBytes {
-            break
-        }
-
-        // Try to reconstruct the schematized message value
-        reconstructedValue, err := h.reconstructSchematizedMessageFromSMQ(smqRecord)
-        if err != nil {
-            // Log error but continue with other messages
-            Error("Failed to reconstruct schematized message at offset %d: %v", smqRecord.GetOffset(), err)
-            continue
-        }
-
-        if reconstructedValue != nil {
-            // Create SchematizedRecord with both key and reconstructed value
-            record := &SchematizedRecord{
-                Key:   smqRecord.GetKey(),  // Preserve the original key
-                Value: reconstructedValue,  // Use the reconstructed value
-            }
-            reconstructedRecords = append(reconstructedRecords, record)
-            totalBytes += int32(len(record.Key) + len(record.Value))
-        }
-    }
-
-    return reconstructedRecords, nil
-}
-
-// reconstructSchematizedMessageFromSMQ reconstructs a schematized message from an SMQRecord
-func (h *Handler) reconstructSchematizedMessageFromSMQ(smqRecord integration.SMQRecord) ([]byte, error) {
-    // Get the stored value (should be a serialized RecordValue)
-    valueBytes := smqRecord.GetValue()
-    if len(valueBytes) == 0 {
-        return nil, fmt.Errorf("empty value in SMQ record")
-    }
-
-    // Try to unmarshal as RecordValue
-    recordValue := &schema_pb.RecordValue{}
-    if err := proto.Unmarshal(valueBytes, recordValue); err != nil {
-        // If it's not a RecordValue, it might be a regular Kafka message
-        // Return it as-is (non-schematized)
-        return valueBytes, nil
-    }
-
-    // Extract schema metadata from the RecordValue fields
-    metadata := h.extractSchemaMetadataFromRecord(recordValue)
-    if len(metadata) == 0 {
-        // No schema metadata found, treat as regular message
-        return valueBytes, nil
-    }
-
-    // Remove Kafka metadata fields to get the original message content
-    originalRecord := h.removeKafkaMetadataFields(recordValue)
-
-    // Reconstruct the original Confluent envelope
-    return h.reconstructSchematizedMessage(originalRecord, metadata)
-}
-
-// extractSchemaMetadataFromRecord extracts schema metadata from RecordValue fields
-func (h *Handler) extractSchemaMetadataFromRecord(recordValue *schema_pb.RecordValue) map[string]string {
-    metadata := make(map[string]string)
-
-    // Look for schema metadata fields in the record
-    if schemaIDField := recordValue.Fields["_schema_id"]; schemaIDField != nil {
-        if schemaIDValue := schemaIDField.GetStringValue(); schemaIDValue != "" {
-            metadata["schema_id"] = schemaIDValue
-        }
-    }
-
-    if schemaFormatField := recordValue.Fields["_schema_format"]; schemaFormatField != nil {
-        if schemaFormatValue := schemaFormatField.GetStringValue(); schemaFormatValue != "" {
-            metadata["schema_format"] = schemaFormatValue
-        }
-    }
-
-    if schemaSubjectField := recordValue.Fields["_schema_subject"]; schemaSubjectField != nil {
-        if schemaSubjectValue := schemaSubjectField.GetStringValue(); schemaSubjectValue != "" {
-            metadata["schema_subject"] = schemaSubjectValue
-        }
-    }
-
-    if schemaVersionField := recordValue.Fields["_schema_version"]; schemaVersionField != nil {
-        if schemaVersionValue := schemaVersionField.GetStringValue(); schemaVersionValue != "" {
-            metadata["schema_version"] = schemaVersionValue
-        }
-    }
-
-    return metadata
-}
-
-// removeKafkaMetadataFields removes Kafka and schema metadata fields from RecordValue
-func (h *Handler) removeKafkaMetadataFields(recordValue *schema_pb.RecordValue) *schema_pb.RecordValue {
-    originalRecord := &schema_pb.RecordValue{
-        Fields: make(map[string]*schema_pb.Value),
-    }
-
-    // Copy all fields except metadata fields
-    for key, value := range recordValue.Fields {
-        if !h.isMetadataField(key) {
-            originalRecord.Fields[key] = value
-        }
-    }
-
-    return originalRecord
-}
-
-// isMetadataField checks if a field is a metadata field that should be excluded from the original message
-func (h *Handler) isMetadataField(fieldName string) bool {
-    return fieldName == "_kafka_offset" ||
-        fieldName == "_kafka_partition" ||
-        fieldName == "_kafka_timestamp" ||
-        fieldName == "_schema_id" ||
-        fieldName == "_schema_format" ||
-        fieldName == "_schema_subject" ||
-        fieldName == "_schema_version"
-}
-
-// createSchematizedRecordBatch creates a Kafka record batch from reconstructed schematized messages
-func (h *Handler) createSchematizedRecordBatch(records []*SchematizedRecord, baseOffset int64) []byte {
-    if len(records) == 0 {
-        // Return empty record batch
-        return h.createEmptyRecordBatch(baseOffset)
-    }
-
-    // Create individual record entries for the batch
-    var recordsData []byte
-    currentTimestamp := time.Now().UnixMilli()
-
-    for i, record := range records {
-        // Create a record entry (Kafka record format v2) with both key and value
-        recordEntry := h.createRecordEntry(record.Key, record.Value, int32(i), currentTimestamp)
-        recordsData = append(recordsData, recordEntry...)
-    }
-
-    // Apply compression if the data is large enough to benefit
-    enableCompression := len(recordsData) > 100
-    var compressionType compression.CompressionCodec = compression.None
-    var finalRecordsData []byte
-
-    if enableCompression {
-        compressed, err := compression.Compress(compression.Gzip, recordsData)
-        if err == nil && len(compressed) < len(recordsData) {
-            finalRecordsData = compressed
-            compressionType = compression.Gzip
-        } else {
-            finalRecordsData = recordsData
-        }
-    } else {
-        finalRecordsData = recordsData
-    }
-
-    // Create the record batch with proper compression and CRC
-    batch, err := h.createRecordBatchWithCompressionAndCRC(baseOffset, finalRecordsData, compressionType, int32(len(records)), currentTimestamp)
-    if err != nil {
-        // Fallback to simple batch creation
-        return h.createRecordBatchWithPayload(baseOffset, int32(len(records)), finalRecordsData)
-    }
-
-    return batch
-}
-
-// createRecordEntry creates a single record entry in Kafka record format v2
-func (h *Handler) createRecordEntry(messageKey []byte, messageData []byte, offsetDelta int32, timestamp int64) []byte {
-    // Record format v2:
-    // - length (varint)
-    // - attributes (int8)
-    // - timestamp delta (varint)
-    // - offset delta (varint)
-    // - key length (varint) + key
-    // - value length (varint) + value
-    // - headers count (varint) + headers
-
-    var record []byte
-
-    // Attributes (1 byte) - no special attributes
-    record = append(record, 0)
-
-    // Timestamp delta (varint) - 0 for now (all messages have same timestamp)
-    record = append(record, encodeVarint(0)...)
-
-    // Offset delta (varint)
-    record = append(record, encodeVarint(int64(offsetDelta))...)
-
-    // Key length (varint) + key
-    if messageKey == nil || len(messageKey) == 0 {
-        record = append(record, encodeVarint(-1)...) // -1 indicates null key
-    } else {
-        record = append(record, encodeVarint(int64(len(messageKey)))...)
-        record = append(record, messageKey...)
-    }
-
-    // Value length (varint) + value
-    record = append(record, encodeVarint(int64(len(messageData)))...)
-    record = append(record, messageData...)
-
-    // Headers count (varint) - no headers
-    record = append(record, encodeVarint(0)...)
-
-    // Prepend the total record length (varint)
-    recordLength := encodeVarint(int64(len(record)))
-    return append(recordLength, record...)
-}
-
-// createRecordBatchWithCompressionAndCRC creates a Kafka record batch with proper compression and CRC
-func (h *Handler) createRecordBatchWithCompressionAndCRC(baseOffset int64, recordsData []byte, compressionType compression.CompressionCodec, recordCount int32, baseTimestampMs int64) ([]byte, error) {
-    // Create record batch header
-    // Validate size to prevent overflow
-    const maxBatchSize = 1 << 30 // 1 GB limit
-    if len(recordsData) > maxBatchSize-61 {
-        return nil, fmt.Errorf("records data too large: %d bytes", len(recordsData))
-    }
-    batch := make([]byte, 0, len(recordsData)+61) // 61 bytes for header
-
-    // Base offset (8 bytes)
-    baseOffsetBytes := make([]byte, 8)
-    binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
-    batch = append(batch, baseOffsetBytes...)
-
-    // Batch length placeholder (4 bytes) - will be filled later
-    batchLengthPos := len(batch)
-    batch = append(batch, 0, 0, 0, 0)
-
-    // Partition leader epoch (4 bytes)
-    batch = append(batch, 0, 0, 0, 0)
-
-    // Magic byte (1 byte) - version 2
-    batch = append(batch, 2)
-
-    // CRC placeholder (4 bytes) - will be calculated later
-    crcPos := len(batch)
-    batch = append(batch, 0, 0, 0, 0)
-
-    // Attributes (2 bytes) - compression type and other flags
-    attributes := int16(compressionType) // Set compression type in lower 3 bits
-    attributesBytes := make([]byte, 2)
-    binary.BigEndian.PutUint16(attributesBytes, uint16(attributes))
-    batch = append(batch, attributesBytes...)
-
-    // Last offset delta (4 bytes)
-    lastOffsetDelta := uint32(recordCount - 1)
-    lastOffsetDeltaBytes := make([]byte, 4)
-    binary.BigEndian.PutUint32(lastOffsetDeltaBytes, lastOffsetDelta)
-    batch = append(batch, lastOffsetDeltaBytes...)
-
-    // First timestamp (8 bytes) - use the same timestamp used to build record entries
-    firstTimestampBytes := make([]byte, 8)
-    binary.BigEndian.PutUint64(firstTimestampBytes, uint64(baseTimestampMs))
-    batch = append(batch, firstTimestampBytes...)
-
-    // Max timestamp (8 bytes) - same as first for simplicity
-    batch = append(batch, firstTimestampBytes...)
-
-    // Producer ID (8 bytes) - -1 for non-transactional
-    batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF)
-
-    // Producer epoch (2 bytes) - -1 for non-transactional
-    batch = append(batch, 0xFF, 0xFF)
-
-    // Base sequence (4 bytes) - -1 for non-transactional
-    batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF)
-
-    // Record count (4 bytes)
-    recordCountBytes := make([]byte, 4)
-    binary.BigEndian.PutUint32(recordCountBytes, uint32(recordCount))
-    batch = append(batch, recordCountBytes...)
-
-    // Records payload (compressed or uncompressed)
-    batch = append(batch, recordsData...)
-
-    // Calculate and set batch length (excluding base offset and batch length fields)
-    batchLength := len(batch) - 12 // 8 bytes base offset + 4 bytes batch length
-    binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], uint32(batchLength))
-
-    // Calculate and set CRC32 over attributes..end (exclude CRC field itself)
-    // Kafka uses Castagnoli (CRC-32C) algorithm. CRC covers ONLY from attributes offset (byte 21) onwards.
-    // See: DefaultRecordBatch.java computeChecksum() - Crc32C.compute(buffer, ATTRIBUTES_OFFSET, ...)
-    crcData := batch[crcPos+4:] // Skip CRC field itself (bytes 17..20) and include the rest
-    crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli))
-    binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc)
-
-    return batch, nil
-}
-
 // createEmptyRecordBatch creates an empty Kafka record batch using the new parser
 func (h *Handler) createEmptyRecordBatch(baseOffset int64) []byte {
     // Use the new record batch creation function with no compression
@@ -1297,47 +952,6 @@ func (h *Handler) createEmptyRecordBatchManual(baseOffset int64) []byte {
     return batch
 }

-// createRecordBatchWithPayload creates a record batch with the given payload
-func (h *Handler) createRecordBatchWithPayload(baseOffset int64, recordCount int32, payload []byte) []byte {
-    // For Phase 7, create a simplified record batch
-    // In Phase 8, this will implement proper Kafka record batch format v2
-
-    batch := h.createEmptyRecordBatch(baseOffset)
-
-    // Update record count
-    recordCountOffset := len(batch) - 4
-    binary.BigEndian.PutUint32(batch[recordCountOffset:recordCountOffset+4], uint32(recordCount))
-
-    // Append payload (simplified - real implementation would format individual records)
-    batch = append(batch, payload...)
-
-    // Update batch length
-    batchLength := len(batch) - 12
-    binary.BigEndian.PutUint32(batch[8:12], uint32(batchLength))
-
-    return batch
-}
-
-// handleSchematizedFetch handles fetch requests for topics with schematized messages
-func (h *Handler) handleSchematizedFetch(topicName string, partitionID int32, offset int64, maxBytes int32) ([]byte, error) {
-    // Check if this topic uses schema management
-    if !h.IsSchemaEnabled() {
-        // Fall back to regular fetch handling
-        return nil, fmt.Errorf("schema management not enabled")
-    }
-
-    // Fetch schematized records from SeaweedMQ
-    records, err := h.fetchSchematizedRecords(topicName, partitionID, offset, maxBytes)
-    if err != nil {
-        return nil, fmt.Errorf("failed to fetch schematized records: %w", err)
-    }
-
-    // Create record batch from reconstructed records
-    recordBatch := h.createSchematizedRecordBatch(records, offset)
-
-    return recordBatch, nil
-}
-
 // isSchematizedTopic checks if a topic uses schema management
 func (h *Handler) isSchematizedTopic(topicName string) bool {
     // System topics (_schemas, __consumer_offsets, etc.) should NEVER use schema encoding
@@ -1518,13 +1132,21 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
         return nil
     }

-    // CRITICAL FIX: For system topics like _schemas, _consumer_offsets, etc.,
+
+    // For system topics like _schemas, _consumer_offsets, etc.,
     // return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.)
     // and should NOT be processed as RecordValue protobuf messages.
     if strings.HasPrefix(topicName, "_") {
         return recordValueBytes
     }

+    // CRITICAL: If schema management is not enabled, we should NEVER try to parse as RecordValue
+    // All messages are stored as raw bytes when schema management is disabled
+    // Attempting to parse them as RecordValue will cause corruption due to protobuf's lenient parsing
+    if !h.IsSchemaEnabled() {
+        return recordValueBytes
+    }
+
     // Try to unmarshal as RecordValue
     recordValue := &schema_pb.RecordValue{}
     if err := proto.Unmarshal(recordValueBytes, recordValue); err != nil {
@@ -1533,6 +1155,14 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
         return recordValueBytes
     }

+    // Validate that the unmarshaled RecordValue is actually a valid RecordValue
+    // Protobuf unmarshal is lenient and can succeed with garbage data for random bytes
+    // We need to check if this looks like a real RecordValue or just random bytes
+    if !h.isValidRecordValue(recordValue, recordValueBytes) {
+        // Not a valid RecordValue - return raw bytes as-is
+        return recordValueBytes
+    }
+
     // If schema management is enabled, re-encode the RecordValue to Confluent format
     if h.IsSchemaEnabled() {
         if encodedMsg, err := h.encodeRecordValueToConfluentFormat(topicName, recordValue); err == nil {
@@ -1545,6 +1175,60 @@ func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueB
     return h.recordValueToJSON(recordValue)
 }

+// isValidRecordValue checks if a RecordValue looks like a real RecordValue or garbage from random bytes
+// This performs a roundtrip test: marshal the RecordValue and check if it produces similar output
+func (h *Handler) isValidRecordValue(recordValue *schema_pb.RecordValue, originalBytes []byte) bool {
+    // Empty or nil Fields means not a valid RecordValue
+    if recordValue == nil || recordValue.Fields == nil || len(recordValue.Fields) == 0 {
+        return false
+    }
+
+    // Check if field names are valid UTF-8 strings (not binary garbage)
+    // Real RecordValue messages have proper field names like "name", "age", etc.
+    // Random bytes parsed as protobuf often create non-UTF8 or very short field names
+    for fieldName, fieldValue := range recordValue.Fields {
+        // Field name should be valid UTF-8
+        if !utf8.ValidString(fieldName) {
+            return false
+        }
+
+        // Field name should have reasonable length (at least 1 char, at most 1000)
+        if len(fieldName) == 0 || len(fieldName) > 1000 {
+            return false
+        }
+
+        // Field value should not be nil
+        if fieldValue == nil || fieldValue.Kind == nil {
+            return false
+        }
+    }
+
+    // Roundtrip check: If this is a real RecordValue, marshaling it back should produce
+    // similar-sized output. Random bytes that accidentally parse as protobuf will typically
+    // produce very different output when marshaled back.
+    remarshaled, err := proto.Marshal(recordValue)
+    if err != nil {
+        return false
+    }
+
+    // Check if the sizes are reasonably similar (within 50% tolerance)
+    // Real RecordValue will have similar size, random bytes will be very different
+    originalSize := len(originalBytes)
+    remarshaledSize := len(remarshaled)
+    if originalSize == 0 {
+        return false
+    }
+
+    // Calculate size ratio - should be close to 1.0 for real RecordValue
+    ratio := float64(remarshaledSize) / float64(originalSize)
+    if ratio < 0.5 || ratio > 2.0 {
+        // Size differs too much - this is likely random bytes parsed as protobuf
+        return false
+    }
+
+    return true
+}
+
 // encodeRecordValueToConfluentFormat re-encodes a RecordValue back to Confluent format
 func (h *Handler) encodeRecordValueToConfluentFormat(topicName string, recordValue *schema_pb.RecordValue) ([]byte, error) {
     if recordValue == nil {
@@ -1583,62 +1267,6 @@ func (h *Handler) getTopicSchemaConfig(topicName string) (*TopicSchemaConfig, er
     return config, nil
 }

-// decodeRecordValueToKafkaKey decodes a key RecordValue back to the original Kafka key bytes
-func (h *Handler) decodeRecordValueToKafkaKey(topicName string, keyRecordValueBytes []byte) []byte {
-    if keyRecordValueBytes == nil {
-        return nil
-    }
-
-    // Try to get topic schema config
-    schemaConfig, err := h.getTopicSchemaConfig(topicName)
-    if err != nil || !schemaConfig.HasKeySchema {
-        // No key schema config available, return raw bytes
-        return keyRecordValueBytes
-    }
-
-    // Try to unmarshal as RecordValue
-    recordValue := &schema_pb.RecordValue{}
-    if err := proto.Unmarshal(keyRecordValueBytes, recordValue); err != nil {
-        // If it's not a RecordValue, return the raw bytes
-        return keyRecordValueBytes
-    }
-
-    // If key schema management is enabled, re-encode the RecordValue to Confluent format
-    if h.IsSchemaEnabled() {
-        if encodedKey, err := h.encodeKeyRecordValueToConfluentFormat(topicName, recordValue); err == nil {
-            return encodedKey
-        }
-    }
-
-    // Fallback: convert RecordValue to JSON
-    return h.recordValueToJSON(recordValue)
-}
-
-// encodeKeyRecordValueToConfluentFormat re-encodes a key RecordValue back to Confluent format
-func (h *Handler) encodeKeyRecordValueToConfluentFormat(topicName string, recordValue *schema_pb.RecordValue) ([]byte, error) {
-    if recordValue == nil {
-        return nil, fmt.Errorf("key RecordValue is nil")
-    }
-
-    // Get schema configuration from topic config
-    schemaConfig, err := h.getTopicSchemaConfig(topicName)
-    if err != nil {
-        return nil, fmt.Errorf("failed to get topic schema config: %w", err)
-    }
-
-    if !schemaConfig.HasKeySchema {
-        return nil, fmt.Errorf("no key schema configured for topic: %s", topicName)
-    }
-
-    // Use schema manager to encode RecordValue back to original format
-    encodedBytes, err := h.schemaManager.EncodeMessage(recordValue, schemaConfig.KeySchemaID, schemaConfig.KeySchemaFormat)
-    if err != nil {
-        return nil, fmt.Errorf("failed to encode key RecordValue: %w", err)
-    }
-
-    return encodedBytes, nil
-}
-
 // recordValueToJSON converts a RecordValue to JSON bytes (fallback)
 func (h *Handler) recordValueToJSON(recordValue *schema_pb.RecordValue) []byte {
     if recordValue == nil || recordValue.Fields == nil {
@@ -1675,92 +1303,3 @@ func (h *Handler) recordValueToJSON(recordValue *schema_pb.RecordValue) []byte {

     return []byte(jsonStr)
 }
-
-// fetchPartitionData fetches data for a single partition (called concurrently)
-func (h *Handler) fetchPartitionData(
-    ctx context.Context,
-    topicName string,
-    partition FetchPartition,
-    apiVersion uint16,
-    isSchematizedTopic bool,
-) *partitionFetchResult {
-    startTime := time.Now()
-    result := &partitionFetchResult{}
-
-    // Get the actual high water mark from SeaweedMQ
-    highWaterMark, err := h.seaweedMQHandler.GetLatestOffset(topicName, partition.PartitionID)
-    if err != nil {
-        highWaterMark = 0
-    }
-    result.highWaterMark = highWaterMark
-
-    // Check if topic exists
-    if !h.seaweedMQHandler.TopicExists(topicName) {
-        if isSystemTopic(topicName) {
-            // Auto-create system topics
-            if err := h.createTopicWithSchemaSupport(topicName, 1); err != nil {
-                result.errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
-                result.fetchDuration = time.Since(startTime)
-                return result
-            }
-        } else {
-            result.errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
-            result.fetchDuration = time.Since(startTime)
-            return result
-        }
-    }
-
-    // Normalize special fetch offsets
-    effectiveFetchOffset := partition.FetchOffset
-    if effectiveFetchOffset < 0 {
-        if effectiveFetchOffset == -2 {
-            effectiveFetchOffset = 0
-        } else if effectiveFetchOffset == -1 {
-            effectiveFetchOffset = highWaterMark
-        }
-    }
-
-    // Fetch records if available
-    var recordBatch []byte
-    if highWaterMark > effectiveFetchOffset {
-        // Use multi-batch fetcher (pass context to respect timeout)
-        multiFetcher := NewMultiBatchFetcher(h)
-        fetchResult, err := multiFetcher.FetchMultipleBatches(
-            ctx,
-            topicName,
-            partition.PartitionID,
-            effectiveFetchOffset,
-            highWaterMark,
-            partition.MaxBytes,
-        )
-
-        if err == nil && fetchResult.TotalSize > 0 {
-            recordBatch = fetchResult.RecordBatches
-        } else {
-            // Fallback to single batch (pass context to respect timeout)
-            smqRecords, err := h.seaweedMQHandler.GetStoredRecords(ctx, topicName, partition.PartitionID, effectiveFetchOffset, 10)
-            if err == nil && len(smqRecords) > 0 {
-                recordBatch = h.constructRecordBatchFromSMQ(topicName, effectiveFetchOffset, smqRecords)
-            } else {
-                recordBatch = []byte{}
-            }
-        }
-    } else {
-        recordBatch = []byte{}
-    }
-
-    // Try schematized records if needed and recordBatch is empty
-    if isSchematizedTopic && len(recordBatch) == 0 {
-        schematizedRecords, err := h.fetchSchematizedRecords(topicName, partition.PartitionID, effectiveFetchOffset, partition.MaxBytes)
-        if err == nil && len(schematizedRecords) > 0 {
-            schematizedBatch := h.createSchematizedRecordBatch(schematizedRecords, effectiveFetchOffset)
-            if len(schematizedBatch) > 0 {
-                recordBatch = schematizedBatch
-            }
-        }
-    }
-
-    result.recordBatch = recordBatch
-    result.fetchDuration = time.Since(startTime)
-    return result
-}
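
Editor's note: the new fetch deadline logic budgets roughly 95% of the client's requested MaxWaitTime for internal collection, keeping a small buffer for response serialization and network transmission, and falls back to the full wait for very short timeouts. A minimal standalone sketch of that budgeting rule (hypothetical helper name, not the handler's actual code):

package main

import (
	"fmt"
	"time"
)

// effectiveFetchBudget mirrors the 95%-of-MaxWaitTime rule from the diff:
// below 20ms the full client wait is used, otherwise 5% is reserved as a
// safety buffer for serialization, network transmission and client processing.
func effectiveFetchBudget(maxWaitMs int32) time.Duration {
	if maxWaitMs < 20 {
		return time.Duration(maxWaitMs) * time.Millisecond
	}
	return time.Duration(maxWaitMs) * time.Millisecond * 95 / 100
}

func main() {
	for _, ms := range []int32{10, 100, 500} {
		// Prints 10ms, 95ms and 475ms respectively.
		fmt.Printf("client MaxWaitTime %4dms -> internal budget %v\n", ms, effectiveFetchBudget(ms))
	}
}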
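The removed createRecordBatchWithCompressionAndCRC helper follows the Kafka record batch v2 layout described in its comments: the CRC field sits at bytes 17-20 and the CRC-32C (Castagnoli) checksum covers everything from the attributes field at byte 21 to the end of the batch, as in Kafka's DefaultRecordBatch. A standalone sketch of just that checksum step, assuming an already-assembled v2 batch buffer (not the gateway's helper itself):

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// fillBatchCRC writes the record batch v2 checksum: CRC-32C over the bytes
// from the attributes offset (byte 21) to the end, stored at bytes 17..20.
// Bytes 0..16 (base offset, batch length, leader epoch, magic) are excluded.
func fillBatchCRC(batch []byte) {
	const crcOffset = 17        // 8 (base offset) + 4 (length) + 4 (epoch) + 1 (magic)
	const attributesOffset = 21 // CRC field itself (4 bytes) is skipped
	crc := crc32.Checksum(batch[attributesOffset:], crc32.MakeTable(crc32.Castagnoli))
	binary.BigEndian.PutUint32(batch[crcOffset:crcOffset+4], crc)
}

func main() {
	batch := make([]byte, 61) // header-only batch; 61 bytes is the v2 header size
	batch[16] = 2             // magic byte: record batch format v2
	fillBatchCRC(batch)
	fmt.Printf("crc32c = 0x%08x\n", binary.BigEndian.Uint32(batch[17:21]))
}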
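The new isValidRecordValue guard works around protobuf's lenient wire parsing: raw, non-schematized payloads can occasionally unmarshal into a RecordValue without error, so the code checks field-name sanity and then re-marshals the message, rejecting it when the output size differs from the input by more than a factor of two. A sketch of the size-ratio part of that heuristic, using structpb.Struct as a stand-in for schema_pb.RecordValue:

package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/structpb"
)

// roundtripLooksValid applies the same tolerance as isValidRecordValue:
// re-marshal the decoded message and require the result to be within a
// factor of two of the original encoding in either direction.
func roundtripLooksValid(msg proto.Message, original []byte) bool {
	if len(original) == 0 {
		return false
	}
	remarshaled, err := proto.Marshal(msg)
	if err != nil {
		return false
	}
	ratio := float64(len(remarshaled)) / float64(len(original))
	return ratio >= 0.5 && ratio <= 2.0
}

func main() {
	s, _ := structpb.NewStruct(map[string]interface{}{"name": "alice", "age": 30})
	data, _ := proto.Marshal(s)

	decoded := &structpb.Struct{}
	if err := proto.Unmarshal(data, decoded); err == nil {
		// A genuine message re-marshals to roughly the same size, so this prints true.
		fmt.Println("valid:", roundtripLooksValid(decoded, data))
	}
}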
