Diffstat (limited to 'weed/mq/kafka/protocol/fetch.go')
| -rw-r--r-- | weed/mq/kafka/protocol/fetch.go | 1766 |
1 file changed, 1766 insertions, 0 deletions
diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go new file mode 100644 index 000000000..edc07d57a --- /dev/null +++ b/weed/mq/kafka/protocol/fetch.go @@ -0,0 +1,1766 @@ +package protocol + +import ( + "context" + "encoding/binary" + "fmt" + "hash/crc32" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "google.golang.org/protobuf/proto" +) + +// partitionFetchResult holds the result of fetching from a single partition +type partitionFetchResult struct { + topicIndex int + partitionIndex int + recordBatch []byte + highWaterMark int64 + errorCode int16 + fetchDuration time.Duration +} + +func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + // Parse the Fetch request to get the requested topics and partitions + fetchRequest, err := h.parseFetchRequest(apiVersion, requestBody) + if err != nil { + return nil, fmt.Errorf("parse fetch request: %w", err) + } + + // Basic long-polling to avoid client busy-looping when there's no data. + var throttleTimeMs int32 = 0 + // Only long-poll when all referenced topics exist; unknown topics should not block + allTopicsExist := func() bool { + for _, topic := range fetchRequest.Topics { + if !h.seaweedMQHandler.TopicExists(topic.Name) { + return false + } + } + return true + } + hasDataAvailable := func() bool { + // Check if any requested partition has data available + // Compare fetch offset with high water mark + for _, topic := range fetchRequest.Topics { + if !h.seaweedMQHandler.TopicExists(topic.Name) { + continue + } + for _, partition := range topic.Partitions { + hwm, err := h.seaweedMQHandler.GetLatestOffset(topic.Name, partition.PartitionID) + if err != nil { + continue + } + // Normalize fetch offset + effectiveOffset := partition.FetchOffset + if effectiveOffset == -2 { // earliest + effectiveOffset = 0 + } else if effectiveOffset == -1 { // latest + effectiveOffset = hwm + } + // If fetch offset < hwm, data is available + if effectiveOffset < hwm { + return true + } + } + } + return false + } + // Long-poll when client requests it via MaxWaitTime and there's no data + // Even if MinBytes=0, we should honor MaxWaitTime to reduce polling overhead + maxWaitMs := fetchRequest.MaxWaitTime + + // Long-poll if: (1) client wants to wait (maxWaitMs > 0), (2) no data available, (3) topics exist + // NOTE: We long-poll even if MinBytes=0, since the client specified a wait time + hasData := hasDataAvailable() + topicsExist := allTopicsExist() + shouldLongPoll := maxWaitMs > 0 && !hasData && topicsExist + + if shouldLongPoll { + start := time.Now() + // Use the client's requested wait time (already capped at 1s) + maxPollTime := time.Duration(maxWaitMs) * time.Millisecond + deadline := start.Add(maxPollTime) + pollLoop: + for time.Now().Before(deadline) { + // Use context-aware sleep instead of blocking time.Sleep + select { + case <-ctx.Done(): + throttleTimeMs = int32(time.Since(start) / time.Millisecond) + break pollLoop + case <-time.After(10 * time.Millisecond): + // Continue with polling + } + if hasDataAvailable() { + break pollLoop + } + } + elapsed := time.Since(start) + throttleTimeMs = int32(elapsed / time.Millisecond) + } + + // Build the response + response := make([]byte, 0, 1024) + 
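// Illustrative sketch (not part of this diff): the long-poll above can be factored
// into a small helper that waits until data arrives, the client's max_wait_ms
// deadline passes, or the connection context is cancelled. Names are hypothetical;
// it uses only the "context" and "time" imports already present in this file.
func waitForData(ctx context.Context, maxWait time.Duration, hasData func() bool) time.Duration {
	start := time.Now()
	deadline := start.Add(maxWait)
	for time.Now().Before(deadline) && !hasData() {
		select {
		case <-ctx.Done():
			return time.Since(start)
		case <-time.After(10 * time.Millisecond):
			// poll again
		}
	}
	return time.Since(start) // the caller reports this as throttle_time_ms
}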
totalAppendedRecordBytes := 0 + + // NOTE: Correlation ID is NOT included in the response body + // The wire protocol layer (writeResponseWithTimeout) writes: [Size][CorrelationID][Body] + // Kafka clients read the correlation ID separately from the 8-byte header, then read Size-4 bytes of body + // If we include correlation ID here, clients will see it twice and fail with "4 extra bytes" errors + + // Fetch v1+ has throttle_time_ms at the beginning + if apiVersion >= 1 { + throttleBytes := make([]byte, 4) + binary.BigEndian.PutUint32(throttleBytes, uint32(throttleTimeMs)) + response = append(response, throttleBytes...) + } + + // Fetch v7+ has error_code and session_id + if apiVersion >= 7 { + response = append(response, 0, 0) // error_code (2 bytes, 0 = no error) + response = append(response, 0, 0, 0, 0) // session_id (4 bytes, 0 = no session) + } + + // Check if this version uses flexible format (v12+) + isFlexible := IsFlexibleVersion(1, apiVersion) // API key 1 = Fetch + + // Topics count - write the actual number of topics in the request + // Kafka protocol: we MUST return all requested topics in the response (even with empty data) + topicsCount := len(fetchRequest.Topics) + if isFlexible { + // Flexible versions use compact array format (count + 1) + response = append(response, EncodeUvarint(uint32(topicsCount+1))...) + } else { + topicsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCountBytes, uint32(topicsCount)) + response = append(response, topicsCountBytes...) + } + + // ==================================================================== + // PERSISTENT PARTITION READERS + // Use per-connection persistent goroutines that maintain offset position + // and stream forward, eliminating repeated lookups and reducing broker CPU + // ==================================================================== + + // Get connection context to access persistent partition readers + connContext := h.getConnectionContextFromRequest(ctx) + if connContext == nil { + glog.Errorf("FETCH CORR=%d: Connection context not available - cannot use persistent readers", + correlationID) + return nil, fmt.Errorf("connection context not available") + } + + glog.V(2).Infof("[%s] FETCH CORR=%d: Processing %d topics with %d total partitions", + connContext.ConnectionID, correlationID, len(fetchRequest.Topics), + func() int { + count := 0 + for _, t := range fetchRequest.Topics { + count += len(t.Partitions) + } + return count + }()) + + // Collect results from persistent readers + // CRITICAL: Dispatch all requests concurrently, then wait for all results in parallel + // to avoid sequential timeout accumulation + type pendingFetch struct { + topicName string + partitionID int32 + resultChan chan *partitionFetchResult + } + + pending := make([]pendingFetch, 0) + persistentFetchStart := time.Now() + + // Phase 1: Dispatch all fetch requests to partition readers (non-blocking) + for _, topic := range fetchRequest.Topics { + isSchematizedTopic := false + if h.IsSchemaEnabled() { + isSchematizedTopic = h.isSchematizedTopic(topic.Name) + } + + for _, partition := range topic.Partitions { + key := TopicPartitionKey{Topic: topic.Name, Partition: partition.PartitionID} + + // All topics (including system topics) use persistent readers for in-memory access + // This enables instant notification and avoids ForceFlush dependencies + + // Get or create persistent reader for this partition + reader := h.getOrCreatePartitionReader(ctx, connContext, key, partition.FetchOffset) + if reader == nil { + // Failed to 
create reader - add empty pending + glog.Errorf("[%s] Failed to get/create partition reader for %s[%d]", + connContext.ConnectionID, topic.Name, partition.PartitionID) + nilChan := make(chan *partitionFetchResult, 1) + nilChan <- &partitionFetchResult{errorCode: 3} // UNKNOWN_TOPIC_OR_PARTITION + pending = append(pending, pendingFetch{ + topicName: topic.Name, + partitionID: partition.PartitionID, + resultChan: nilChan, + }) + continue + } + + // Signal reader to fetch (don't wait for result yet) + resultChan := make(chan *partitionFetchResult, 1) + fetchReq := &partitionFetchRequest{ + requestedOffset: partition.FetchOffset, + maxBytes: partition.MaxBytes, + maxWaitMs: maxWaitMs, // Pass MaxWaitTime from Kafka fetch request + resultChan: resultChan, + isSchematized: isSchematizedTopic, + apiVersion: apiVersion, + } + + // Try to send request (increased timeout for CI environments with slow disk I/O) + select { + case reader.fetchChan <- fetchReq: + // Request sent successfully, add to pending + pending = append(pending, pendingFetch{ + topicName: topic.Name, + partitionID: partition.PartitionID, + resultChan: resultChan, + }) + case <-time.After(200 * time.Millisecond): + // Channel full, return empty result + glog.Warningf("[%s] Reader channel full for %s[%d], returning empty", + connContext.ConnectionID, topic.Name, partition.PartitionID) + emptyChan := make(chan *partitionFetchResult, 1) + emptyChan <- &partitionFetchResult{} + pending = append(pending, pendingFetch{ + topicName: topic.Name, + partitionID: partition.PartitionID, + resultChan: emptyChan, + }) + } + } + } + + // Phase 2: Wait for all results with adequate timeout for CI environments + // CRITICAL: We MUST return a result for every requested partition or Sarama will error + results := make([]*partitionFetchResult, len(pending)) + deadline := time.After(500 * time.Millisecond) // 500ms for all partitions (increased for CI disk I/O) + + // Collect results one by one with shared deadline + for i, pf := range pending { + select { + case result := <-pf.resultChan: + results[i] = result + case <-deadline: + // Deadline expired, return empty for this and all remaining partitions + for j := i; j < len(pending); j++ { + results[j] = &partitionFetchResult{} + } + glog.V(1).Infof("[%s] Fetch deadline expired, returning empty for %d remaining partitions", + connContext.ConnectionID, len(pending)-i) + goto done + case <-ctx.Done(): + // Context cancelled, return empty for remaining + for j := i; j < len(pending); j++ { + results[j] = &partitionFetchResult{} + } + goto done + } + } +done: + + _ = time.Since(persistentFetchStart) // persistentFetchDuration + + // ==================================================================== + // BUILD RESPONSE FROM FETCHED DATA + // Now assemble the response in the correct order using fetched results + // ==================================================================== + + // CRITICAL: Verify we have results for all requested partitions + // Sarama requires a response block for EVERY requested partition to avoid ErrIncompleteResponse + expectedResultCount := 0 + for _, topic := range fetchRequest.Topics { + expectedResultCount += len(topic.Partitions) + } + if len(results) != expectedResultCount { + glog.Errorf("[%s] Result count mismatch: expected %d, got %d - this will cause ErrIncompleteResponse", + connContext.ConnectionID, expectedResultCount, len(results)) + // Pad with empty results if needed (safety net - shouldn't happen with fixed code) + for len(results) < expectedResultCount { + 
results = append(results, &partitionFetchResult{}) + } + } + + // Process each requested topic + resultIdx := 0 + for _, topic := range fetchRequest.Topics { + topicNameBytes := []byte(topic.Name) + + // Topic name length and name + if isFlexible { + // Flexible versions use compact string format (length + 1) + response = append(response, EncodeUvarint(uint32(len(topicNameBytes)+1))...) + } else { + response = append(response, byte(len(topicNameBytes)>>8), byte(len(topicNameBytes))) + } + response = append(response, topicNameBytes...) + + // Partitions count for this topic + partitionsCount := len(topic.Partitions) + if isFlexible { + // Flexible versions use compact array format (count + 1) + response = append(response, EncodeUvarint(uint32(partitionsCount+1))...) + } else { + partitionsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partitionsCountBytes, uint32(partitionsCount)) + response = append(response, partitionsCountBytes...) + } + + // Process each requested partition (using pre-fetched results) + for _, partition := range topic.Partitions { + // Get the pre-fetched result for this partition + result := results[resultIdx] + resultIdx++ + + // Partition ID + partitionIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partitionIDBytes, uint32(partition.PartitionID)) + response = append(response, partitionIDBytes...) + + // Error code (2 bytes) - use the result's error code + response = append(response, byte(result.errorCode>>8), byte(result.errorCode)) + + // Use the pre-fetched high water mark from concurrent fetch + highWaterMark := result.highWaterMark + + // High water mark (8 bytes) + highWaterMarkBytes := make([]byte, 8) + binary.BigEndian.PutUint64(highWaterMarkBytes, uint64(highWaterMark)) + response = append(response, highWaterMarkBytes...) + + // Fetch v4+ has last_stable_offset and log_start_offset + if apiVersion >= 4 { + // Last stable offset (8 bytes) - same as high water mark for non-transactional + response = append(response, highWaterMarkBytes...) + // Log start offset (8 bytes) - 0 for simplicity + response = append(response, 0, 0, 0, 0, 0, 0, 0, 0) + + // Aborted transactions count (4 bytes) = 0 + response = append(response, 0, 0, 0, 0) + } + + // Use the pre-fetched record batch + recordBatch := result.recordBatch + + // Records size - flexible versions (v12+) use compact format: varint(size+1) + if isFlexible { + if len(recordBatch) == 0 { + response = append(response, 0) // null records = 0 in compact format + } else { + response = append(response, EncodeUvarint(uint32(len(recordBatch)+1))...) + } + } else { + // Non-flexible versions use int32(size) + recordsSizeBytes := make([]byte, 4) + binary.BigEndian.PutUint32(recordsSizeBytes, uint32(len(recordBatch))) + response = append(response, recordsSizeBytes...) + } + + // Records data + response = append(response, recordBatch...) 
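// Illustrative sketch (not part of this diff): how the compact (flexible, v12+)
// encoding of the records field above works. A nullable byte array is written as an
// unsigned varint of length+1, so 0 means null and 1 means empty, while non-flexible
// versions use a fixed 4-byte big-endian length. The helpers below are hypothetical;
// EncodeUvarint in this file is assumed to emit the same unsigned varint form.
func appendCompactNullableBytes(dst, data []byte) []byte {
	if data == nil {
		return append(dst, 0) // 0 encodes null
	}
	return append(appendUvarint(dst, uint32(len(data))+1), data...)
}

func appendUvarint(dst []byte, v uint32) []byte {
	for v >= 0x80 {
		dst = append(dst, byte(v)|0x80)
		v >>= 7
	}
	return append(dst, byte(v))
}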
+ totalAppendedRecordBytes += len(recordBatch) + + // Tagged fields for flexible versions (v12+) after each partition + if isFlexible { + response = append(response, 0) // Empty tagged fields + } + } + + // Tagged fields for flexible versions (v12+) after each topic + if isFlexible { + response = append(response, 0) // Empty tagged fields + } + } + + // Tagged fields for flexible versions (v12+) at the end of response + if isFlexible { + response = append(response, 0) // Empty tagged fields + } + + // Verify topics count hasn't been corrupted + if !isFlexible { + // Topics count position depends on API version: + // v0: byte 0 (no throttle_time_ms, no error_code, no session_id) + // v1-v6: byte 4 (after throttle_time_ms) + // v7+: byte 10 (after throttle_time_ms, error_code, session_id) + var topicsCountPos int + if apiVersion == 0 { + topicsCountPos = 0 + } else if apiVersion < 7 { + topicsCountPos = 4 + } else { + topicsCountPos = 10 + } + + if len(response) >= topicsCountPos+4 { + actualTopicsCount := binary.BigEndian.Uint32(response[topicsCountPos : topicsCountPos+4]) + if actualTopicsCount != uint32(topicsCount) { + glog.Errorf("FETCH CORR=%d v%d: Topics count CORRUPTED! Expected %d, found %d at response[%d:%d]=%02x %02x %02x %02x", + correlationID, apiVersion, topicsCount, actualTopicsCount, topicsCountPos, topicsCountPos+4, + response[topicsCountPos], response[topicsCountPos+1], response[topicsCountPos+2], response[topicsCountPos+3]) + } + } + } + + return response, nil +} + +// FetchRequest represents a parsed Kafka Fetch request +type FetchRequest struct { + ReplicaID int32 + MaxWaitTime int32 + MinBytes int32 + MaxBytes int32 + IsolationLevel int8 + Topics []FetchTopic +} + +type FetchTopic struct { + Name string + Partitions []FetchPartition +} + +type FetchPartition struct { + PartitionID int32 + FetchOffset int64 + LogStartOffset int64 + MaxBytes int32 +} + +// parseFetchRequest parses a Kafka Fetch request +func (h *Handler) parseFetchRequest(apiVersion uint16, requestBody []byte) (*FetchRequest, error) { + if len(requestBody) < 12 { + return nil, fmt.Errorf("fetch request too short: %d bytes", len(requestBody)) + } + + offset := 0 + request := &FetchRequest{} + + // Check if this version uses flexible format (v12+) + isFlexible := IsFlexibleVersion(1, apiVersion) // API key 1 = Fetch + + // NOTE: client_id is already handled by HandleConn and stripped from requestBody + // Request body starts directly with fetch-specific fields + + // Replica ID (4 bytes) - always fixed + if offset+4 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for replica_id") + } + request.ReplicaID = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + + // Max wait time (4 bytes) - always fixed + if offset+4 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for max_wait_time") + } + request.MaxWaitTime = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + + // Min bytes (4 bytes) - always fixed + if offset+4 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for min_bytes") + } + request.MinBytes = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + + // Max bytes (4 bytes) - only in v3+, always fixed + if apiVersion >= 3 { + if offset+4 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for max_bytes") + } + request.MaxBytes = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + } + + // Isolation level (1 byte) - only in v4+, always 
fixed + if apiVersion >= 4 { + if offset+1 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for isolation_level") + } + request.IsolationLevel = int8(requestBody[offset]) + offset += 1 + } + + // Session ID (4 bytes) and Session Epoch (4 bytes) - only in v7+, always fixed + if apiVersion >= 7 { + if offset+8 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for session_id and epoch") + } + offset += 8 // Skip session_id and session_epoch + } + + // Topics count - flexible uses compact array, non-flexible uses INT32 + var topicsCount int + if isFlexible { + // Compact array: length+1 encoded as varint + length, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("decode topics compact array: %w", err) + } + topicsCount = int(length) + offset += consumed + } else { + // Regular array: INT32 length + if offset+4 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for topics count") + } + topicsCount = int(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + } + + // Parse topics + request.Topics = make([]FetchTopic, topicsCount) + for i := 0; i < topicsCount; i++ { + // Topic name - flexible uses compact string, non-flexible uses STRING (INT16 length) + var topicName string + if isFlexible { + // Compact string: length+1 encoded as varint + name, consumed, err := DecodeFlexibleString(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("decode topic name compact string: %w", err) + } + topicName = name + offset += consumed + } else { + // Regular string: INT16 length + bytes + if offset+2 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for topic name length") + } + topicNameLength := int(binary.BigEndian.Uint16(requestBody[offset : offset+2])) + offset += 2 + + if offset+topicNameLength > len(requestBody) { + return nil, fmt.Errorf("insufficient data for topic name") + } + topicName = string(requestBody[offset : offset+topicNameLength]) + offset += topicNameLength + } + request.Topics[i].Name = topicName + + // Partitions count - flexible uses compact array, non-flexible uses INT32 + var partitionsCount int + if isFlexible { + // Compact array: length+1 encoded as varint + length, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("decode partitions compact array: %w", err) + } + partitionsCount = int(length) + offset += consumed + } else { + // Regular array: INT32 length + if offset+4 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for partitions count") + } + partitionsCount = int(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + } + + // Parse partitions + request.Topics[i].Partitions = make([]FetchPartition, partitionsCount) + for j := 0; j < partitionsCount; j++ { + // Partition ID (4 bytes) - always fixed + if offset+4 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for partition ID") + } + request.Topics[i].Partitions[j].PartitionID = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + + // Current leader epoch (4 bytes) - only in v9+, always fixed + if apiVersion >= 9 { + if offset+4 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for current leader epoch") + } + offset += 4 // Skip current leader epoch + } + + // Fetch offset (8 bytes) - always fixed + if offset+8 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for fetch offset") + } + 
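// Illustrative sketch (not part of this diff): the two string encodings handled while
// parsing above. Non-flexible versions prefix a string with a 2-byte big-endian length;
// flexible versions (v12+) use a compact string, an unsigned varint of length+1 where 0
// encodes null. Hypothetical helpers, assuming the file's encoding/binary and fmt imports.
func readLegacyString(buf []byte) (string, int, error) {
	if len(buf) < 2 {
		return "", 0, fmt.Errorf("short buffer for string length")
	}
	n := int(binary.BigEndian.Uint16(buf[:2]))
	if len(buf) < 2+n {
		return "", 0, fmt.Errorf("short buffer for string body")
	}
	return string(buf[2 : 2+n]), 2 + n, nil
}

func readCompactString(buf []byte) (string, int, error) {
	l, n := binary.Uvarint(buf)
	if n <= 0 {
		return "", 0, fmt.Errorf("invalid compact string length")
	}
	if l == 0 {
		return "", n, nil // null string
	}
	size := int(l - 1)
	if len(buf) < n+size {
		return "", 0, fmt.Errorf("short buffer for compact string body")
	}
	return string(buf[n : n+size]), n + size, nil
}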
request.Topics[i].Partitions[j].FetchOffset = int64(binary.BigEndian.Uint64(requestBody[offset : offset+8])) + offset += 8 + + // Log start offset (8 bytes) - only in v5+, always fixed + if apiVersion >= 5 { + if offset+8 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for log start offset") + } + request.Topics[i].Partitions[j].LogStartOffset = int64(binary.BigEndian.Uint64(requestBody[offset : offset+8])) + offset += 8 + } + + // Partition max bytes (4 bytes) - always fixed + if offset+4 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for partition max bytes") + } + request.Topics[i].Partitions[j].MaxBytes = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + + // Tagged fields for partition (only in flexible versions v12+) + if isFlexible { + _, consumed, err := DecodeTaggedFields(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("decode partition tagged fields: %w", err) + } + offset += consumed + } + } + + // Tagged fields for topic (only in flexible versions v12+) + if isFlexible { + _, consumed, err := DecodeTaggedFields(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("decode topic tagged fields: %w", err) + } + offset += consumed + } + } + + // Forgotten topics data (only in v7+) + if apiVersion >= 7 { + // Skip forgotten topics array - we don't use incremental fetch yet + var forgottenTopicsCount int + if isFlexible { + length, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("decode forgotten topics compact array: %w", err) + } + forgottenTopicsCount = int(length) + offset += consumed + } else { + if offset+4 > len(requestBody) { + // End of request, no forgotten topics + return request, nil + } + forgottenTopicsCount = int(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + } + + // Skip forgotten topics if present + for i := 0; i < forgottenTopicsCount && offset < len(requestBody); i++ { + // Skip topic name + if isFlexible { + _, consumed, err := DecodeFlexibleString(requestBody[offset:]) + if err != nil { + break + } + offset += consumed + } else { + if offset+2 > len(requestBody) { + break + } + nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2])) + offset += 2 + nameLen + } + + // Skip partitions array + if isFlexible { + length, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + break + } + offset += consumed + // Skip partition IDs (4 bytes each) + offset += int(length) * 4 + } else { + if offset+4 > len(requestBody) { + break + } + partCount := int(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + partCount*4 + } + + // Skip tagged fields if flexible + if isFlexible { + _, consumed, err := DecodeTaggedFields(requestBody[offset:]) + if err != nil { + break + } + offset += consumed + } + } + } + + // Rack ID (only in v11+) - optional string + if apiVersion >= 11 && offset < len(requestBody) { + if isFlexible { + _, consumed, err := DecodeFlexibleString(requestBody[offset:]) + if err == nil { + offset += consumed + } + } else { + if offset+2 <= len(requestBody) { + rackIDLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2])) + if rackIDLen >= 0 && offset+2+rackIDLen <= len(requestBody) { + offset += 2 + rackIDLen + } + } + } + } + + // Top-level tagged fields (only in flexible versions v12+) + if isFlexible && offset < len(requestBody) { + _, consumed, err := DecodeTaggedFields(requestBody[offset:]) + if err != nil { + 
// Don't fail on trailing tagged fields parsing + } else { + offset += consumed + } + } + + return request, nil +} + +// constructRecordBatchFromSMQ creates a Kafka record batch from SeaweedMQ records +func (h *Handler) constructRecordBatchFromSMQ(topicName string, fetchOffset int64, smqRecords []integration.SMQRecord) []byte { + if len(smqRecords) == 0 { + return []byte{} + } + + // Create record batch using the SMQ records + batch := make([]byte, 0, 512) + + // Record batch header + baseOffsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(baseOffsetBytes, uint64(fetchOffset)) + batch = append(batch, baseOffsetBytes...) // base offset (8 bytes) + + // Calculate batch length (will be filled after we know the size) + batchLengthPos := len(batch) + batch = append(batch, 0, 0, 0, 0) // batch length placeholder (4 bytes) + + // Partition leader epoch (4 bytes) - use 0 (real Kafka uses 0, not -1) + batch = append(batch, 0x00, 0x00, 0x00, 0x00) + + // Magic byte (1 byte) - v2 format + batch = append(batch, 2) + + // CRC placeholder (4 bytes) - will be calculated later + crcPos := len(batch) + batch = append(batch, 0, 0, 0, 0) + + // Attributes (2 bytes) - no compression, etc. + batch = append(batch, 0, 0) + + // Last offset delta (4 bytes) + lastOffsetDelta := int32(len(smqRecords) - 1) + lastOffsetDeltaBytes := make([]byte, 4) + binary.BigEndian.PutUint32(lastOffsetDeltaBytes, uint32(lastOffsetDelta)) + batch = append(batch, lastOffsetDeltaBytes...) + + // Base timestamp (8 bytes) - convert from nanoseconds to milliseconds for Kafka compatibility + baseTimestamp := smqRecords[0].GetTimestamp() / 1000000 // Convert nanoseconds to milliseconds + baseTimestampBytes := make([]byte, 8) + binary.BigEndian.PutUint64(baseTimestampBytes, uint64(baseTimestamp)) + batch = append(batch, baseTimestampBytes...) + + // Max timestamp (8 bytes) - convert from nanoseconds to milliseconds for Kafka compatibility + maxTimestamp := baseTimestamp + if len(smqRecords) > 1 { + maxTimestamp = smqRecords[len(smqRecords)-1].GetTimestamp() / 1000000 // Convert nanoseconds to milliseconds + } + maxTimestampBytes := make([]byte, 8) + binary.BigEndian.PutUint64(maxTimestampBytes, uint64(maxTimestamp)) + batch = append(batch, maxTimestampBytes...) + + // Producer ID (8 bytes) - use -1 for no producer ID + batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) + + // Producer epoch (2 bytes) - use -1 for no producer epoch + batch = append(batch, 0xFF, 0xFF) + + // Base sequence (4 bytes) - use -1 for no base sequence + batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF) + + // Records count (4 bytes) + recordCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(recordCountBytes, uint32(len(smqRecords))) + batch = append(batch, recordCountBytes...) + + // Add individual records from SMQ records + for i, smqRecord := range smqRecords { + // Build individual record + recordBytes := make([]byte, 0, 128) + + // Record attributes (1 byte) + recordBytes = append(recordBytes, 0) + + // Timestamp delta (varint) - calculate from base timestamp (both in milliseconds) + recordTimestampMs := smqRecord.GetTimestamp() / 1000000 // Convert nanoseconds to milliseconds + timestampDelta := recordTimestampMs - baseTimestamp // Both in milliseconds now + recordBytes = append(recordBytes, encodeVarint(timestampDelta)...) + + // Offset delta (varint) + offsetDelta := int64(i) + recordBytes = append(recordBytes, encodeVarint(offsetDelta)...) 
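// Illustrative worked example (not part of this diff): the timestamp and offset deltas
// above use Kafka's zigzag varint encoding, implemented by encodeVarint further down.
// A few values under that encoding:
//   0   -> zigzag 0   -> [0x00]
//   -1  -> zigzag 1   -> [0x01]
//   1   -> zigzag 2   -> [0x02]
//   300 -> zigzag 600 -> [0xd8, 0x04]
// So a null key or value (length -1) costs a single 0x01 byte on the wire.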
+ + // Key length and key (varint + data) - decode RecordValue to get original Kafka message + key := h.decodeRecordValueToKafkaMessage(topicName, smqRecord.GetKey()) + if key == nil { + recordBytes = append(recordBytes, encodeVarint(-1)...) // null key + } else { + recordBytes = append(recordBytes, encodeVarint(int64(len(key)))...) + recordBytes = append(recordBytes, key...) + } + + // Value length and value (varint + data) - decode RecordValue to get original Kafka message + value := h.decodeRecordValueToKafkaMessage(topicName, smqRecord.GetValue()) + + if value == nil { + recordBytes = append(recordBytes, encodeVarint(-1)...) // null value + } else { + recordBytes = append(recordBytes, encodeVarint(int64(len(value)))...) + recordBytes = append(recordBytes, value...) + } + + // Headers count (varint) - 0 headers + recordBytes = append(recordBytes, encodeVarint(0)...) + + // Prepend record length (varint) + recordLength := int64(len(recordBytes)) + batch = append(batch, encodeVarint(recordLength)...) + batch = append(batch, recordBytes...) + } + + // Fill in the batch length + batchLength := uint32(len(batch) - batchLengthPos - 4) + binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], batchLength) + + // Calculate CRC32 for the batch + // Kafka CRC calculation covers: partition leader epoch + magic + attributes + ... (everything after batch length) + // Skip: BaseOffset(8) + BatchLength(4) = 12 bytes + crcData := batch[crcPos+4:] // CRC covers ONLY from attributes (byte 21) onwards // Skip CRC field itself, include rest + crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli)) + binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc) + + return batch +} + +// encodeVarint encodes a signed integer using Kafka's varint encoding +func encodeVarint(value int64) []byte { + // Kafka uses zigzag encoding for signed integers + zigzag := uint64((value << 1) ^ (value >> 63)) + + var buf []byte + for zigzag >= 0x80 { + buf = append(buf, byte(zigzag)|0x80) + zigzag >>= 7 + } + buf = append(buf, byte(zigzag)) + return buf +} + +// reconstructSchematizedMessage reconstructs a schematized message from SMQ RecordValue +func (h *Handler) reconstructSchematizedMessage(recordValue *schema_pb.RecordValue, metadata map[string]string) ([]byte, error) { + // Only reconstruct if schema management is enabled + if !h.IsSchemaEnabled() { + return nil, fmt.Errorf("schema management not enabled") + } + + // Extract schema information from metadata + schemaIDStr, exists := metadata["schema_id"] + if !exists { + return nil, fmt.Errorf("no schema ID in metadata") + } + + var schemaID uint32 + if _, err := fmt.Sscanf(schemaIDStr, "%d", &schemaID); err != nil { + return nil, fmt.Errorf("invalid schema ID: %w", err) + } + + formatStr, exists := metadata["schema_format"] + if !exists { + return nil, fmt.Errorf("no schema format in metadata") + } + + var format schema.Format + switch formatStr { + case "AVRO": + format = schema.FormatAvro + case "PROTOBUF": + format = schema.FormatProtobuf + case "JSON_SCHEMA": + format = schema.FormatJSONSchema + default: + return nil, fmt.Errorf("unsupported schema format: %s", formatStr) + } + + // Use schema manager to encode back to original format + return h.schemaManager.EncodeMessage(recordValue, schemaID, format) +} + +// SchematizedRecord holds both key and value for schematized messages +type SchematizedRecord struct { + Key []byte + Value []byte +} + +// fetchSchematizedRecords fetches and reconstructs schematized records from SeaweedMQ +func (h *Handler) 
fetchSchematizedRecords(topicName string, partitionID int32, offset int64, maxBytes int32) ([]*SchematizedRecord, error) { + glog.Infof("fetchSchematizedRecords: topic=%s partition=%d offset=%d maxBytes=%d", topicName, partitionID, offset, maxBytes) + + // Only proceed when schema feature is toggled on + if !h.useSchema { + glog.Infof("fetchSchematizedRecords EARLY RETURN: useSchema=false") + return []*SchematizedRecord{}, nil + } + + // Check if SeaweedMQ handler is available when schema feature is in use + if h.seaweedMQHandler == nil { + glog.Infof("fetchSchematizedRecords ERROR: seaweedMQHandler is nil") + return nil, fmt.Errorf("SeaweedMQ handler not available") + } + + // If schema management isn't fully configured, return empty instead of error + if !h.IsSchemaEnabled() { + glog.Infof("fetchSchematizedRecords EARLY RETURN: IsSchemaEnabled()=false") + return []*SchematizedRecord{}, nil + } + + // Fetch stored records from SeaweedMQ + maxRecords := 100 // Reasonable batch size limit + glog.Infof("fetchSchematizedRecords: calling GetStoredRecords maxRecords=%d", maxRecords) + smqRecords, err := h.seaweedMQHandler.GetStoredRecords(context.Background(), topicName, partitionID, offset, maxRecords) + if err != nil { + glog.Infof("fetchSchematizedRecords ERROR: GetStoredRecords failed: %v", err) + return nil, fmt.Errorf("failed to fetch SMQ records: %w", err) + } + + glog.Infof("fetchSchematizedRecords: GetStoredRecords returned %d records", len(smqRecords)) + if len(smqRecords) == 0 { + return []*SchematizedRecord{}, nil + } + + var reconstructedRecords []*SchematizedRecord + totalBytes := int32(0) + + for _, smqRecord := range smqRecords { + // Check if we've exceeded maxBytes limit + if maxBytes > 0 && totalBytes >= maxBytes { + break + } + + // Try to reconstruct the schematized message value + reconstructedValue, err := h.reconstructSchematizedMessageFromSMQ(smqRecord) + if err != nil { + // Log error but continue with other messages + Error("Failed to reconstruct schematized message at offset %d: %v", smqRecord.GetOffset(), err) + continue + } + + if reconstructedValue != nil { + // Create SchematizedRecord with both key and reconstructed value + record := &SchematizedRecord{ + Key: smqRecord.GetKey(), // Preserve the original key + Value: reconstructedValue, // Use the reconstructed value + } + reconstructedRecords = append(reconstructedRecords, record) + totalBytes += int32(len(record.Key) + len(record.Value)) + } + } + + return reconstructedRecords, nil +} + +// reconstructSchematizedMessageFromSMQ reconstructs a schematized message from an SMQRecord +func (h *Handler) reconstructSchematizedMessageFromSMQ(smqRecord integration.SMQRecord) ([]byte, error) { + // Get the stored value (should be a serialized RecordValue) + valueBytes := smqRecord.GetValue() + if len(valueBytes) == 0 { + return nil, fmt.Errorf("empty value in SMQ record") + } + + // Try to unmarshal as RecordValue + recordValue := &schema_pb.RecordValue{} + if err := proto.Unmarshal(valueBytes, recordValue); err != nil { + // If it's not a RecordValue, it might be a regular Kafka message + // Return it as-is (non-schematized) + return valueBytes, nil + } + + // Extract schema metadata from the RecordValue fields + metadata := h.extractSchemaMetadataFromRecord(recordValue) + if len(metadata) == 0 { + // No schema metadata found, treat as regular message + return valueBytes, nil + } + + // Remove Kafka metadata fields to get the original message content + originalRecord := h.removeKafkaMetadataFields(recordValue) + + // 
Reconstruct the original Confluent envelope + return h.reconstructSchematizedMessage(originalRecord, metadata) +} + +// extractSchemaMetadataFromRecord extracts schema metadata from RecordValue fields +func (h *Handler) extractSchemaMetadataFromRecord(recordValue *schema_pb.RecordValue) map[string]string { + metadata := make(map[string]string) + + // Look for schema metadata fields in the record + if schemaIDField := recordValue.Fields["_schema_id"]; schemaIDField != nil { + if schemaIDValue := schemaIDField.GetStringValue(); schemaIDValue != "" { + metadata["schema_id"] = schemaIDValue + } + } + + if schemaFormatField := recordValue.Fields["_schema_format"]; schemaFormatField != nil { + if schemaFormatValue := schemaFormatField.GetStringValue(); schemaFormatValue != "" { + metadata["schema_format"] = schemaFormatValue + } + } + + if schemaSubjectField := recordValue.Fields["_schema_subject"]; schemaSubjectField != nil { + if schemaSubjectValue := schemaSubjectField.GetStringValue(); schemaSubjectValue != "" { + metadata["schema_subject"] = schemaSubjectValue + } + } + + if schemaVersionField := recordValue.Fields["_schema_version"]; schemaVersionField != nil { + if schemaVersionValue := schemaVersionField.GetStringValue(); schemaVersionValue != "" { + metadata["schema_version"] = schemaVersionValue + } + } + + return metadata +} + +// removeKafkaMetadataFields removes Kafka and schema metadata fields from RecordValue +func (h *Handler) removeKafkaMetadataFields(recordValue *schema_pb.RecordValue) *schema_pb.RecordValue { + originalRecord := &schema_pb.RecordValue{ + Fields: make(map[string]*schema_pb.Value), + } + + // Copy all fields except metadata fields + for key, value := range recordValue.Fields { + if !h.isMetadataField(key) { + originalRecord.Fields[key] = value + } + } + + return originalRecord +} + +// isMetadataField checks if a field is a metadata field that should be excluded from the original message +func (h *Handler) isMetadataField(fieldName string) bool { + return fieldName == "_kafka_offset" || + fieldName == "_kafka_partition" || + fieldName == "_kafka_timestamp" || + fieldName == "_schema_id" || + fieldName == "_schema_format" || + fieldName == "_schema_subject" || + fieldName == "_schema_version" +} + +// createSchematizedRecordBatch creates a Kafka record batch from reconstructed schematized messages +func (h *Handler) createSchematizedRecordBatch(records []*SchematizedRecord, baseOffset int64) []byte { + if len(records) == 0 { + // Return empty record batch + return h.createEmptyRecordBatch(baseOffset) + } + + // Create individual record entries for the batch + var recordsData []byte + currentTimestamp := time.Now().UnixMilli() + + for i, record := range records { + // Create a record entry (Kafka record format v2) with both key and value + recordEntry := h.createRecordEntry(record.Key, record.Value, int32(i), currentTimestamp) + recordsData = append(recordsData, recordEntry...) 
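// Illustrative sketch (not part of this diff): the "Confluent envelope" reconstructed
// above is, on the wire, a 1-byte magic 0x00, the 4-byte big-endian schema registry ID,
// and then the schema-encoded payload (Avro/Protobuf/JSON). The helper name is
// hypothetical; it assumes the file's encoding/binary import.
func confluentFrame(schemaID uint32, payload []byte) []byte {
	framed := make([]byte, 0, 5+len(payload))
	framed = append(framed, 0x00) // magic byte
	idBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(idBytes, schemaID)
	framed = append(framed, idBytes...)
	return append(framed, payload...)
}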
+ } + + // Apply compression if the data is large enough to benefit + enableCompression := len(recordsData) > 100 + var compressionType compression.CompressionCodec = compression.None + var finalRecordsData []byte + + if enableCompression { + compressed, err := compression.Compress(compression.Gzip, recordsData) + if err == nil && len(compressed) < len(recordsData) { + finalRecordsData = compressed + compressionType = compression.Gzip + } else { + finalRecordsData = recordsData + } + } else { + finalRecordsData = recordsData + } + + // Create the record batch with proper compression and CRC + batch, err := h.createRecordBatchWithCompressionAndCRC(baseOffset, finalRecordsData, compressionType, int32(len(records)), currentTimestamp) + if err != nil { + // Fallback to simple batch creation + return h.createRecordBatchWithPayload(baseOffset, int32(len(records)), finalRecordsData) + } + + return batch +} + +// createRecordEntry creates a single record entry in Kafka record format v2 +func (h *Handler) createRecordEntry(messageKey []byte, messageData []byte, offsetDelta int32, timestamp int64) []byte { + // Record format v2: + // - length (varint) + // - attributes (int8) + // - timestamp delta (varint) + // - offset delta (varint) + // - key length (varint) + key + // - value length (varint) + value + // - headers count (varint) + headers + + var record []byte + + // Attributes (1 byte) - no special attributes + record = append(record, 0) + + // Timestamp delta (varint) - 0 for now (all messages have same timestamp) + record = append(record, encodeVarint(0)...) + + // Offset delta (varint) + record = append(record, encodeVarint(int64(offsetDelta))...) + + // Key length (varint) + key + if messageKey == nil || len(messageKey) == 0 { + record = append(record, encodeVarint(-1)...) // -1 indicates null key + } else { + record = append(record, encodeVarint(int64(len(messageKey)))...) + record = append(record, messageKey...) + } + + // Value length (varint) + value + record = append(record, encodeVarint(int64(len(messageData)))...) + record = append(record, messageData...) + + // Headers count (varint) - no headers + record = append(record, encodeVarint(0)...) + + // Prepend the total record length (varint) + recordLength := encodeVarint(int64(len(record))) + return append(recordLength, record...) +} + +// createRecordBatchWithCompressionAndCRC creates a Kafka record batch with proper compression and CRC +func (h *Handler) createRecordBatchWithCompressionAndCRC(baseOffset int64, recordsData []byte, compressionType compression.CompressionCodec, recordCount int32, baseTimestampMs int64) ([]byte, error) { + // Create record batch header + // Validate size to prevent overflow + const maxBatchSize = 1 << 30 // 1 GB limit + if len(recordsData) > maxBatchSize-61 { + return nil, fmt.Errorf("records data too large: %d bytes", len(recordsData)) + } + batch := make([]byte, 0, len(recordsData)+61) // 61 bytes for header + + // Base offset (8 bytes) + baseOffsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset)) + batch = append(batch, baseOffsetBytes...) 
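// Illustrative sketch (not part of this diff): the attributes field written below packs
// the compression codec into the low 3 bits of an int16 (0=none, 1=gzip, 2=snappy,
// 3=lz4, 4=zstd); bit 3 is the timestamp type and bit 4 the transactional flag, both
// left at 0 for these batches. Hypothetical helper:
func encodeBatchAttributes(codec int16) [2]byte {
	attrs := uint16(codec) & 0x07 // only the codec bits are set here
	return [2]byte{byte(attrs >> 8), byte(attrs)}
}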
+ + // Batch length placeholder (4 bytes) - will be filled later + batchLengthPos := len(batch) + batch = append(batch, 0, 0, 0, 0) + + // Partition leader epoch (4 bytes) + batch = append(batch, 0, 0, 0, 0) + + // Magic byte (1 byte) - version 2 + batch = append(batch, 2) + + // CRC placeholder (4 bytes) - will be calculated later + crcPos := len(batch) + batch = append(batch, 0, 0, 0, 0) + + // Attributes (2 bytes) - compression type and other flags + attributes := int16(compressionType) // Set compression type in lower 3 bits + attributesBytes := make([]byte, 2) + binary.BigEndian.PutUint16(attributesBytes, uint16(attributes)) + batch = append(batch, attributesBytes...) + + // Last offset delta (4 bytes) + lastOffsetDelta := uint32(recordCount - 1) + lastOffsetDeltaBytes := make([]byte, 4) + binary.BigEndian.PutUint32(lastOffsetDeltaBytes, lastOffsetDelta) + batch = append(batch, lastOffsetDeltaBytes...) + + // First timestamp (8 bytes) - use the same timestamp used to build record entries + firstTimestampBytes := make([]byte, 8) + binary.BigEndian.PutUint64(firstTimestampBytes, uint64(baseTimestampMs)) + batch = append(batch, firstTimestampBytes...) + + // Max timestamp (8 bytes) - same as first for simplicity + batch = append(batch, firstTimestampBytes...) + + // Producer ID (8 bytes) - -1 for non-transactional + batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) + + // Producer epoch (2 bytes) - -1 for non-transactional + batch = append(batch, 0xFF, 0xFF) + + // Base sequence (4 bytes) - -1 for non-transactional + batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF) + + // Record count (4 bytes) + recordCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(recordCountBytes, uint32(recordCount)) + batch = append(batch, recordCountBytes...) + + // Records payload (compressed or uncompressed) + batch = append(batch, recordsData...) + + // Calculate and set batch length (excluding base offset and batch length fields) + batchLength := len(batch) - 12 // 8 bytes base offset + 4 bytes batch length + binary.BigEndian.PutUint32(batch[batchLengthPos:batchLengthPos+4], uint32(batchLength)) + + // Calculate and set CRC32 over attributes..end (exclude CRC field itself) + // Kafka uses Castagnoli (CRC-32C) algorithm. CRC covers ONLY from attributes offset (byte 21) onwards. + // See: DefaultRecordBatch.java computeChecksum() - Crc32C.compute(buffer, ATTRIBUTES_OFFSET, ...) 
+ crcData := batch[crcPos+4:] // Skip CRC field itself (bytes 17..20) and include the rest + crc := crc32.Checksum(crcData, crc32.MakeTable(crc32.Castagnoli)) + binary.BigEndian.PutUint32(batch[crcPos:crcPos+4], crc) + + return batch, nil +} + +// createEmptyRecordBatch creates an empty Kafka record batch using the new parser +func (h *Handler) createEmptyRecordBatch(baseOffset int64) []byte { + // Use the new record batch creation function with no compression + emptyRecords := []byte{} + batch, err := CreateRecordBatch(baseOffset, emptyRecords, compression.None) + if err != nil { + // Fallback to manual creation if there's an error + return h.createEmptyRecordBatchManual(baseOffset) + } + return batch +} + +// createEmptyRecordBatchManual creates an empty Kafka record batch manually (fallback) +func (h *Handler) createEmptyRecordBatchManual(baseOffset int64) []byte { + // Create a minimal empty record batch + batch := make([]byte, 0, 61) // Standard record batch header size + + // Base offset (8 bytes) + baseOffsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset)) + batch = append(batch, baseOffsetBytes...) + + // Batch length (4 bytes) - will be filled at the end + lengthPlaceholder := len(batch) + batch = append(batch, 0, 0, 0, 0) + + // Partition leader epoch (4 bytes) - 0 for simplicity + batch = append(batch, 0, 0, 0, 0) + + // Magic byte (1 byte) - version 2 + batch = append(batch, 2) + + // CRC32 (4 bytes) - placeholder, should be calculated + batch = append(batch, 0, 0, 0, 0) + + // Attributes (2 bytes) - no compression, no transactional + batch = append(batch, 0, 0) + + // Last offset delta (4 bytes) - 0 for empty batch + batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF) + + // First timestamp (8 bytes) - current time + timestamp := time.Now().UnixMilli() + timestampBytes := make([]byte, 8) + binary.BigEndian.PutUint64(timestampBytes, uint64(timestamp)) + batch = append(batch, timestampBytes...) + + // Max timestamp (8 bytes) - same as first for empty batch + batch = append(batch, timestampBytes...) + + // Producer ID (8 bytes) - -1 for non-transactional + batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF) + + // Producer Epoch (2 bytes) - -1 for non-transactional + batch = append(batch, 0xFF, 0xFF) + + // Base Sequence (4 bytes) - -1 for non-transactional + batch = append(batch, 0xFF, 0xFF, 0xFF, 0xFF) + + // Record count (4 bytes) - 0 for empty batch + batch = append(batch, 0, 0, 0, 0) + + // Fill in the batch length + batchLength := len(batch) - 12 // Exclude base offset and length field itself + binary.BigEndian.PutUint32(batch[lengthPlaceholder:lengthPlaceholder+4], uint32(batchLength)) + + return batch +} + +// createRecordBatchWithPayload creates a record batch with the given payload +func (h *Handler) createRecordBatchWithPayload(baseOffset int64, recordCount int32, payload []byte) []byte { + // For Phase 7, create a simplified record batch + // In Phase 8, this will implement proper Kafka record batch format v2 + + batch := h.createEmptyRecordBatch(baseOffset) + + // Update record count + recordCountOffset := len(batch) - 4 + binary.BigEndian.PutUint32(batch[recordCountOffset:recordCountOffset+4], uint32(recordCount)) + + // Append payload (simplified - real implementation would format individual records) + batch = append(batch, payload...) 
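// Illustrative sketch (not part of this diff): verifying the checksum the builders above
// write. In record batch v2 the CRC-32C (Castagnoli) sits at bytes 17-20 and covers
// everything from the attributes field (byte 21) to the end of the batch; base offset,
// batch length, leader epoch, magic, and the CRC field itself are excluded. Assumes the
// file's encoding/binary and hash/crc32 imports.
func batchCRCValid(batch []byte) bool {
	if len(batch) < 21 {
		return false
	}
	stored := binary.BigEndian.Uint32(batch[17:21])
	computed := crc32.Checksum(batch[21:], crc32.MakeTable(crc32.Castagnoli))
	return stored == computed
}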
+ + // Update batch length + batchLength := len(batch) - 12 + binary.BigEndian.PutUint32(batch[8:12], uint32(batchLength)) + + return batch +} + +// handleSchematizedFetch handles fetch requests for topics with schematized messages +func (h *Handler) handleSchematizedFetch(topicName string, partitionID int32, offset int64, maxBytes int32) ([]byte, error) { + // Check if this topic uses schema management + if !h.IsSchemaEnabled() { + // Fall back to regular fetch handling + return nil, fmt.Errorf("schema management not enabled") + } + + // Fetch schematized records from SeaweedMQ + records, err := h.fetchSchematizedRecords(topicName, partitionID, offset, maxBytes) + if err != nil { + return nil, fmt.Errorf("failed to fetch schematized records: %w", err) + } + + // Create record batch from reconstructed records + recordBatch := h.createSchematizedRecordBatch(records, offset) + + return recordBatch, nil +} + +// isSchematizedTopic checks if a topic uses schema management +func (h *Handler) isSchematizedTopic(topicName string) bool { + // System topics (_schemas, __consumer_offsets, etc.) should NEVER use schema encoding + // They have their own internal formats and should be passed through as-is + if h.isSystemTopic(topicName) { + return false + } + + if !h.IsSchemaEnabled() { + return false + } + + // Check multiple indicators for schematized topics: + + // Check Confluent Schema Registry naming conventions + return h.matchesSchemaRegistryConvention(topicName) +} + +// matchesSchemaRegistryConvention checks Confluent Schema Registry naming patterns +func (h *Handler) matchesSchemaRegistryConvention(topicName string) bool { + // Common Schema Registry subject patterns: + // - topicName-value (for message values) + // - topicName-key (for message keys) + // - topicName (direct topic name as subject) + + if len(topicName) > 6 && topicName[len(topicName)-6:] == "-value" { + return true + } + if len(topicName) > 4 && topicName[len(topicName)-4:] == "-key" { + return true + } + + // Check if the topic has registered schema subjects in Schema Registry + // Use standard Kafka naming convention: <topic>-value and <topic>-key + if h.schemaManager != nil { + // Check with -value suffix (standard pattern for value schemas) + latestSchemaValue, err := h.schemaManager.GetLatestSchema(topicName + "-value") + if err == nil { + // Since we retrieved schema from registry, ensure topic config is updated + h.ensureTopicSchemaFromLatestSchema(topicName, latestSchemaValue) + return true + } + + // Check with -key suffix (for key schemas) + latestSchemaKey, err := h.schemaManager.GetLatestSchema(topicName + "-key") + if err == nil { + // Since we retrieved key schema from registry, ensure topic config is updated + h.ensureTopicKeySchemaFromLatestSchema(topicName, latestSchemaKey) + return true + } + } + + return false +} + +// getSchemaMetadataForTopic retrieves schema metadata for a topic +func (h *Handler) getSchemaMetadataForTopic(topicName string) (map[string]string, error) { + if !h.IsSchemaEnabled() { + return nil, fmt.Errorf("schema management not enabled") + } + + // Try multiple approaches to get schema metadata from Schema Registry + + // 1. Try to get schema from registry using topic name as subject + metadata, err := h.getSchemaMetadataFromRegistry(topicName) + if err == nil { + return metadata, nil + } + + // 2. Try with -value suffix (common pattern) + metadata, err = h.getSchemaMetadataFromRegistry(topicName + "-value") + if err == nil { + return metadata, nil + } + + // 3. 
Try with -key suffix + metadata, err = h.getSchemaMetadataFromRegistry(topicName + "-key") + if err == nil { + return metadata, nil + } + + return nil, fmt.Errorf("no schema found in registry for topic %s (tried %s, %s-value, %s-key)", topicName, topicName, topicName, topicName) +} + +// getSchemaMetadataFromRegistry retrieves schema metadata from Schema Registry +func (h *Handler) getSchemaMetadataFromRegistry(subject string) (map[string]string, error) { + if h.schemaManager == nil { + return nil, fmt.Errorf("schema manager not available") + } + + // Get latest schema for the subject + cachedSchema, err := h.schemaManager.GetLatestSchema(subject) + if err != nil { + return nil, fmt.Errorf("failed to get schema for subject %s: %w", subject, err) + } + + // Since we retrieved schema from registry, ensure topic config is updated + // Extract topic name from subject (remove -key or -value suffix if present) + topicName := h.extractTopicFromSubject(subject) + if topicName != "" { + h.ensureTopicSchemaFromLatestSchema(topicName, cachedSchema) + } + + // Build metadata map + // Detect format from schema content + // Simple format detection - assume Avro for now + format := schema.FormatAvro + + metadata := map[string]string{ + "schema_id": fmt.Sprintf("%d", cachedSchema.LatestID), + "schema_format": format.String(), + "schema_subject": subject, + "schema_version": fmt.Sprintf("%d", cachedSchema.Version), + "schema_content": cachedSchema.Schema, + } + + return metadata, nil +} + +// ensureTopicSchemaFromLatestSchema ensures topic configuration is updated when latest schema is retrieved +func (h *Handler) ensureTopicSchemaFromLatestSchema(topicName string, latestSchema *schema.CachedSubject) { + if latestSchema == nil { + return + } + + // Convert CachedSubject to CachedSchema format for reuse + // Note: CachedSubject has different field structure than expected + cachedSchema := &schema.CachedSchema{ + ID: latestSchema.LatestID, + Schema: latestSchema.Schema, + Subject: latestSchema.Subject, + Version: latestSchema.Version, + Format: schema.FormatAvro, // Default to Avro, could be improved with format detection + CachedAt: latestSchema.CachedAt, + } + + // Use existing function to handle the schema update + h.ensureTopicSchemaFromRegistryCache(topicName, cachedSchema) +} + +// extractTopicFromSubject extracts the topic name from a schema registry subject +func (h *Handler) extractTopicFromSubject(subject string) string { + // Remove common suffixes used in schema registry + if strings.HasSuffix(subject, "-value") { + return strings.TrimSuffix(subject, "-value") + } + if strings.HasSuffix(subject, "-key") { + return strings.TrimSuffix(subject, "-key") + } + // If no suffix, assume subject name is the topic name + return subject +} + +// ensureTopicKeySchemaFromLatestSchema ensures topic configuration is updated when key schema is retrieved +func (h *Handler) ensureTopicKeySchemaFromLatestSchema(topicName string, latestSchema *schema.CachedSubject) { + if latestSchema == nil { + return + } + + // Convert CachedSubject to CachedSchema format for reuse + // Note: CachedSubject has different field structure than expected + cachedSchema := &schema.CachedSchema{ + ID: latestSchema.LatestID, + Schema: latestSchema.Schema, + Subject: latestSchema.Subject, + Version: latestSchema.Version, + Format: schema.FormatAvro, // Default to Avro, could be improved with format detection + CachedAt: latestSchema.CachedAt, + } + + // Use existing function to handle the key schema update + 
h.ensureTopicKeySchemaFromRegistryCache(topicName, cachedSchema) +} + +// decodeRecordValueToKafkaMessage decodes a RecordValue back to the original Kafka message bytes +func (h *Handler) decodeRecordValueToKafkaMessage(topicName string, recordValueBytes []byte) []byte { + if recordValueBytes == nil { + return nil + } + + // CRITICAL FIX: For system topics like _schemas, _consumer_offsets, etc., + // return the raw bytes as-is. These topics store Kafka's internal format (Avro, etc.) + // and should NOT be processed as RecordValue protobuf messages. + if strings.HasPrefix(topicName, "_") { + return recordValueBytes + } + + // Try to unmarshal as RecordValue + recordValue := &schema_pb.RecordValue{} + if err := proto.Unmarshal(recordValueBytes, recordValue); err != nil { + // Not a RecordValue format - this is normal for Avro/JSON/raw Kafka messages + // Return raw bytes as-is (Kafka consumers expect this) + return recordValueBytes + } + + // If schema management is enabled, re-encode the RecordValue to Confluent format + if h.IsSchemaEnabled() { + if encodedMsg, err := h.encodeRecordValueToConfluentFormat(topicName, recordValue); err == nil { + return encodedMsg + } else { + } + } + + // Fallback: convert RecordValue to JSON + return h.recordValueToJSON(recordValue) +} + +// encodeRecordValueToConfluentFormat re-encodes a RecordValue back to Confluent format +func (h *Handler) encodeRecordValueToConfluentFormat(topicName string, recordValue *schema_pb.RecordValue) ([]byte, error) { + if recordValue == nil { + return nil, fmt.Errorf("RecordValue is nil") + } + + // Get schema configuration from topic config + schemaConfig, err := h.getTopicSchemaConfig(topicName) + if err != nil { + return nil, fmt.Errorf("failed to get topic schema config: %w", err) + } + + // Use schema manager to encode RecordValue back to original format + encodedBytes, err := h.schemaManager.EncodeMessage(recordValue, schemaConfig.ValueSchemaID, schemaConfig.ValueSchemaFormat) + if err != nil { + return nil, fmt.Errorf("failed to encode RecordValue: %w", err) + } + + return encodedBytes, nil +} + +// getTopicSchemaConfig retrieves schema configuration for a topic +func (h *Handler) getTopicSchemaConfig(topicName string) (*TopicSchemaConfig, error) { + h.topicSchemaConfigMu.RLock() + defer h.topicSchemaConfigMu.RUnlock() + + if h.topicSchemaConfigs == nil { + return nil, fmt.Errorf("no schema configuration available for topic: %s", topicName) + } + + config, exists := h.topicSchemaConfigs[topicName] + if !exists { + return nil, fmt.Errorf("no schema configuration found for topic: %s", topicName) + } + + return config, nil +} + +// decodeRecordValueToKafkaKey decodes a key RecordValue back to the original Kafka key bytes +func (h *Handler) decodeRecordValueToKafkaKey(topicName string, keyRecordValueBytes []byte) []byte { + if keyRecordValueBytes == nil { + return nil + } + + // Try to get topic schema config + schemaConfig, err := h.getTopicSchemaConfig(topicName) + if err != nil || !schemaConfig.HasKeySchema { + // No key schema config available, return raw bytes + return keyRecordValueBytes + } + + // Try to unmarshal as RecordValue + recordValue := &schema_pb.RecordValue{} + if err := proto.Unmarshal(keyRecordValueBytes, recordValue); err != nil { + // If it's not a RecordValue, return the raw bytes + return keyRecordValueBytes + } + + // If key schema management is enabled, re-encode the RecordValue to Confluent format + if h.IsSchemaEnabled() { + if encodedKey, err := 
h.encodeKeyRecordValueToConfluentFormat(topicName, recordValue); err == nil { + return encodedKey + } + } + + // Fallback: convert RecordValue to JSON + return h.recordValueToJSON(recordValue) +} + +// encodeKeyRecordValueToConfluentFormat re-encodes a key RecordValue back to Confluent format +func (h *Handler) encodeKeyRecordValueToConfluentFormat(topicName string, recordValue *schema_pb.RecordValue) ([]byte, error) { + if recordValue == nil { + return nil, fmt.Errorf("key RecordValue is nil") + } + + // Get schema configuration from topic config + schemaConfig, err := h.getTopicSchemaConfig(topicName) + if err != nil { + return nil, fmt.Errorf("failed to get topic schema config: %w", err) + } + + if !schemaConfig.HasKeySchema { + return nil, fmt.Errorf("no key schema configured for topic: %s", topicName) + } + + // Use schema manager to encode RecordValue back to original format + encodedBytes, err := h.schemaManager.EncodeMessage(recordValue, schemaConfig.KeySchemaID, schemaConfig.KeySchemaFormat) + if err != nil { + return nil, fmt.Errorf("failed to encode key RecordValue: %w", err) + } + + return encodedBytes, nil +} + +// recordValueToJSON converts a RecordValue to JSON bytes (fallback) +func (h *Handler) recordValueToJSON(recordValue *schema_pb.RecordValue) []byte { + if recordValue == nil || recordValue.Fields == nil { + return []byte("{}") + } + + // Simple JSON conversion - in a real implementation, this would be more sophisticated + jsonStr := "{" + first := true + for fieldName, fieldValue := range recordValue.Fields { + if !first { + jsonStr += "," + } + first = false + + jsonStr += fmt.Sprintf(`"%s":`, fieldName) + + switch v := fieldValue.Kind.(type) { + case *schema_pb.Value_StringValue: + jsonStr += fmt.Sprintf(`"%s"`, v.StringValue) + case *schema_pb.Value_BytesValue: + jsonStr += fmt.Sprintf(`"%s"`, string(v.BytesValue)) + case *schema_pb.Value_Int32Value: + jsonStr += fmt.Sprintf(`%d`, v.Int32Value) + case *schema_pb.Value_Int64Value: + jsonStr += fmt.Sprintf(`%d`, v.Int64Value) + case *schema_pb.Value_BoolValue: + jsonStr += fmt.Sprintf(`%t`, v.BoolValue) + default: + jsonStr += `null` + } + } + jsonStr += "}" + + return []byte(jsonStr) +} + +// fetchPartitionData fetches data for a single partition (called concurrently) +func (h *Handler) fetchPartitionData( + ctx context.Context, + topicName string, + partition FetchPartition, + apiVersion uint16, + isSchematizedTopic bool, +) *partitionFetchResult { + startTime := time.Now() + result := &partitionFetchResult{} + + // Get the actual high water mark from SeaweedMQ + highWaterMark, err := h.seaweedMQHandler.GetLatestOffset(topicName, partition.PartitionID) + if err != nil { + highWaterMark = 0 + } + result.highWaterMark = highWaterMark + + // Check if topic exists + if !h.seaweedMQHandler.TopicExists(topicName) { + if isSystemTopic(topicName) { + // Auto-create system topics + if err := h.createTopicWithSchemaSupport(topicName, 1); err != nil { + result.errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION + result.fetchDuration = time.Since(startTime) + return result + } + } else { + result.errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION + result.fetchDuration = time.Since(startTime) + return result + } + } + + // Normalize special fetch offsets + effectiveFetchOffset := partition.FetchOffset + if effectiveFetchOffset < 0 { + if effectiveFetchOffset == -2 { + effectiveFetchOffset = 0 + } else if effectiveFetchOffset == -1 { + effectiveFetchOffset = highWaterMark + } + } + + // Fetch records if available + var recordBatch []byte + 
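// Illustrative worked example (not part of this diff): the sentinel offsets normalized
// above follow Kafka's ListOffsets convention. With highWaterMark = 42:
//   FetchOffset -2 (earliest) -> effective 0  (read from the start)
//   FetchOffset -1 (latest)   -> effective 42 (nothing to return yet)
//   FetchOffset 40            -> effective 40 (two records available)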
if highWaterMark > effectiveFetchOffset { + // Use multi-batch fetcher (pass context to respect timeout) + multiFetcher := NewMultiBatchFetcher(h) + fetchResult, err := multiFetcher.FetchMultipleBatches( + ctx, + topicName, + partition.PartitionID, + effectiveFetchOffset, + highWaterMark, + partition.MaxBytes, + ) + + if err == nil && fetchResult.TotalSize > 0 { + recordBatch = fetchResult.RecordBatches + } else { + // Fallback to single batch (pass context to respect timeout) + smqRecords, err := h.seaweedMQHandler.GetStoredRecords(ctx, topicName, partition.PartitionID, effectiveFetchOffset, 10) + if err == nil && len(smqRecords) > 0 { + recordBatch = h.constructRecordBatchFromSMQ(topicName, effectiveFetchOffset, smqRecords) + } else { + recordBatch = []byte{} + } + } + } else { + recordBatch = []byte{} + } + + // Try schematized records if needed and recordBatch is empty + if isSchematizedTopic && len(recordBatch) == 0 { + schematizedRecords, err := h.fetchSchematizedRecords(topicName, partition.PartitionID, effectiveFetchOffset, partition.MaxBytes) + if err == nil && len(schematizedRecords) > 0 { + schematizedBatch := h.createSchematizedRecordBatch(schematizedRecords, effectiveFetchOffset) + if len(schematizedBatch) > 0 { + recordBatch = schematizedBatch + } + } + } + + result.recordBatch = recordBatch + result.fetchDuration = time.Since(startTime) + return result +} |
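A minimal, self-contained sketch (not part of this change) of the record batch v2 layout the handler emits: it frames one record with a null key and a small value, fills the 61-byte header, and self-checks the CRC-32C the same way the builders above do. Field positions follow the Kafka spec; nothing here calls SeaweedFS code.

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// zigzagVarint mirrors the zigzag varint encoding used for record fields.
func zigzagVarint(v int64) []byte {
	z := uint64((v << 1) ^ (v >> 63))
	var out []byte
	for z >= 0x80 {
		out = append(out, byte(z)|0x80)
		z >>= 7
	}
	return append(out, byte(z))
}

func main() {
	value := []byte("hello")

	// One record: attributes, timestampDelta=0, offsetDelta=0, null key, value, no headers.
	var rec []byte
	rec = append(rec, 0)                                  // record attributes
	rec = append(rec, zigzagVarint(0)...)                 // timestamp delta
	rec = append(rec, zigzagVarint(0)...)                 // offset delta
	rec = append(rec, zigzagVarint(-1)...)                // key length -1 (null key)
	rec = append(rec, zigzagVarint(int64(len(value)))...) // value length
	rec = append(rec, value...)
	rec = append(rec, zigzagVarint(0)...) // headers count
	records := append(zigzagVarint(int64(len(rec))), rec...)

	batch := make([]byte, 61+len(records))
	binary.BigEndian.PutUint64(batch[0:8], 0)                      // base offset
	binary.BigEndian.PutUint32(batch[8:12], uint32(len(batch)-12)) // batch length
	binary.BigEndian.PutUint32(batch[12:16], 0)                    // partition leader epoch
	batch[16] = 2                                                  // magic v2
	// batch[17:21] holds the CRC, filled in below
	binary.BigEndian.PutUint16(batch[21:23], 0)          // attributes: no compression
	binary.BigEndian.PutUint32(batch[23:27], 0)          // last offset delta
	binary.BigEndian.PutUint64(batch[27:35], 0)          // base timestamp
	binary.BigEndian.PutUint64(batch[35:43], 0)          // max timestamp
	binary.BigEndian.PutUint64(batch[43:51], ^uint64(0)) // producer ID -1
	binary.BigEndian.PutUint16(batch[51:53], ^uint16(0)) // producer epoch -1
	binary.BigEndian.PutUint32(batch[53:57], ^uint32(0)) // base sequence -1
	binary.BigEndian.PutUint32(batch[57:61], 1)          // record count
	copy(batch[61:], records)

	crc := crc32.Checksum(batch[21:], crc32.MakeTable(crc32.Castagnoli))
	binary.BigEndian.PutUint32(batch[17:21], crc)
	fmt.Printf("batch bytes=%d crc=0x%08x\n", len(batch), crc)
}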
