Diffstat (limited to 'weed/mq/kafka/protocol/handler.go')
| -rw-r--r-- | weed/mq/kafka/protocol/handler.go | 4195 |
1 file changed, 4195 insertions(+), 0 deletions(-)
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go new file mode 100644 index 000000000..fcfe196c2 --- /dev/null +++ b/weed/mq/kafka/protocol/handler.go @@ -0,0 +1,4195 @@ +package protocol + +import ( + "bufio" + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "net" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer_offset" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema" + mqschema "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// GetAdvertisedAddress returns the host:port that should be advertised to clients +// This handles the Docker networking issue where internal IPs aren't reachable by external clients +func (h *Handler) GetAdvertisedAddress(gatewayAddr string) (string, int) { + host, port := "localhost", 9093 + + // Try to parse the gateway address if provided to get the port + if gatewayAddr != "" { + if _, gatewayPort, err := net.SplitHostPort(gatewayAddr); err == nil { + if gatewayPortInt, err := strconv.Atoi(gatewayPort); err == nil { + port = gatewayPortInt // Only use the port, not the host + } + } + } + + // Override with environment variable if set, otherwise always use localhost for external clients + if advertisedHost := os.Getenv("KAFKA_ADVERTISED_HOST"); advertisedHost != "" { + host = advertisedHost + } else { + host = "localhost" + } + + return host, port +} + +// TopicInfo holds basic information about a topic +type TopicInfo struct { + Name string + Partitions int32 + CreatedAt int64 +} + +// TopicPartitionKey uniquely identifies a topic partition +type TopicPartitionKey struct { + Topic string + Partition int32 +} + +// contextKey is a type for context keys to avoid collisions +type contextKey string + +const ( + // connContextKey is the context key for storing ConnectionContext + connContextKey contextKey = "connectionContext" +) + +// kafkaRequest represents a Kafka API request to be processed +type kafkaRequest struct { + correlationID uint32 + apiKey uint16 + apiVersion uint16 + requestBody []byte + ctx context.Context + connContext *ConnectionContext // Per-connection context to avoid race conditions +} + +// kafkaResponse represents a Kafka API response +type kafkaResponse struct { + correlationID uint32 + apiKey uint16 + apiVersion uint16 + response []byte + err error +} + +const ( + // DefaultKafkaNamespace is the default namespace for Kafka topics in SeaweedMQ + DefaultKafkaNamespace = "kafka" +) + +// APIKey represents a Kafka API key type for better type safety +type APIKey uint16 + +// Kafka API Keys +const ( + APIKeyProduce APIKey = 0 + APIKeyFetch APIKey = 1 + APIKeyListOffsets APIKey = 2 + APIKeyMetadata APIKey = 3 + APIKeyOffsetCommit APIKey = 8 + APIKeyOffsetFetch APIKey = 9 + APIKeyFindCoordinator APIKey = 10 + APIKeyJoinGroup APIKey = 11 + APIKeyHeartbeat APIKey = 12 + APIKeyLeaveGroup APIKey = 13 + APIKeySyncGroup APIKey = 14 + APIKeyDescribeGroups APIKey = 15 + APIKeyListGroups APIKey = 16 + APIKeyApiVersions APIKey = 18 + APIKeyCreateTopics APIKey = 19 + APIKeyDeleteTopics APIKey = 20 + 
APIKeyInitProducerId APIKey = 22 + APIKeyDescribeConfigs APIKey = 32 + APIKeyDescribeCluster APIKey = 60 +) + +// SeaweedMQHandlerInterface defines the interface for SeaweedMQ integration +type SeaweedMQHandlerInterface interface { + TopicExists(topic string) bool + ListTopics() []string + CreateTopic(topic string, partitions int32) error + CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error + DeleteTopic(topic string) error + GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool) + // Ledger methods REMOVED - SMQ handles Kafka offsets natively + ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) + ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) + // GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations) + // ctx is used to control the fetch timeout (should match Kafka fetch request's MaxWaitTime) + GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) + // GetEarliestOffset returns the earliest available offset for a topic partition + GetEarliestOffset(topic string, partition int32) (int64, error) + // GetLatestOffset returns the latest available offset for a topic partition + GetLatestOffset(topic string, partition int32) (int64, error) + // WithFilerClient executes a function with a filer client for accessing SeaweedMQ metadata + WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error + // GetBrokerAddresses returns the discovered SMQ broker addresses for Metadata responses + GetBrokerAddresses() []string + // CreatePerConnectionBrokerClient creates an isolated BrokerClient for each TCP connection + CreatePerConnectionBrokerClient() (*integration.BrokerClient, error) + // SetProtocolHandler sets the protocol handler reference for connection context access + SetProtocolHandler(handler integration.ProtocolHandler) + Close() error +} + +// ConsumerOffsetStorage defines the interface for storing consumer offsets +// This is used by OffsetCommit and OffsetFetch protocol handlers +type ConsumerOffsetStorage interface { + CommitOffset(group, topic string, partition int32, offset int64, metadata string) error + FetchOffset(group, topic string, partition int32) (int64, string, error) + FetchAllOffsets(group string) (map[TopicPartition]OffsetMetadata, error) + DeleteGroup(group string) error + Close() error +} + +// TopicPartition uniquely identifies a topic partition for offset storage +type TopicPartition struct { + Topic string + Partition int32 +} + +// OffsetMetadata contains offset and associated metadata +type OffsetMetadata struct { + Offset int64 + Metadata string +} + +// TopicSchemaConfig holds schema configuration for a topic +type TopicSchemaConfig struct { + // Value schema configuration + ValueSchemaID uint32 + ValueSchemaFormat schema.Format + + // Key schema configuration (optional) + KeySchemaID uint32 + KeySchemaFormat schema.Format + HasKeySchema bool // indicates if key schema is configured +} + +// Legacy accessors for backward compatibility +func (c *TopicSchemaConfig) SchemaID() uint32 { + return c.ValueSchemaID +} + +func (c *TopicSchemaConfig) SchemaFormat() schema.Format { + return c.ValueSchemaFormat +} + +// getTopicSchemaFormat returns the schema format string for a topic +func (h *Handler) getTopicSchemaFormat(topic string) 
string { + h.topicSchemaConfigMu.RLock() + defer h.topicSchemaConfigMu.RUnlock() + + if config, exists := h.topicSchemaConfigs[topic]; exists { + return config.ValueSchemaFormat.String() + } + return "" // Empty string means schemaless or format unknown +} + +// stringPtr returns a pointer to the given string +func stringPtr(s string) *string { + return &s +} + +// Handler processes Kafka protocol requests from clients using SeaweedMQ +type Handler struct { + // SeaweedMQ integration + seaweedMQHandler SeaweedMQHandlerInterface + + // SMQ offset storage removed - using ConsumerOffsetStorage instead + + // Consumer offset storage for Kafka protocol OffsetCommit/OffsetFetch + consumerOffsetStorage ConsumerOffsetStorage + + // Consumer group coordination + groupCoordinator *consumer.GroupCoordinator + + // Response caching to reduce CPU usage for repeated requests + metadataCache *ResponseCache + coordinatorCache *ResponseCache + + // Coordinator registry for distributed coordinator assignment + coordinatorRegistry CoordinatorRegistryInterface + + // Schema management (optional, for schematized topics) + schemaManager *schema.Manager + useSchema bool + brokerClient *schema.BrokerClient + + // Topic schema configuration cache + topicSchemaConfigs map[string]*TopicSchemaConfig + topicSchemaConfigMu sync.RWMutex + + // Track registered schemas to prevent duplicate registrations + registeredSchemas map[string]bool // key: "topic:schemaID" or "topic-key:schemaID" + registeredSchemasMu sync.RWMutex + + filerClient filer_pb.SeaweedFilerClient + + // SMQ broker addresses discovered from masters for Metadata responses + smqBrokerAddresses []string + + // Gateway address for coordinator registry + gatewayAddress string + + // Connection contexts stored per connection ID (thread-safe) + // Replaces the race-prone shared connContext field + connContexts sync.Map // map[string]*ConnectionContext + + // Schema Registry URL for delayed initialization + schemaRegistryURL string + + // Default partition count for auto-created topics + defaultPartitions int32 +} + +// NewHandler creates a basic Kafka handler with in-memory storage +// WARNING: This is for testing ONLY - never use in production! +// For production use with persistent storage, use NewSeaweedMQBrokerHandler instead +func NewHandler() *Handler { + // Production safety check - prevent accidental production use + // Comment out for testing: os.Getenv can be used for runtime checks + panic("NewHandler() with in-memory storage should NEVER be used in production! 
Use NewSeaweedMQBrokerHandler() with SeaweedMQ masters for production, or NewTestHandler() for tests.") +} + +// NewTestHandler and NewSimpleTestHandler moved to handler_test.go (test-only file) + +// All test-related types and implementations moved to handler_test.go (test-only file) + +// NewTestHandlerWithMock creates a test handler with a custom SeaweedMQHandlerInterface +// This is useful for unit tests that need a handler but don't want to connect to real SeaweedMQ +func NewTestHandlerWithMock(mockHandler SeaweedMQHandlerInterface) *Handler { + return &Handler{ + seaweedMQHandler: mockHandler, + consumerOffsetStorage: nil, // Unit tests don't need offset storage + groupCoordinator: consumer.NewGroupCoordinator(), + registeredSchemas: make(map[string]bool), + topicSchemaConfigs: make(map[string]*TopicSchemaConfig), + defaultPartitions: 1, + } +} + +// NewSeaweedMQBrokerHandler creates a new handler with SeaweedMQ broker integration +func NewSeaweedMQBrokerHandler(masters string, filerGroup string, clientHost string) (*Handler, error) { + return NewSeaweedMQBrokerHandlerWithDefaults(masters, filerGroup, clientHost, 4) // Default to 4 partitions +} + +// NewSeaweedMQBrokerHandlerWithDefaults creates a new handler with SeaweedMQ broker integration and custom defaults +func NewSeaweedMQBrokerHandlerWithDefaults(masters string, filerGroup string, clientHost string, defaultPartitions int32) (*Handler, error) { + // Set up SeaweedMQ integration + smqHandler, err := integration.NewSeaweedMQBrokerHandler(masters, filerGroup, clientHost) + if err != nil { + return nil, err + } + + // Use the shared filer client accessor from SeaweedMQHandler + sharedFilerAccessor := smqHandler.GetFilerClientAccessor() + if sharedFilerAccessor == nil { + return nil, fmt.Errorf("no shared filer client accessor available from SMQ handler") + } + + // Create consumer offset storage (for OffsetCommit/OffsetFetch protocol) + // Use filer-based storage for persistence across restarts + consumerOffsetStorage := newOffsetStorageAdapter( + consumer_offset.NewFilerStorage(sharedFilerAccessor), + ) + + // Create response caches to reduce CPU usage + // Metadata cache: 5 second TTL (Schema Registry polls frequently) + // Coordinator cache: 10 second TTL (less frequent, more stable) + metadataCache := NewResponseCache(5 * time.Second) + coordinatorCache := NewResponseCache(10 * time.Second) + + // Start cleanup loops + metadataCache.StartCleanupLoop(30 * time.Second) + coordinatorCache.StartCleanupLoop(60 * time.Second) + + handler := &Handler{ + seaweedMQHandler: smqHandler, + consumerOffsetStorage: consumerOffsetStorage, + groupCoordinator: consumer.NewGroupCoordinator(), + smqBrokerAddresses: nil, // Will be set by SetSMQBrokerAddresses() when server starts + registeredSchemas: make(map[string]bool), + defaultPartitions: defaultPartitions, + metadataCache: metadataCache, + coordinatorCache: coordinatorCache, + } + + // Set protocol handler reference in SMQ handler for connection context access + smqHandler.SetProtocolHandler(handler) + + return handler, nil +} + +// AddTopicForTesting creates a topic for testing purposes +// This delegates to the underlying SeaweedMQ handler +func (h *Handler) AddTopicForTesting(topicName string, partitions int32) { + if h.seaweedMQHandler != nil { + h.seaweedMQHandler.CreateTopic(topicName, partitions) + } +} + +// Delegate methods to SeaweedMQ handler + +// GetOrCreateLedger method REMOVED - SMQ handles Kafka offsets natively + +// GetLedger method REMOVED - SMQ handles Kafka offsets 
natively + +// Close shuts down the handler and all connections +func (h *Handler) Close() error { + // Close group coordinator + if h.groupCoordinator != nil { + h.groupCoordinator.Close() + } + + // Close broker client if present + if h.brokerClient != nil { + if err := h.brokerClient.Close(); err != nil { + Warning("Failed to close broker client: %v", err) + } + } + + // Close SeaweedMQ handler if present + if h.seaweedMQHandler != nil { + return h.seaweedMQHandler.Close() + } + return nil +} + +// StoreRecordBatch stores a record batch for later retrieval during Fetch operations +func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) { + // Record batch storage is now handled by the SeaweedMQ handler +} + +// GetRecordBatch retrieves a stored record batch that contains the requested offset +func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) { + // Record batch retrieval is now handled by the SeaweedMQ handler + return nil, false +} + +// SetSMQBrokerAddresses updates the SMQ broker addresses used in Metadata responses +func (h *Handler) SetSMQBrokerAddresses(brokerAddresses []string) { + h.smqBrokerAddresses = brokerAddresses +} + +// GetSMQBrokerAddresses returns the SMQ broker addresses +func (h *Handler) GetSMQBrokerAddresses() []string { + // First try to get from the SeaweedMQ handler (preferred) + if h.seaweedMQHandler != nil { + if brokerAddresses := h.seaweedMQHandler.GetBrokerAddresses(); len(brokerAddresses) > 0 { + return brokerAddresses + } + } + + // Fallback to manually set addresses + if len(h.smqBrokerAddresses) > 0 { + return h.smqBrokerAddresses + } + + // Final fallback for testing + return []string{"localhost:17777"} +} + +// GetGatewayAddress returns the current gateway address as a string (for coordinator registry) +func (h *Handler) GetGatewayAddress() string { + if h.gatewayAddress != "" { + return h.gatewayAddress + } + // Fallback for testing + return "localhost:9092" +} + +// SetGatewayAddress sets the gateway address for coordinator registry +func (h *Handler) SetGatewayAddress(address string) { + h.gatewayAddress = address +} + +// SetCoordinatorRegistry sets the coordinator registry for this handler +func (h *Handler) SetCoordinatorRegistry(registry CoordinatorRegistryInterface) { + h.coordinatorRegistry = registry +} + +// GetCoordinatorRegistry returns the coordinator registry +func (h *Handler) GetCoordinatorRegistry() CoordinatorRegistryInterface { + return h.coordinatorRegistry +} + +// isDataPlaneAPI returns true if the API key is a data plane operation (Fetch, Produce) +// Data plane operations can be slow and may block on I/O +func isDataPlaneAPI(apiKey uint16) bool { + switch APIKey(apiKey) { + case APIKeyProduce: + return true + case APIKeyFetch: + return true + default: + return false + } +} + +// GetConnectionContext returns the current connection context converted to integration.ConnectionContext +// This implements the integration.ProtocolHandler interface +// +// NOTE: Since this method doesn't receive a context parameter, it returns a "best guess" connection context. +// In single-connection scenarios (like tests), this works correctly. In high-concurrency scenarios with many +// simultaneous connections, this may return a connection context from a different connection. +// For a proper fix, the integration.ProtocolHandler interface would need to be updated to pass context.Context. 
+func (h *Handler) GetConnectionContext() *integration.ConnectionContext { + // Try to find any active connection context + // In most cases (single connection, or low concurrency), this will return the correct context + var connCtx *ConnectionContext + h.connContexts.Range(func(key, value interface{}) bool { + if ctx, ok := value.(*ConnectionContext); ok { + connCtx = ctx + return false // Stop iteration after finding first context + } + return true + }) + + if connCtx == nil { + return nil + } + + // Convert protocol.ConnectionContext to integration.ConnectionContext + return &integration.ConnectionContext{ + ClientID: connCtx.ClientID, + ConsumerGroup: connCtx.ConsumerGroup, + MemberID: connCtx.MemberID, + BrokerClient: connCtx.BrokerClient, + } +} + +// HandleConn processes a single client connection +func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { + connectionID := fmt.Sprintf("%s->%s", conn.RemoteAddr(), conn.LocalAddr()) + + // Record connection metrics + RecordConnectionMetrics() + + // Create cancellable context for this connection + // This ensures all requests are cancelled when the connection closes + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // CRITICAL: Create per-connection BrokerClient for isolated gRPC streams + // This prevents different connections from interfering with each other's Fetch requests + // In mock/unit test mode, this may not be available, so we continue without it + var connBrokerClient *integration.BrokerClient + connBrokerClient, err := h.seaweedMQHandler.CreatePerConnectionBrokerClient() + if err != nil { + // Continue without broker client for unit test/mock mode + connBrokerClient = nil + } + + // RACE CONDITION FIX: Create connection-local context and pass through request pipeline + // Store in thread-safe map to enable lookup from methods that don't have direct access + connContext := &ConnectionContext{ + RemoteAddr: conn.RemoteAddr(), + LocalAddr: conn.LocalAddr(), + ConnectionID: connectionID, + BrokerClient: connBrokerClient, + } + + // Store in thread-safe map for later retrieval + h.connContexts.Store(connectionID, connContext) + + defer func() { + // Close all partition readers first + cleanupPartitionReaders(connContext) + // Close the per-connection broker client + if connBrokerClient != nil { + if closeErr := connBrokerClient.Close(); closeErr != nil { + Error("[%s] Error closing BrokerClient: %v", connectionID, closeErr) + } + } + // Remove connection context from map + h.connContexts.Delete(connectionID) + RecordDisconnectionMetrics() + conn.Close() + }() + + r := bufio.NewReader(conn) + w := bufio.NewWriter(conn) + defer w.Flush() + + // Use default timeout config + timeoutConfig := DefaultTimeoutConfig() + + // Track consecutive read timeouts to detect stale/CLOSE_WAIT connections + consecutiveTimeouts := 0 + const maxConsecutiveTimeouts = 3 // Give up after 3 timeouts in a row + + // CRITICAL: Separate control plane from data plane + // Control plane: Metadata, Heartbeat, JoinGroup, etc. 
(must be fast, never block) + // Data plane: Fetch, Produce (can be slow, may block on I/O) + // + // Architecture: + // - Main loop routes requests to appropriate channel based on API key + // - Control goroutine processes control messages (fast, sequential) + // - Data goroutine processes data messages (can be slow) + // - Response writer handles responses in order using correlation IDs + controlChan := make(chan *kafkaRequest, 10) + dataChan := make(chan *kafkaRequest, 10) + responseChan := make(chan *kafkaResponse, 100) + var wg sync.WaitGroup + + // Response writer - maintains request/response order per connection + // CRITICAL: While we process requests concurrently (control/data plane), + // we MUST track the order requests arrive and send responses in that same order. + // Solution: Track received correlation IDs in a queue, send responses in that queue order. + correlationQueue := make([]uint32, 0, 100) + correlationQueueMu := &sync.Mutex{} + + wg.Add(1) + go func() { + defer wg.Done() + glog.V(2).Infof("[%s] Response writer started", connectionID) + defer glog.V(2).Infof("[%s] Response writer exiting", connectionID) + pendingResponses := make(map[uint32]*kafkaResponse) + nextToSend := 0 // Index in correlationQueue + + for { + select { + case resp, ok := <-responseChan: + if !ok { + // responseChan closed, exit + return + } + glog.V(2).Infof("[%s] Response writer received correlation=%d from responseChan", connectionID, resp.correlationID) + correlationQueueMu.Lock() + pendingResponses[resp.correlationID] = resp + + // Send all responses we can in queue order + for nextToSend < len(correlationQueue) { + expectedID := correlationQueue[nextToSend] + readyResp, exists := pendingResponses[expectedID] + if !exists { + // Response not ready yet, stop sending + glog.V(3).Infof("[%s] Response writer: waiting for correlation=%d (nextToSend=%d, queueLen=%d)", connectionID, expectedID, nextToSend, len(correlationQueue)) + break + } + + // Send this response + if readyResp.err != nil { + Error("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err) + } else { + glog.V(2).Infof("[%s] Response writer: about to write correlation=%d (%d bytes)", connectionID, readyResp.correlationID, len(readyResp.response)) + if writeErr := h.writeResponseWithHeader(w, readyResp.correlationID, readyResp.apiKey, readyResp.apiVersion, readyResp.response, timeoutConfig.WriteTimeout); writeErr != nil { + glog.Errorf("[%s] Response writer: WRITE ERROR correlation=%d: %v - EXITING", connectionID, readyResp.correlationID, writeErr) + Error("[%s] Write error correlation=%d: %v", connectionID, readyResp.correlationID, writeErr) + correlationQueueMu.Unlock() + return + } + glog.V(2).Infof("[%s] Response writer: successfully wrote correlation=%d", connectionID, readyResp.correlationID) + } + + // Remove from pending and advance + delete(pendingResponses, expectedID) + nextToSend++ + } + correlationQueueMu.Unlock() + case <-ctx.Done(): + // Context cancelled, exit immediately to prevent deadlock + glog.V(2).Infof("[%s] Response writer: context cancelled, exiting", connectionID) + return + } + } + }() + + // Control plane processor - fast operations, never blocks + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case req, ok := <-controlChan: + if !ok { + // Channel closed, exit + return + } + glog.V(2).Infof("[%s] Control plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) + + // CRITICAL: Wrap request processing with panic 
recovery to prevent deadlocks + // If processRequestSync panics, we MUST still send a response to avoid blocking the response writer + var response []byte + var err error + func() { + defer func() { + if r := recover(); r != nil { + glog.Errorf("[%s] PANIC in control plane correlation=%d: %v", connectionID, req.correlationID, r) + err = fmt.Errorf("internal server error: panic in request handler: %v", r) + } + }() + response, err = h.processRequestSync(req) + }() + + glog.V(2).Infof("[%s] Control plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID) + select { + case responseChan <- &kafkaResponse{ + correlationID: req.correlationID, + apiKey: req.apiKey, + apiVersion: req.apiVersion, + response: response, + err: err, + }: + glog.V(2).Infof("[%s] Control plane sent correlation=%d to responseChan", connectionID, req.correlationID) + case <-ctx.Done(): + // Connection closed, stop processing + return + case <-time.After(5 * time.Second): + glog.Errorf("[%s] DEADLOCK: Control plane timeout sending correlation=%d to responseChan (buffer full?)", connectionID, req.correlationID) + } + case <-ctx.Done(): + // Context cancelled, drain remaining requests before exiting + glog.V(2).Infof("[%s] Control plane: context cancelled, draining remaining requests", connectionID) + for { + select { + case req, ok := <-controlChan: + if !ok { + return + } + // Process remaining requests with a short timeout + glog.V(3).Infof("[%s] Control plane: processing drained request correlation=%d", connectionID, req.correlationID) + response, err := h.processRequestSync(req) + select { + case responseChan <- &kafkaResponse{ + correlationID: req.correlationID, + apiKey: req.apiKey, + apiVersion: req.apiVersion, + response: response, + err: err, + }: + glog.V(3).Infof("[%s] Control plane: sent drained response correlation=%d", connectionID, req.correlationID) + case <-time.After(1 * time.Second): + glog.Warningf("[%s] Control plane: timeout sending drained response correlation=%d, discarding", connectionID, req.correlationID) + return + } + default: + // Channel empty, safe to exit + glog.V(2).Infof("[%s] Control plane: drain complete, exiting", connectionID) + return + } + } + } + } + }() + + // Data plane processor - can block on I/O + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case req, ok := <-dataChan: + if !ok { + // Channel closed, exit + return + } + glog.V(2).Infof("[%s] Data plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) + + // CRITICAL: Wrap request processing with panic recovery to prevent deadlocks + // If processRequestSync panics, we MUST still send a response to avoid blocking the response writer + var response []byte + var err error + func() { + defer func() { + if r := recover(); r != nil { + glog.Errorf("[%s] PANIC in data plane correlation=%d: %v", connectionID, req.correlationID, r) + err = fmt.Errorf("internal server error: panic in request handler: %v", r) + } + }() + response, err = h.processRequestSync(req) + }() + + glog.V(2).Infof("[%s] Data plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID) + // Use select with context to avoid sending on closed channel + select { + case responseChan <- &kafkaResponse{ + correlationID: req.correlationID, + apiKey: req.apiKey, + apiVersion: req.apiVersion, + response: response, + err: err, + }: + glog.V(2).Infof("[%s] Data plane sent correlation=%d to responseChan", connectionID, req.correlationID) + case <-ctx.Done(): + // 
Connection closed, stop processing + return + case <-time.After(5 * time.Second): + glog.Errorf("[%s] DEADLOCK: Data plane timeout sending correlation=%d to responseChan (buffer full?)", connectionID, req.correlationID) + } + case <-ctx.Done(): + // Context cancelled, drain remaining requests before exiting + glog.V(2).Infof("[%s] Data plane: context cancelled, draining remaining requests", connectionID) + for { + select { + case req, ok := <-dataChan: + if !ok { + return + } + // Process remaining requests with a short timeout + glog.V(3).Infof("[%s] Data plane: processing drained request correlation=%d", connectionID, req.correlationID) + response, err := h.processRequestSync(req) + select { + case responseChan <- &kafkaResponse{ + correlationID: req.correlationID, + apiKey: req.apiKey, + apiVersion: req.apiVersion, + response: response, + err: err, + }: + glog.V(3).Infof("[%s] Data plane: sent drained response correlation=%d", connectionID, req.correlationID) + case <-time.After(1 * time.Second): + glog.Warningf("[%s] Data plane: timeout sending drained response correlation=%d, discarding", connectionID, req.correlationID) + return + } + default: + // Channel empty, safe to exit + glog.V(2).Infof("[%s] Data plane: drain complete, exiting", connectionID) + return + } + } + } + } + }() + + defer func() { + // CRITICAL: Close channels in correct order to avoid panics + // 1. Close input channels to stop accepting new requests + close(controlChan) + close(dataChan) + // 2. Wait for worker goroutines to finish processing and sending responses + wg.Wait() + // 3. NOW close responseChan to signal response writer to exit + close(responseChan) + }() + + for { + // Check if context is cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // Set a read deadline for the connection based on context or default timeout + var readDeadline time.Time + var timeoutDuration time.Duration + + if deadline, ok := ctx.Deadline(); ok { + readDeadline = deadline + timeoutDuration = time.Until(deadline) + } else { + // Use configurable read timeout instead of hardcoded 5 seconds + timeoutDuration = timeoutConfig.ReadTimeout + readDeadline = time.Now().Add(timeoutDuration) + } + + if err := conn.SetReadDeadline(readDeadline); err != nil { + return fmt.Errorf("set read deadline: %w", err) + } + + // Check context before reading + select { + case <-ctx.Done(): + // Give a small delay to ensure proper cleanup + time.Sleep(100 * time.Millisecond) + return ctx.Err() + default: + // If context is close to being cancelled, set a very short timeout + if deadline, ok := ctx.Deadline(); ok { + timeUntilDeadline := time.Until(deadline) + if timeUntilDeadline < 2*time.Second && timeUntilDeadline > 0 { + shortDeadline := time.Now().Add(500 * time.Millisecond) + if err := conn.SetReadDeadline(shortDeadline); err == nil { + } + } + } + } + + // Read message size (4 bytes) + var sizeBytes [4]byte + if _, err := io.ReadFull(r, sizeBytes[:]); err != nil { + if err == io.EOF { + return nil + } + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + // CRITICAL FIX: Track consecutive timeouts to detect CLOSE_WAIT connections + // When remote peer closes, connection enters CLOSE_WAIT and reads keep timing out + // After several consecutive timeouts with no data, assume connection is dead + consecutiveTimeouts++ + if consecutiveTimeouts >= maxConsecutiveTimeouts { + return nil + } + // Idle timeout while waiting for next request; keep connection open + continue + } + return fmt.Errorf("read message 
size: %w", err) + } + + // Successfully read data, reset timeout counter + consecutiveTimeouts = 0 + + // Successfully read the message size + size := binary.BigEndian.Uint32(sizeBytes[:]) + // Debug("Read message size: %d bytes", size) + if size == 0 || size > 1024*1024 { // 1MB limit + // Use standardized error for message size limit + // Send error response for message too large + errorResponse := BuildErrorResponse(0, ErrorCodeMessageTooLarge) // correlation ID 0 since we can't parse it yet + if writeErr := h.writeResponseWithCorrelationID(w, 0, errorResponse, timeoutConfig.WriteTimeout); writeErr != nil { + } + return fmt.Errorf("message size %d exceeds limit", size) + } + + // Set read deadline for message body + if err := conn.SetReadDeadline(time.Now().Add(timeoutConfig.ReadTimeout)); err != nil { + } + + // Read the message + messageBuf := make([]byte, size) + if _, err := io.ReadFull(r, messageBuf); err != nil { + _ = HandleTimeoutError(err, "read") // errorCode + return fmt.Errorf("read message: %w", err) + } + + // Parse at least the basic header to get API key and correlation ID + if len(messageBuf) < 8 { + return fmt.Errorf("message too short") + } + + apiKey := binary.BigEndian.Uint16(messageBuf[0:2]) + apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) + correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) + + // Debug("Parsed header - API Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID) + + // Validate API version against what we support + if err := h.validateAPIVersion(apiKey, apiVersion); err != nil { + glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err) + // Return proper Kafka error response for unsupported version + response, writeErr := h.buildUnsupportedVersionResponse(correlationID, apiKey, apiVersion) + if writeErr != nil { + return fmt.Errorf("build error response: %w", writeErr) + } + // CRITICAL: Send error response through response queue to maintain sequential ordering + // This prevents deadlocks in the response writer which expects all correlation IDs in sequence + select { + case responseChan <- &kafkaResponse{ + correlationID: correlationID, + apiKey: apiKey, + apiVersion: apiVersion, + response: response, + err: nil, + }: + // Error response queued successfully, continue reading next request + continue + case <-ctx.Done(): + return ctx.Err() + } + } + + // CRITICAL DEBUG: Log that validation passed + glog.V(4).Infof("API VERSION VALIDATION PASSED: Key=%d (%s), Version=%d, Correlation=%d - proceeding to header parsing", + apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID) + + // Extract request body - special handling for ApiVersions requests + var requestBody []byte + if apiKey == uint16(APIKeyApiVersions) && apiVersion >= 3 { + // ApiVersions v3+ uses client_software_name + client_software_version, not client_id + bodyOffset := 8 // Skip api_key(2) + api_version(2) + correlation_id(4) + + // Skip client_software_name (compact string) + if len(messageBuf) > bodyOffset { + clientNameLen := int(messageBuf[bodyOffset]) // compact string length + if clientNameLen > 0 { + clientNameLen-- // compact strings encode length+1 + bodyOffset += 1 + clientNameLen + } else { + bodyOffset += 1 // just the length byte for null/empty + } + } + + // Skip client_software_version (compact string) + if len(messageBuf) > bodyOffset { + clientVersionLen := int(messageBuf[bodyOffset]) // compact string length + if 
clientVersionLen > 0 { + clientVersionLen-- // compact strings encode length+1 + bodyOffset += 1 + clientVersionLen + } else { + bodyOffset += 1 // just the length byte for null/empty + } + } + + // Skip tagged fields (should be 0x00 for ApiVersions) + if len(messageBuf) > bodyOffset { + bodyOffset += 1 // tagged fields byte + } + + requestBody = messageBuf[bodyOffset:] + } else { + // Parse header using flexible version utilities for other APIs + header, parsedRequestBody, parseErr := ParseRequestHeader(messageBuf) + if parseErr != nil { + // CRITICAL: Log the parsing error for debugging + glog.Errorf("REQUEST HEADER PARSING FAILED: API=%d (%s) v%d, correlation=%d, error=%v, msgLen=%d", + apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID, parseErr, len(messageBuf)) + + // Fall back to basic header parsing if flexible version parsing fails + + // Basic header parsing fallback (original logic) + bodyOffset := 8 + if len(messageBuf) < bodyOffset+2 { + glog.Errorf("FALLBACK PARSING FAILED: missing client_id length, msgLen=%d", len(messageBuf)) + return fmt.Errorf("invalid header: missing client_id length") + } + clientIDLen := int16(binary.BigEndian.Uint16(messageBuf[bodyOffset : bodyOffset+2])) + bodyOffset += 2 + if clientIDLen >= 0 { + if len(messageBuf) < bodyOffset+int(clientIDLen) { + glog.Errorf("FALLBACK PARSING FAILED: client_id truncated, clientIDLen=%d, msgLen=%d", clientIDLen, len(messageBuf)) + return fmt.Errorf("invalid header: client_id truncated") + } + bodyOffset += int(clientIDLen) + } + requestBody = messageBuf[bodyOffset:] + glog.V(2).Infof("FALLBACK PARSING SUCCESS: API=%d (%s) v%d, bodyLen=%d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, len(requestBody)) + } else { + // Use the successfully parsed request body + requestBody = parsedRequestBody + + // Validate parsed header matches what we already extracted + if header.APIKey != apiKey || header.APIVersion != apiVersion || header.CorrelationID != correlationID { + // Fall back to basic parsing rather than failing + bodyOffset := 8 + if len(messageBuf) < bodyOffset+2 { + return fmt.Errorf("invalid header: missing client_id length") + } + clientIDLen := int16(binary.BigEndian.Uint16(messageBuf[bodyOffset : bodyOffset+2])) + bodyOffset += 2 + if clientIDLen >= 0 { + if len(messageBuf) < bodyOffset+int(clientIDLen) { + return fmt.Errorf("invalid header: client_id truncated") + } + bodyOffset += int(clientIDLen) + } + requestBody = messageBuf[bodyOffset:] + } else if header.ClientID != nil { + // Store client ID in connection context for use in fetch requests + connContext.ClientID = *header.ClientID + } + } + } + + // CRITICAL: Route request to appropriate processor + // Control plane: Fast, never blocks (Metadata, Heartbeat, etc.) 
+ // Data plane: Can be slow (Fetch, Produce) + + // Attach connection context to the Go context for retrieval in nested calls + ctxWithConn := context.WithValue(ctx, connContextKey, connContext) + + req := &kafkaRequest{ + correlationID: correlationID, + apiKey: apiKey, + apiVersion: apiVersion, + requestBody: requestBody, + ctx: ctxWithConn, + connContext: connContext, // Pass per-connection context to avoid race conditions + } + + // Route to appropriate channel based on API key + var targetChan chan *kafkaRequest + if isDataPlaneAPI(apiKey) { + targetChan = dataChan + } else { + targetChan = controlChan + } + + // CRITICAL: Only add to correlation queue AFTER successful channel send + // If we add before and the channel blocks, the correlation ID is in the queue + // but the request never gets processed, causing response writer deadlock + select { + case targetChan <- req: + // Request queued successfully - NOW add to correlation tracking + correlationQueueMu.Lock() + correlationQueue = append(correlationQueue, correlationID) + correlationQueueMu.Unlock() + case <-ctx.Done(): + return ctx.Err() + case <-time.After(10 * time.Second): + // Channel full for too long - this shouldn't happen with proper backpressure + glog.Errorf("[%s] CRITICAL: Failed to queue correlation=%d after 10s timeout - channel full!", connectionID, correlationID) + return fmt.Errorf("request queue full: correlation=%d", correlationID) + } + } +} + +// processRequestSync processes a single Kafka API request synchronously and returns the response +func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { + // Record request start time for latency tracking + requestStart := time.Now() + apiName := getAPIName(APIKey(req.apiKey)) + + var response []byte + var err error + + switch APIKey(req.apiKey) { + case APIKeyApiVersions: + response, err = h.handleApiVersions(req.correlationID, req.apiVersion) + + case APIKeyMetadata: + response, err = h.handleMetadata(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyListOffsets: + response, err = h.handleListOffsets(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyCreateTopics: + response, err = h.handleCreateTopics(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyDeleteTopics: + response, err = h.handleDeleteTopics(req.correlationID, req.requestBody) + + case APIKeyProduce: + response, err = h.handleProduce(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyFetch: + response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyJoinGroup: + response, err = h.handleJoinGroup(req.connContext, req.correlationID, req.apiVersion, req.requestBody) + + case APIKeySyncGroup: + response, err = h.handleSyncGroup(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyOffsetCommit: + response, err = h.handleOffsetCommit(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyOffsetFetch: + response, err = h.handleOffsetFetch(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyFindCoordinator: + response, err = h.handleFindCoordinator(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyHeartbeat: + response, err = h.handleHeartbeat(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyLeaveGroup: + response, err = h.handleLeaveGroup(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyDescribeGroups: + response, err = h.handleDescribeGroups(req.correlationID, req.apiVersion, 
req.requestBody) + + case APIKeyListGroups: + response, err = h.handleListGroups(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyDescribeConfigs: + response, err = h.handleDescribeConfigs(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyDescribeCluster: + response, err = h.handleDescribeCluster(req.correlationID, req.apiVersion, req.requestBody) + + case APIKeyInitProducerId: + response, err = h.handleInitProducerId(req.correlationID, req.apiVersion, req.requestBody) + + default: + Warning("Unsupported API key: %d (%s) v%d - Correlation: %d", req.apiKey, apiName, req.apiVersion, req.correlationID) + err = fmt.Errorf("unsupported API key: %d (version %d)", req.apiKey, req.apiVersion) + } + + glog.V(2).Infof("processRequestSync: Switch completed for correlation=%d, about to record metrics", req.correlationID) + // Record metrics + requestLatency := time.Since(requestStart) + if err != nil { + RecordErrorMetrics(req.apiKey, requestLatency) + } else { + RecordRequestMetrics(req.apiKey, requestLatency) + } + glog.V(2).Infof("processRequestSync: Metrics recorded for correlation=%d, about to return", req.correlationID) + + return response, err +} + +// ApiKeyInfo represents supported API key information +type ApiKeyInfo struct { + ApiKey APIKey + MinVersion uint16 + MaxVersion uint16 +} + +// SupportedApiKeys defines all supported API keys and their version ranges +var SupportedApiKeys = []ApiKeyInfo{ + {APIKeyApiVersions, 0, 4}, // ApiVersions - support up to v4 for Kafka 8.0.0 compatibility + {APIKeyMetadata, 0, 7}, // Metadata - support up to v7 + {APIKeyProduce, 0, 7}, // Produce + {APIKeyFetch, 0, 7}, // Fetch + {APIKeyListOffsets, 0, 2}, // ListOffsets + {APIKeyCreateTopics, 0, 5}, // CreateTopics + {APIKeyDeleteTopics, 0, 4}, // DeleteTopics + {APIKeyFindCoordinator, 0, 3}, // FindCoordinator - v3+ supports flexible responses + {APIKeyJoinGroup, 0, 6}, // JoinGroup + {APIKeySyncGroup, 0, 5}, // SyncGroup + {APIKeyOffsetCommit, 0, 2}, // OffsetCommit + {APIKeyOffsetFetch, 0, 5}, // OffsetFetch + {APIKeyHeartbeat, 0, 4}, // Heartbeat + {APIKeyLeaveGroup, 0, 4}, // LeaveGroup + {APIKeyDescribeGroups, 0, 5}, // DescribeGroups + {APIKeyListGroups, 0, 4}, // ListGroups + {APIKeyDescribeConfigs, 0, 4}, // DescribeConfigs + {APIKeyInitProducerId, 0, 4}, // InitProducerId - support up to v4 for transactional producers + {APIKeyDescribeCluster, 0, 1}, // DescribeCluster - for AdminClient compatibility (KIP-919) +} + +func (h *Handler) handleApiVersions(correlationID uint32, apiVersion uint16) ([]byte, error) { + // Send correct flexible or non-flexible response based on API version + // This fixes the AdminClient "collection size 2184558" error by using proper varint encoding + response := make([]byte, 0, 512) + + // NOTE: Correlation ID is handled by writeResponseWithCorrelationID + // Do NOT include it in the response body + + // === RESPONSE BODY === + // Error code (2 bytes) - always fixed-length + response = append(response, 0, 0) // No error + + // API Keys Array - CRITICAL FIX: Use correct encoding based on version + if apiVersion >= 3 { + // FLEXIBLE FORMAT: Compact array with varint length - THIS FIXES THE ADMINCLIENT BUG! + response = append(response, CompactArrayLength(uint32(len(SupportedApiKeys)))...) 
+ + // Add API key entries with per-element tagged fields + for _, api := range SupportedApiKeys { + response = append(response, byte(api.ApiKey>>8), byte(api.ApiKey)) // api_key (2 bytes) + response = append(response, byte(api.MinVersion>>8), byte(api.MinVersion)) // min_version (2 bytes) + response = append(response, byte(api.MaxVersion>>8), byte(api.MaxVersion)) // max_version (2 bytes) + response = append(response, 0x00) // Per-element tagged fields (varint: empty) + } + + } else { + // NON-FLEXIBLE FORMAT: Regular array with fixed 4-byte length + response = append(response, 0, 0, 0, byte(len(SupportedApiKeys))) // Array length (4 bytes) + + // Add API key entries without tagged fields + for _, api := range SupportedApiKeys { + response = append(response, byte(api.ApiKey>>8), byte(api.ApiKey)) // api_key (2 bytes) + response = append(response, byte(api.MinVersion>>8), byte(api.MinVersion)) // min_version (2 bytes) + response = append(response, byte(api.MaxVersion>>8), byte(api.MaxVersion)) // max_version (2 bytes) + } + } + + // Throttle time (for v1+) - always fixed-length + if apiVersion >= 1 { + response = append(response, 0, 0, 0, 0) // throttle_time_ms = 0 (4 bytes) + } + + // Response-level tagged fields (for v3+ flexible versions) + if apiVersion >= 3 { + response = append(response, 0x00) // Empty response-level tagged fields (varint: single byte 0) + } + + return response, nil +} + +// handleMetadataV0 implements the Metadata API response in version 0 format. +// v0 response layout: +// correlation_id(4) + brokers(ARRAY) + topics(ARRAY) +// broker: node_id(4) + host(STRING) + port(4) +// topic: error_code(2) + name(STRING) + partitions(ARRAY) +// partition: error_code(2) + partition_id(4) + leader(4) + replicas(ARRAY<int32>) + isr(ARRAY<int32>) +func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]byte, error) { + response := make([]byte, 0, 256) + + // NOTE: Correlation ID is handled by writeResponseWithCorrelationID + // Do NOT include it in the response body + + // Brokers array length (4 bytes) - 1 broker (this gateway) + response = append(response, 0, 0, 0, 1) + + // Broker 0: node_id(4) + host(STRING) + port(4) + response = append(response, 0, 0, 0, 1) // node_id = 1 (consistent with partitions) + + // Get advertised address for client connections + host, port := h.GetAdvertisedAddress(h.GetGatewayAddress()) + + // Host (STRING: 2 bytes length + bytes) - validate length fits in uint16 + if len(host) > 65535 { + return nil, fmt.Errorf("host name too long: %d bytes", len(host)) + } + hostLen := uint16(len(host)) + response = append(response, byte(hostLen>>8), byte(hostLen)) + response = append(response, []byte(host)...) + + // Port (4 bytes) - validate port range + if port < 0 || port > 65535 { + return nil, fmt.Errorf("invalid port number: %d", port) + } + portBytes := make([]byte, 4) + binary.BigEndian.PutUint32(portBytes, uint32(port)) + response = append(response, portBytes...) 
+ + // Parse requested topics (empty means all) + requestedTopics := h.parseMetadataTopics(requestBody) + glog.V(0).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics) + + // Determine topics to return using SeaweedMQ handler + var topicsToReturn []string + if len(requestedTopics) == 0 { + topicsToReturn = h.seaweedMQHandler.ListTopics() + } else { + for _, name := range requestedTopics { + if h.seaweedMQHandler.TopicExists(name) { + topicsToReturn = append(topicsToReturn, name) + } + } + } + + // Topics array length (4 bytes) + topicsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn))) + response = append(response, topicsCountBytes...) + + // Topic entries + for _, topicName := range topicsToReturn { + // error_code(2) = 0 + response = append(response, 0, 0) + + // name (STRING) + nameBytes := []byte(topicName) + nameLen := uint16(len(nameBytes)) + response = append(response, byte(nameLen>>8), byte(nameLen)) + response = append(response, nameBytes...) + + // Get actual partition count from topic info + topicInfo, exists := h.seaweedMQHandler.GetTopicInfo(topicName) + partitionCount := h.GetDefaultPartitions() // Use configurable default + if exists && topicInfo != nil { + partitionCount = topicInfo.Partitions + } + + // partitions array length (4 bytes) + partitionsBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partitionsBytes, uint32(partitionCount)) + response = append(response, partitionsBytes...) + + // Create partition entries for each partition + for partitionID := int32(0); partitionID < partitionCount; partitionID++ { + // partition: error_code(2) + partition_id(4) + leader(4) + response = append(response, 0, 0) // error_code + + // partition_id (4 bytes) + partitionIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partitionIDBytes, uint32(partitionID)) + response = append(response, partitionIDBytes...) 
+ + response = append(response, 0, 0, 0, 1) // leader = 1 (this broker) + + // replicas: array length(4) + one broker id (1) + response = append(response, 0, 0, 0, 1) + response = append(response, 0, 0, 0, 1) + + // isr: array length(4) + one broker id (1) + response = append(response, 0, 0, 0, 1) + response = append(response, 0, 0, 0, 1) + } + } + + for range topicsToReturn { + } + return response, nil +} + +func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]byte, error) { + // Simplified Metadata v1 implementation - based on working v0 + v1 additions + // v1 adds: ControllerID (after brokers), Rack (for brokers), IsInternal (for topics) + + // Parse requested topics (empty means all) + requestedTopics := h.parseMetadataTopics(requestBody) + glog.V(0).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics) + + // Determine topics to return using SeaweedMQ handler + var topicsToReturn []string + if len(requestedTopics) == 0 { + topicsToReturn = h.seaweedMQHandler.ListTopics() + } else { + for _, name := range requestedTopics { + if h.seaweedMQHandler.TopicExists(name) { + topicsToReturn = append(topicsToReturn, name) + } + } + } + + // Build response using same approach as v0 but with v1 additions + response := make([]byte, 0, 256) + + // NOTE: Correlation ID is handled by writeResponseWithHeader + // Do NOT include it in the response body + + // Brokers array length (4 bytes) - 1 broker (this gateway) + response = append(response, 0, 0, 0, 1) + + // Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING) + response = append(response, 0, 0, 0, 1) // node_id = 1 + + // Get advertised address for client connections + host, port := h.GetAdvertisedAddress(h.GetGatewayAddress()) + + // Host (STRING: 2 bytes length + bytes) - validate length fits in uint16 + if len(host) > 65535 { + return nil, fmt.Errorf("host name too long: %d bytes", len(host)) + } + hostLen := uint16(len(host)) + response = append(response, byte(hostLen>>8), byte(hostLen)) + response = append(response, []byte(host)...) + + // Port (4 bytes) - validate port range + if port < 0 || port > 65535 { + return nil, fmt.Errorf("invalid port number: %d", port) + } + portBytes := make([]byte, 4) + binary.BigEndian.PutUint32(portBytes, uint32(port)) + response = append(response, portBytes...) + + // Rack (STRING: 2 bytes length + bytes) - v1 addition, non-nullable empty string + response = append(response, 0, 0) // empty string + + // ControllerID (4 bytes) - v1 addition + response = append(response, 0, 0, 0, 1) // controller_id = 1 + + // Topics array length (4 bytes) + topicsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCountBytes, uint32(len(topicsToReturn))) + response = append(response, topicsCountBytes...) + + // Topics + for _, topicName := range topicsToReturn { + // error_code (2 bytes) + response = append(response, 0, 0) + + // topic name (STRING: 2 bytes length + bytes) + topicLen := uint16(len(topicName)) + response = append(response, byte(topicLen>>8), byte(topicLen)) + response = append(response, []byte(topicName)...) 
+ + // is_internal (1 byte) - v1 addition + response = append(response, 0) // false + + // Get actual partition count from topic info + topicInfo, exists := h.seaweedMQHandler.GetTopicInfo(topicName) + partitionCount := h.GetDefaultPartitions() // Use configurable default + if exists && topicInfo != nil { + partitionCount = topicInfo.Partitions + } + + // partitions array length (4 bytes) + partitionsBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partitionsBytes, uint32(partitionCount)) + response = append(response, partitionsBytes...) + + // Create partition entries for each partition + for partitionID := int32(0); partitionID < partitionCount; partitionID++ { + // partition: error_code(2) + partition_id(4) + leader_id(4) + replicas(ARRAY) + isr(ARRAY) + response = append(response, 0, 0) // error_code + + // partition_id (4 bytes) + partitionIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partitionIDBytes, uint32(partitionID)) + response = append(response, partitionIDBytes...) + + response = append(response, 0, 0, 0, 1) // leader_id = 1 + + // replicas: array length(4) + one broker id (1) + response = append(response, 0, 0, 0, 1) + response = append(response, 0, 0, 0, 1) + + // isr: array length(4) + one broker id (1) + response = append(response, 0, 0, 0, 1) + response = append(response, 0, 0, 0, 1) + } + } + + return response, nil +} + +// HandleMetadataV2 implements Metadata API v2 with ClusterID field +func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]byte, error) { + // Metadata v2 adds ClusterID field (nullable string) + // v2 response layout: correlation_id(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY) + + // Parse requested topics (empty means all) + requestedTopics := h.parseMetadataTopics(requestBody) + glog.V(0).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics) + + // Determine topics to return using SeaweedMQ handler + var topicsToReturn []string + if len(requestedTopics) == 0 { + topicsToReturn = h.seaweedMQHandler.ListTopics() + } else { + for _, name := range requestedTopics { + if h.seaweedMQHandler.TopicExists(name) { + topicsToReturn = append(topicsToReturn, name) + } + } + } + + var buf bytes.Buffer + + // Correlation ID (4 bytes) + // NOTE: Correlation ID is handled by writeResponseWithCorrelationID + // Do NOT include it in the response body + + // Brokers array (4 bytes length + brokers) - 1 broker (this gateway) + binary.Write(&buf, binary.BigEndian, int32(1)) + + // Get advertised address for client connections + host, port := h.GetAdvertisedAddress(h.GetGatewayAddress()) + + nodeID := int32(1) // Single gateway node + + // Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING) + binary.Write(&buf, binary.BigEndian, nodeID) + + // Host (STRING: 2 bytes length + data) - validate length fits in int16 + if len(host) > 32767 { + return nil, fmt.Errorf("host name too long: %d bytes", len(host)) + } + binary.Write(&buf, binary.BigEndian, int16(len(host))) + buf.WriteString(host) + + // Port (4 bytes) - validate port range + if port < 0 || port > 65535 { + return nil, fmt.Errorf("invalid port number: %d", port) + } + binary.Write(&buf, binary.BigEndian, int32(port)) + + // Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable + binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string + + // ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2 addition + // Schema Registry requires a non-null cluster ID + 
clusterID := "seaweedfs-kafka-gateway" + binary.Write(&buf, binary.BigEndian, int16(len(clusterID))) + buf.WriteString(clusterID) + + // ControllerID (4 bytes) - v1+ addition + binary.Write(&buf, binary.BigEndian, int32(1)) + + // Topics array (4 bytes length + topics) + binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn))) + + for _, topicName := range topicsToReturn { + // ErrorCode (2 bytes) + binary.Write(&buf, binary.BigEndian, int16(0)) + + // Name (STRING: 2 bytes length + data) + binary.Write(&buf, binary.BigEndian, int16(len(topicName))) + buf.WriteString(topicName) + + // IsInternal (1 byte) - v1+ addition + buf.WriteByte(0) // false + + // Get actual partition count from topic info + topicInfo, exists := h.seaweedMQHandler.GetTopicInfo(topicName) + partitionCount := h.GetDefaultPartitions() // Use configurable default + if exists && topicInfo != nil { + partitionCount = topicInfo.Partitions + } + + // Partitions array (4 bytes length + partitions) + binary.Write(&buf, binary.BigEndian, partitionCount) + + // Create partition entries for each partition + for partitionID := int32(0); partitionID < partitionCount; partitionID++ { + binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode + binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex + binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + + // ReplicaNodes array (4 bytes length + nodes) + binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica + binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + + // IsrNodes array (4 bytes length + nodes) + binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node + binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + } + } + + response := buf.Bytes() + + return response, nil +} + +// HandleMetadataV3V4 implements Metadata API v3/v4 with ThrottleTimeMs field +func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ([]byte, error) { + // Metadata v3/v4 adds ThrottleTimeMs field at the beginning + // v3/v4 response layout: correlation_id(4) + throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY) + + // Parse requested topics (empty means all) + requestedTopics := h.parseMetadataTopics(requestBody) + glog.V(0).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics) + + // Determine topics to return using SeaweedMQ handler + var topicsToReturn []string + if len(requestedTopics) == 0 { + topicsToReturn = h.seaweedMQHandler.ListTopics() + } else { + for _, name := range requestedTopics { + if h.seaweedMQHandler.TopicExists(name) { + topicsToReturn = append(topicsToReturn, name) + } + } + } + + var buf bytes.Buffer + + // Correlation ID (4 bytes) + // NOTE: Correlation ID is handled by writeResponseWithCorrelationID + // Do NOT include it in the response body + + // ThrottleTimeMs (4 bytes) - v3+ addition + binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling + + // Brokers array (4 bytes length + brokers) - 1 broker (this gateway) + binary.Write(&buf, binary.BigEndian, int32(1)) + + // Get advertised address for client connections + host, port := h.GetAdvertisedAddress(h.GetGatewayAddress()) + + nodeID := int32(1) // Single gateway node + + // Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING) + binary.Write(&buf, binary.BigEndian, nodeID) + + // Host (STRING: 2 bytes length + data) - validate length fits in int16 + if len(host) > 32767 { + return nil, fmt.Errorf("host name too long: %d bytes", 
len(host)) + } + binary.Write(&buf, binary.BigEndian, int16(len(host))) + buf.WriteString(host) + + // Port (4 bytes) - validate port range + if port < 0 || port > 65535 { + return nil, fmt.Errorf("invalid port number: %d", port) + } + binary.Write(&buf, binary.BigEndian, int32(port)) + + // Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable + binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string + + // ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2+ addition + // Schema Registry requires a non-null cluster ID + clusterID := "seaweedfs-kafka-gateway" + binary.Write(&buf, binary.BigEndian, int16(len(clusterID))) + buf.WriteString(clusterID) + + // ControllerID (4 bytes) - v1+ addition + binary.Write(&buf, binary.BigEndian, int32(1)) + + // Topics array (4 bytes length + topics) + binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn))) + + for _, topicName := range topicsToReturn { + // ErrorCode (2 bytes) + binary.Write(&buf, binary.BigEndian, int16(0)) + + // Name (STRING: 2 bytes length + data) + binary.Write(&buf, binary.BigEndian, int16(len(topicName))) + buf.WriteString(topicName) + + // IsInternal (1 byte) - v1+ addition + buf.WriteByte(0) // false + + // Get actual partition count from topic info + topicInfo, exists := h.seaweedMQHandler.GetTopicInfo(topicName) + partitionCount := h.GetDefaultPartitions() // Use configurable default + if exists && topicInfo != nil { + partitionCount = topicInfo.Partitions + } + + // Partitions array (4 bytes length + partitions) + binary.Write(&buf, binary.BigEndian, partitionCount) + + // Create partition entries for each partition + for partitionID := int32(0); partitionID < partitionCount; partitionID++ { + binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode + binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex + binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + + // ReplicaNodes array (4 bytes length + nodes) + binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica + binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + + // IsrNodes array (4 bytes length + nodes) + binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node + binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + } + } + + response := buf.Bytes() + + return response, nil +} + +// HandleMetadataV5V6 implements Metadata API v5/v6 with OfflineReplicas field +func (h *Handler) HandleMetadataV5V6(correlationID uint32, requestBody []byte) ([]byte, error) { + return h.handleMetadataV5ToV8(correlationID, requestBody, 5) +} + +// HandleMetadataV7 implements Metadata API v7 with LeaderEpoch field (REGULAR FORMAT, NOT FLEXIBLE) +func (h *Handler) HandleMetadataV7(correlationID uint32, requestBody []byte) ([]byte, error) { + // CRITICAL: Metadata v7 uses REGULAR arrays/strings (like v5/v6), NOT compact format + // Only v9+ uses compact format (flexible responses) + return h.handleMetadataV5ToV8(correlationID, requestBody, 7) +} + +// handleMetadataV5ToV8 handles Metadata v5-v8 with regular (non-compact) encoding +// v5/v6: adds OfflineReplicas field to partitions +// v7: adds LeaderEpoch field to partitions +// v8: adds ClusterAuthorizedOperations field +// All use REGULAR arrays/strings (NOT compact) - only v9+ uses compact format +func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, apiVersion int) ([]byte, error) { + // v5-v8 response layout: throttle_time_ms(4) + brokers(ARRAY) + cluster_id(NULLABLE_STRING) + controller_id(4) + topics(ARRAY) [+ 
cluster_authorized_operations(4) for v8] + // Each partition includes: error_code(2) + partition_index(4) + leader_id(4) [+ leader_epoch(4) for v7+] + replica_nodes(ARRAY) + isr_nodes(ARRAY) + offline_replicas(ARRAY) + + // Parse requested topics (empty means all) + requestedTopics := h.parseMetadataTopics(requestBody) + glog.V(0).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics) + + // Determine topics to return using SeaweedMQ handler + var topicsToReturn []string + if len(requestedTopics) == 0 { + topicsToReturn = h.seaweedMQHandler.ListTopics() + } else { + // FIXED: Proper topic existence checking (removed the hack) + // Now that CreateTopics v5 works, we use proper Kafka workflow: + // 1. Check which requested topics actually exist + // 2. Auto-create system topics if they don't exist + // 3. Only return existing topics in metadata + // 4. Client will call CreateTopics for non-existent topics + // 5. Then request metadata again to see the created topics + for _, topic := range requestedTopics { + if isSystemTopic(topic) { + // Always try to auto-create system topics during metadata requests + glog.V(0).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic) + if !h.seaweedMQHandler.TopicExists(topic) { + glog.V(0).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic) + if err := h.createTopicWithSchemaSupport(topic, 1); err != nil { + glog.V(0).Infof("[METADATA v%d] Failed to auto-create system topic %s: %v", apiVersion, topic, err) + // Continue without adding to topicsToReturn - client will get UNKNOWN_TOPIC_OR_PARTITION + } else { + glog.V(0).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic) + } + } else { + glog.V(0).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic) + } + topicsToReturn = append(topicsToReturn, topic) + } else if h.seaweedMQHandler.TopicExists(topic) { + topicsToReturn = append(topicsToReturn, topic) + } + } + glog.V(0).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics) + } + + var buf bytes.Buffer + + // Correlation ID (4 bytes) + // NOTE: Correlation ID is handled by writeResponseWithCorrelationID + // Do NOT include it in the response body + + // ThrottleTimeMs (4 bytes) - v3+ addition + binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling + + // Brokers array (4 bytes length + brokers) - 1 broker (this gateway) + binary.Write(&buf, binary.BigEndian, int32(1)) + + // Get advertised address for client connections + host, port := h.GetAdvertisedAddress(h.GetGatewayAddress()) + + nodeID := int32(1) // Single gateway node + + // Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING) + binary.Write(&buf, binary.BigEndian, nodeID) + + // Host (STRING: 2 bytes length + data) - validate length fits in int16 + if len(host) > 32767 { + return nil, fmt.Errorf("host name too long: %d bytes", len(host)) + } + binary.Write(&buf, binary.BigEndian, int16(len(host))) + buf.WriteString(host) + + // Port (4 bytes) - validate port range + if port < 0 || port > 65535 { + return nil, fmt.Errorf("invalid port number: %d", port) + } + binary.Write(&buf, binary.BigEndian, int32(port)) + + // Rack (STRING: 2 bytes length + data) - v1+ addition, non-nullable + binary.Write(&buf, binary.BigEndian, int16(0)) // Empty string + + // ClusterID (NULLABLE_STRING: 2 bytes length + data) - v2+ addition + // Schema 
Registry requires a non-null cluster ID + clusterID := "seaweedfs-kafka-gateway" + binary.Write(&buf, binary.BigEndian, int16(len(clusterID))) + buf.WriteString(clusterID) + + // ControllerID (4 bytes) - v1+ addition + binary.Write(&buf, binary.BigEndian, int32(1)) + + // Topics array (4 bytes length + topics) + binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn))) + + for _, topicName := range topicsToReturn { + // ErrorCode (2 bytes) + binary.Write(&buf, binary.BigEndian, int16(0)) + + // Name (STRING: 2 bytes length + data) + binary.Write(&buf, binary.BigEndian, int16(len(topicName))) + buf.WriteString(topicName) + + // IsInternal (1 byte) - v1+ addition + buf.WriteByte(0) // false + + // Get actual partition count from topic info + topicInfo, exists := h.seaweedMQHandler.GetTopicInfo(topicName) + partitionCount := h.GetDefaultPartitions() // Use configurable default + if exists && topicInfo != nil { + partitionCount = topicInfo.Partitions + } + + // Partitions array (4 bytes length + partitions) + binary.Write(&buf, binary.BigEndian, partitionCount) + + // Create partition entries for each partition + for partitionID := int32(0); partitionID < partitionCount; partitionID++ { + binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode + binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex + binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + + // LeaderEpoch (4 bytes) - v7+ addition + if apiVersion >= 7 { + binary.Write(&buf, binary.BigEndian, int32(0)) // Leader epoch 0 + } + + // ReplicaNodes array (4 bytes length + nodes) + binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica + binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + + // IsrNodes array (4 bytes length + nodes) + binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node + binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + + // OfflineReplicas array (4 bytes length + nodes) - v5+ addition + binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas + } + } + + // ClusterAuthorizedOperations (4 bytes) - v8+ addition + if apiVersion >= 8 { + binary.Write(&buf, binary.BigEndian, int32(-2147483648)) // All operations allowed (bit mask) + } + + response := buf.Bytes() + + return response, nil +} + +func (h *Handler) parseMetadataTopics(requestBody []byte) []string { + // Support both v0/v1 parsing: v1 payload starts directly with topics array length (int32), + // while older assumptions may have included a client_id string first. 
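+ // For illustration, a single-topic request for "foo" can arrive in either shape:
+ //   path A (topics array first):     00 00 00 01 | 00 03 'f' 'o' 'o'
+ //   path B (client_id then topics):  00 05 'a' 'd' 'm' 'i' 'n' | 00 00 00 01 | 00 03 'f' 'o' 'o'
+ // A topics count of -1 (FF FF FF FF) means "all topics".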
+ if len(requestBody) < 4 { + return []string{} + } + + // Try path A: interpret first 4 bytes as topics_count + offset := 0 + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + if topicsCount == 0xFFFFFFFF { // -1 means all topics + return []string{} + } + if topicsCount <= 1000000 { // sane bound + offset += 4 + topics := make([]string, 0, topicsCount) + for i := uint32(0); i < topicsCount && offset+2 <= len(requestBody); i++ { + nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2])) + offset += 2 + if offset+nameLen > len(requestBody) { + break + } + topics = append(topics, string(requestBody[offset:offset+nameLen])) + offset += nameLen + } + return topics + } + + // Path B: assume leading client_id string then topics_count + if len(requestBody) < 6 { + return []string{} + } + clientIDLen := int(binary.BigEndian.Uint16(requestBody[0:2])) + offset = 2 + clientIDLen + if len(requestBody) < offset+4 { + return []string{} + } + topicsCount = binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + if topicsCount == 0xFFFFFFFF { + return []string{} + } + topics := make([]string, 0, topicsCount) + for i := uint32(0); i < topicsCount && offset+2 <= len(requestBody); i++ { + nameLen := int(binary.BigEndian.Uint16(requestBody[offset : offset+2])) + offset += 2 + if offset+nameLen > len(requestBody) { + break + } + topics = append(topics, string(requestBody[offset:offset+nameLen])) + offset += nameLen + } + return topics +} + +func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + + // Parse minimal request to understand what's being asked (header already stripped) + offset := 0 + + // v1+ has replica_id(4) + if apiVersion >= 1 { + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("ListOffsets v%d request missing replica_id", apiVersion) + } + _ = int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) // replicaID + offset += 4 + } + + // v2+ adds isolation_level(1) + if apiVersion >= 2 { + if len(requestBody) < offset+1 { + return nil, fmt.Errorf("ListOffsets v%d request missing isolation_level", apiVersion) + } + _ = requestBody[offset] // isolationLevel + offset += 1 + } + + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("ListOffsets request missing topics count") + } + + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + response := make([]byte, 0, 256) + + // NOTE: Correlation ID is handled by writeResponseWithHeader + // Do NOT include it in the response body + + // Throttle time (4 bytes, 0 = no throttling) - v2+ only + if apiVersion >= 2 { + response = append(response, 0, 0, 0, 0) + } + + // Topics count (will be updated later with actual count) + topicsCountBytes := make([]byte, 4) + topicsCountOffset := len(response) // Remember where to update the count + binary.BigEndian.PutUint32(topicsCountBytes, topicsCount) + response = append(response, topicsCountBytes...) 
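+ // Each topic entry appended below follows this handler's layout:
+ //   topic_name(STRING) + partitions(ARRAY), where every partition is encoded as
+ //   partition_id(INT32) + error_code(INT16) + timestamp(INT64) + offset(INT64),
+ // matching the v1+ ListOffsets response shape.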
+ + // Track how many topics we actually process + actualTopicsCount := uint32(0) + + // Process each requested topic + for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { + if len(requestBody) < offset+2 { + break + } + + // Parse topic name + topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + if len(requestBody) < offset+int(topicNameSize)+4 { + break + } + + topicName := requestBody[offset : offset+int(topicNameSize)] + offset += int(topicNameSize) + + // Parse partitions count for this topic + partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + // Response: topic_name_size(2) + topic_name + partitions_array + response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) + response = append(response, topicName...) + + partitionsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount) + response = append(response, partitionsCountBytes...) + + // Process each partition + for j := uint32(0); j < partitionsCount && offset+12 <= len(requestBody); j++ { + // Parse partition request: partition_id(4) + timestamp(8) + partitionID := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + timestamp := int64(binary.BigEndian.Uint64(requestBody[offset+4 : offset+12])) + offset += 12 + + // Response: partition_id(4) + error_code(2) + timestamp(8) + offset(8) + partitionIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partitionIDBytes, partitionID) + response = append(response, partitionIDBytes...) + + // Error code (0 = no error) + response = append(response, 0, 0) + + // Use direct SMQ reading - no ledgers needed + // SMQ handles offset management internally + var responseTimestamp int64 + var responseOffset int64 + + switch timestamp { + case -2: // earliest offset + // Get the actual earliest offset from SMQ + earliestOffset, err := h.seaweedMQHandler.GetEarliestOffset(string(topicName), int32(partitionID)) + if err != nil { + responseOffset = 0 // fallback to 0 + } else { + responseOffset = earliestOffset + } + responseTimestamp = 0 // No specific timestamp for earliest + if strings.HasPrefix(string(topicName), "_schemas") { + glog.Infof("SCHEMA REGISTRY LISTOFFSETS EARLIEST: topic=%s partition=%d returning offset=%d", string(topicName), partitionID, responseOffset) + } + case -1: // latest offset + // Get the actual latest offset from SMQ + if h.seaweedMQHandler == nil { + responseOffset = 0 + } else { + latestOffset, err := h.seaweedMQHandler.GetLatestOffset(string(topicName), int32(partitionID)) + if err != nil { + responseOffset = 0 // fallback to 0 + } else { + responseOffset = latestOffset + } + } + responseTimestamp = 0 // No specific timestamp for latest + default: // specific timestamp - find offset by timestamp + // For timestamp-based lookup, we need to implement this properly + // For now, return 0 as fallback + responseOffset = 0 + responseTimestamp = timestamp + } + + // Ensure we never return a timestamp as offset - this was the bug! + if responseOffset > 1000000000 { // If offset looks like a timestamp + responseOffset = 0 + } + + timestampBytes := make([]byte, 8) + binary.BigEndian.PutUint64(timestampBytes, uint64(responseTimestamp)) + response = append(response, timestampBytes...) + + offsetBytes := make([]byte, 8) + binary.BigEndian.PutUint64(offsetBytes, uint64(responseOffset)) + response = append(response, offsetBytes...) 
+ } + + // Successfully processed this topic + actualTopicsCount++ + } + + // CRITICAL FIX: Update the topics count in the response header with the actual count + // This prevents ErrIncompleteResponse when request parsing fails mid-way + if actualTopicsCount != topicsCount { + binary.BigEndian.PutUint32(response[topicsCountOffset:topicsCountOffset+4], actualTopicsCount) + } + + return response, nil +} + +func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + + if len(requestBody) < 2 { + return nil, fmt.Errorf("CreateTopics request too short") + } + + // Parse based on API version + switch apiVersion { + case 0, 1: + response, err := h.handleCreateTopicsV0V1(correlationID, requestBody) + return response, err + case 2, 3, 4: + // kafka-go sends v2-4 in regular format, not compact + response, err := h.handleCreateTopicsV2To4(correlationID, requestBody) + return response, err + case 5: + // v5+ uses flexible format with compact arrays + response, err := h.handleCreateTopicsV2Plus(correlationID, apiVersion, requestBody) + return response, err + default: + return nil, fmt.Errorf("unsupported CreateTopics API version: %d", apiVersion) + } +} + +// handleCreateTopicsV2To4 handles CreateTopics API versions 2-4 (auto-detect regular vs compact format) +func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []byte) ([]byte, error) { + // Auto-detect format: kafka-go sends regular format, tests send compact format + if len(requestBody) < 1 { + return nil, fmt.Errorf("CreateTopics v2-4 request too short") + } + + // Detect format by checking first byte + // Compact format: first byte is compact array length (usually 0x02 for 1 topic) + // Regular format: first 4 bytes are regular array count (usually 0x00000001 for 1 topic) + isCompactFormat := false + if len(requestBody) >= 4 { + // Check if this looks like a regular 4-byte array count + regularCount := binary.BigEndian.Uint32(requestBody[0:4]) + // If the "regular count" is very large (> 1000), it's probably compact format + // Also check if first byte is small (typical compact array length) + if regularCount > 1000 || (requestBody[0] <= 10 && requestBody[0] > 0) { + isCompactFormat = true + } + } else if requestBody[0] <= 10 && requestBody[0] > 0 { + isCompactFormat = true + } + + if isCompactFormat { + // Delegate to the compact format handler + response, err := h.handleCreateTopicsV2Plus(correlationID, 2, requestBody) + return response, err + } + + // Handle regular format + offset := 0 + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4 request too short for topics array") + } + + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + // Parse topics + topics := make([]struct { + name string + partitions uint32 + replication uint16 + }, 0, topicsCount) + for i := uint32(0); i < topicsCount; i++ { + if len(requestBody) < offset+2 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated topic name length") + } + nameLen := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + if len(requestBody) < offset+int(nameLen) { + return nil, fmt.Errorf("CreateTopics v2-4: truncated topic name") + } + topicName := string(requestBody[offset : offset+int(nameLen)]) + offset += int(nameLen) + + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated num_partitions") + } + numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + if 
len(requestBody) < offset+2 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated replication_factor") + } + replication := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + // Assignments array (array of partition assignments) - skip contents + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated assignments count") + } + assignments := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + for j := uint32(0); j < assignments; j++ { + // partition_id (int32) + replicas (array int32) + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated assignment partition id") + } + offset += 4 + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated replicas count") + } + replicasCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + // skip replica ids + offset += int(replicasCount) * 4 + } + + // Configs array (array of (name,value) strings) - skip contents + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated configs count") + } + configs := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + for j := uint32(0); j < configs; j++ { + // name (string) + if len(requestBody) < offset+2 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated config name length") + } + nameLen := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + int(nameLen) + // value (nullable string) + if len(requestBody) < offset+2 { + return nil, fmt.Errorf("CreateTopics v2-4: truncated config value length") + } + valueLen := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2])) + offset += 2 + if valueLen >= 0 { + offset += int(valueLen) + } + } + + topics = append(topics, struct { + name string + partitions uint32 + replication uint16 + }{topicName, numPartitions, replication}) + } + + // timeout_ms + if len(requestBody) >= offset+4 { + _ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + } + // validate_only (boolean) + if len(requestBody) >= offset+1 { + _ = requestBody[offset] + offset += 1 + } + + // Build response + response := make([]byte, 0, 128) + // NOTE: Correlation ID is handled by writeResponseWithHeader + // Do NOT include it in the response body + // throttle_time_ms (4 bytes) + response = append(response, 0, 0, 0, 0) + // topics array count (int32) + countBytes := make([]byte, 4) + binary.BigEndian.PutUint32(countBytes, uint32(len(topics))) + response = append(response, countBytes...) + // per-topic responses + for _, t := range topics { + // topic name (string) + nameLen := make([]byte, 2) + binary.BigEndian.PutUint16(nameLen, uint16(len(t.name))) + response = append(response, nameLen...) + response = append(response, []byte(t.name)...) + // error_code (int16) + var errCode uint16 = 0 + if h.seaweedMQHandler.TopicExists(t.name) { + errCode = 36 // TOPIC_ALREADY_EXISTS + } else if t.partitions == 0 { + errCode = 37 // INVALID_PARTITIONS + } else if t.replication == 0 { + errCode = 38 // INVALID_REPLICATION_FACTOR + } else { + // Use schema-aware topic creation + if err := h.createTopicWithSchemaSupport(t.name, int32(t.partitions)); err != nil { + errCode = 1 // UNKNOWN_SERVER_ERROR + } + } + eb := make([]byte, 2) + binary.BigEndian.PutUint16(eb, errCode) + response = append(response, eb...) 
+ // error_message (nullable string) -> null + response = append(response, 0xFF, 0xFF) + } + + return response, nil +} + +func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byte) ([]byte, error) { + + if len(requestBody) < 4 { + return nil, fmt.Errorf("CreateTopics v0/v1 request too short") + } + + offset := 0 + + // Parse topics array (regular array format: count + topics) + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + // Build response + response := make([]byte, 0, 256) + + // NOTE: Correlation ID is handled by writeResponseWithHeader + // Do NOT include it in the response body + + // Topics array count (4 bytes in v0/v1) + topicsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCountBytes, topicsCount) + response = append(response, topicsCountBytes...) + + // Process each topic + for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { + // Parse topic name (regular string: length + bytes) + if len(requestBody) < offset+2 { + break + } + topicNameLength := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + if len(requestBody) < offset+int(topicNameLength) { + break + } + topicName := string(requestBody[offset : offset+int(topicNameLength)]) + offset += int(topicNameLength) + + // Parse num_partitions (4 bytes) + if len(requestBody) < offset+4 { + break + } + numPartitions := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + // Parse replication_factor (2 bytes) + if len(requestBody) < offset+2 { + break + } + replicationFactor := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + // Parse assignments array (4 bytes count, then assignments) + if len(requestBody) < offset+4 { + break + } + assignmentsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + // Skip assignments for now (simplified) + for j := uint32(0); j < assignmentsCount && offset < len(requestBody); j++ { + // Skip partition_id (4 bytes) + if len(requestBody) >= offset+4 { + offset += 4 + } + // Skip replicas array (4 bytes count + replica_ids) + if len(requestBody) >= offset+4 { + replicasCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + offset += int(replicasCount) * 4 // Skip replica IDs + } + } + + // Parse configs array (4 bytes count, then configs) + if len(requestBody) >= offset+4 { + configsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + // Skip configs (simplified) + for j := uint32(0); j < configsCount && offset < len(requestBody); j++ { + // Skip config name (string: 2 bytes length + bytes) + if len(requestBody) >= offset+2 { + configNameLength := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + int(configNameLength) + } + // Skip config value (string: 2 bytes length + bytes) + if len(requestBody) >= offset+2 { + configValueLength := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + int(configValueLength) + } + } + } + + // Build response for this topic + // Topic name (string: length + bytes) + topicNameLengthBytes := make([]byte, 2) + binary.BigEndian.PutUint16(topicNameLengthBytes, uint16(len(topicName))) + response = append(response, topicNameLengthBytes...) + response = append(response, []byte(topicName)...) 
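+ // In this handler each v0/v1 topic result is simply name(STRING) followed by
+ // error_code(INT16); e.g. a successful create of "foo" appends
+ // 00 03 'f' 'o' 'o' | 00 00.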
+ + // Determine error code and message + var errorCode uint16 = 0 + + // Apply defaults for invalid values + if numPartitions <= 0 { + numPartitions = uint32(h.GetDefaultPartitions()) // Use configurable default + } + if replicationFactor <= 0 { + replicationFactor = 1 // Default to 1 replica + } + + // Use SeaweedMQ integration + if h.seaweedMQHandler.TopicExists(topicName) { + errorCode = 36 // TOPIC_ALREADY_EXISTS + } else { + // Create the topic in SeaweedMQ with schema support + if err := h.createTopicWithSchemaSupport(topicName, int32(numPartitions)); err != nil { + errorCode = 1 // UNKNOWN_SERVER_ERROR + } + } + + // Error code (2 bytes) + errorCodeBytes := make([]byte, 2) + binary.BigEndian.PutUint16(errorCodeBytes, errorCode) + response = append(response, errorCodeBytes...) + } + + // Parse timeout_ms (4 bytes) - at the end of request + if len(requestBody) >= offset+4 { + _ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) // timeoutMs + offset += 4 + } + + // Parse validate_only (1 byte) - only in v1 + if len(requestBody) >= offset+1 { + _ = requestBody[offset] != 0 // validateOnly + } + + return response, nil +} + +// handleCreateTopicsV2Plus handles CreateTopics API versions 2+ (flexible versions with compact arrays/strings) +// For simplicity and consistency with existing response builder, this parses the flexible request, +// converts it into the non-flexible v2-v4 body format, and reuses handleCreateTopicsV2To4 to build the response. +func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + offset := 0 + + // ADMIN CLIENT COMPATIBILITY FIX: + // AdminClient's CreateTopics v5 request DOES start with top-level tagged fields (usually empty) + // Parse them first, then the topics compact array + + // Parse top-level tagged fields first (usually 0x00 for empty) + _, consumed, err := DecodeTaggedFields(requestBody[offset:]) + if err != nil { + // Don't fail - AdminClient might not always include tagged fields properly + // Just log and continue with topics parsing + } else { + offset += consumed + } + + // Topics (compact array) - Now correctly positioned after tagged fields + topicsCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topics compact array: %w", apiVersion, err) + } + offset += consumed + + type topicSpec struct { + name string + partitions uint32 + replication uint16 + } + topics := make([]topicSpec, 0, topicsCount) + + for i := uint32(0); i < topicsCount; i++ { + // Topic name (compact string) + name, consumed, err := DecodeFlexibleString(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] name: %w", apiVersion, i, err) + } + offset += consumed + + if len(requestBody) < offset+6 { + return nil, fmt.Errorf("CreateTopics v%d: truncated partitions/replication for topic[%d]", apiVersion, i) + } + + partitions := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + replication := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + // ADMIN CLIENT COMPATIBILITY: AdminClient uses little-endian for replication factor + // This violates Kafka protocol spec but we need to handle it for compatibility + if replication == 256 { + replication = 1 // AdminClient sent 0x01 0x00, intended as little-endian 1 + } + + // Apply defaults for invalid values + if partitions <= 0 { + partitions = uint32(h.GetDefaultPartitions()) // Use 
configurable default + } + if replication <= 0 { + replication = 1 // Default to 1 replica + } + + // FIX 2: Assignments (compact array) - this was missing! + assignCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] assignments array: %w", apiVersion, i, err) + } + offset += consumed + + // Skip assignment entries (partition_id + replicas array) + for j := uint32(0); j < assignCount; j++ { + // partition_id (int32) + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v%d: truncated assignment[%d] partition_id", apiVersion, j) + } + offset += 4 + + // replicas (compact array of int32) + replicasCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode assignment[%d] replicas: %w", apiVersion, j, err) + } + offset += consumed + + // Skip replica broker IDs (int32 each) + if len(requestBody) < offset+int(replicasCount)*4 { + return nil, fmt.Errorf("CreateTopics v%d: truncated assignment[%d] replicas", apiVersion, j) + } + offset += int(replicasCount) * 4 + + // Assignment tagged fields + _, consumed, err = DecodeTaggedFields(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode assignment[%d] tagged fields: %w", apiVersion, j, err) + } + offset += consumed + } + + // Configs (compact array) - skip entries + cfgCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] configs array: %w", apiVersion, i, err) + } + offset += consumed + + for j := uint32(0); j < cfgCount; j++ { + // name (compact string) + _, consumed, err := DecodeFlexibleString(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] name: %w", apiVersion, i, j, err) + } + offset += consumed + + // value (nullable compact string) + _, consumed, err = DecodeFlexibleString(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] value: %w", apiVersion, i, j, err) + } + offset += consumed + + // tagged fields for each config + _, consumed, err = DecodeTaggedFields(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] config[%d] tagged fields: %w", apiVersion, i, j, err) + } + offset += consumed + } + + // Tagged fields for topic + _, consumed, err = DecodeTaggedFields(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("CreateTopics v%d: decode topic[%d] tagged fields: %w", apiVersion, i, err) + } + offset += consumed + + topics = append(topics, topicSpec{name: name, partitions: partitions, replication: replication}) + } + + for range topics { + } + + // timeout_ms (int32) + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("CreateTopics v%d: missing timeout_ms", apiVersion) + } + timeoutMs := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + // validate_only (boolean) + if len(requestBody) < offset+1 { + return nil, fmt.Errorf("CreateTopics v%d: missing validate_only flag", apiVersion) + } + validateOnly := requestBody[offset] != 0 + offset += 1 + + // Remaining bytes after parsing - could be additional fields + if offset < len(requestBody) { + } + + // Reconstruct a non-flexible v2-like request body and reuse existing handler + // Format: topics(ARRAY) + timeout_ms(INT32) + validate_only(BOOLEAN) + var 
legacyBody []byte + + // topics count (int32) + legacyBody = append(legacyBody, 0, 0, 0, byte(len(topics))) + if len(topics) > 0 { + legacyBody[len(legacyBody)-1] = byte(len(topics)) + } + + for _, t := range topics { + // topic name (STRING) + nameLen := uint16(len(t.name)) + legacyBody = append(legacyBody, byte(nameLen>>8), byte(nameLen)) + legacyBody = append(legacyBody, []byte(t.name)...) + + // num_partitions (INT32) + legacyBody = append(legacyBody, byte(t.partitions>>24), byte(t.partitions>>16), byte(t.partitions>>8), byte(t.partitions)) + + // replication_factor (INT16) + legacyBody = append(legacyBody, byte(t.replication>>8), byte(t.replication)) + + // assignments array (INT32 count = 0) + legacyBody = append(legacyBody, 0, 0, 0, 0) + + // configs array (INT32 count = 0) + legacyBody = append(legacyBody, 0, 0, 0, 0) + } + + // timeout_ms + legacyBody = append(legacyBody, byte(timeoutMs>>24), byte(timeoutMs>>16), byte(timeoutMs>>8), byte(timeoutMs)) + + // validate_only + if validateOnly { + legacyBody = append(legacyBody, 1) + } else { + legacyBody = append(legacyBody, 0) + } + + // Build response directly instead of delegating to avoid circular dependency + response := make([]byte, 0, 128) + + // NOTE: Correlation ID and header tagged fields are handled by writeResponseWithHeader + // Do NOT include them in the response body + + // throttle_time_ms (4 bytes) - first field in CreateTopics response body + response = append(response, 0, 0, 0, 0) + + // topics (compact array) - V5 FLEXIBLE FORMAT + topicCount := len(topics) + + // Debug: log response size at each step + debugResponseSize := func(step string) { + } + debugResponseSize("After correlation ID and throttle_time_ms") + + // Compact array: length is encoded as UNSIGNED_VARINT(actualLength + 1) + response = append(response, EncodeUvarint(uint32(topicCount+1))...) + debugResponseSize("After topics array length") + + // For each topic + for _, t := range topics { + // name (compact string): length is encoded as UNSIGNED_VARINT(actualLength + 1) + nameBytes := []byte(t.name) + response = append(response, EncodeUvarint(uint32(len(nameBytes)+1))...) + response = append(response, nameBytes...) + + // TopicId - Not present in v5, only added in v7+ + // v5 CreateTopics response does not include TopicId field + + // error_code (int16) + var errCode uint16 = 0 + + // ADMIN CLIENT COMPATIBILITY: Apply defaults before error checking + actualPartitions := t.partitions + if actualPartitions == 0 { + actualPartitions = 1 // Default to 1 partition if 0 requested + } + actualReplication := t.replication + if actualReplication == 0 { + actualReplication = 1 // Default to 1 replication if 0 requested + } + + // ADMIN CLIENT COMPATIBILITY: Always return success for existing topics + // AdminClient expects topic creation to succeed, even if topic already exists + if h.seaweedMQHandler.TopicExists(t.name) { + errCode = 0 // SUCCESS - AdminClient can handle this gracefully + } else { + // Use corrected values for error checking and topic creation with schema support + if err := h.createTopicWithSchemaSupport(t.name, int32(actualPartitions)); err != nil { + errCode = 1 // UNKNOWN_SERVER_ERROR + } + } + eb := make([]byte, 2) + binary.BigEndian.PutUint16(eb, errCode) + response = append(response, eb...) 
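+ // Compact (flexible) length prefixes in this v5 entry use UNSIGNED_VARINT(length+1):
+ // a null string is 0x00, an empty string or empty array is 0x01, and a
+ // 3-byte name such as "foo" is prefixed with 0x04.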
+ + // error_message (compact nullable string) - ADMINCLIENT 7.4.0-CE COMPATIBILITY FIX + // For "_schemas" topic, send null for byte-level compatibility with Java reference + // For other topics, send empty string to avoid NPE in AdminClient response handling + if t.name == "_schemas" { + response = append(response, 0) // Null = 0 + } else { + response = append(response, 1) // Empty string = 1 (0 chars + 1) + } + + // ADDED FOR V5: num_partitions (int32) + // ADMIN CLIENT COMPATIBILITY: Use corrected values from error checking logic + partBytes := make([]byte, 4) + binary.BigEndian.PutUint32(partBytes, actualPartitions) + response = append(response, partBytes...) + + // ADDED FOR V5: replication_factor (int16) + replBytes := make([]byte, 2) + binary.BigEndian.PutUint16(replBytes, actualReplication) + response = append(response, replBytes...) + + // configs (compact nullable array) - ADDED FOR V5 + // ADMINCLIENT 7.4.0-CE NPE FIX: Send empty configs array instead of null + // AdminClient 7.4.0-ce has NPE when configs=null but were requested + // Empty array = 1 (0 configs + 1), still achieves ~30-byte response + response = append(response, 1) // Empty configs array = 1 (0 configs + 1) + + // Tagged fields for each topic - V5 format per Kafka source + // Count tagged fields (topicConfigErrorCode only if != 0) + topicConfigErrorCode := uint16(0) // No error + numTaggedFields := 0 + if topicConfigErrorCode != 0 { + numTaggedFields = 1 + } + + // Write tagged fields count + response = append(response, EncodeUvarint(uint32(numTaggedFields))...) + + // Write tagged fields (only if topicConfigErrorCode != 0) + if topicConfigErrorCode != 0 { + // Tag 0: TopicConfigErrorCode + response = append(response, EncodeUvarint(0)...) // Tag number 0 + response = append(response, EncodeUvarint(2)...) // Length (int16 = 2 bytes) + topicConfigErrBytes := make([]byte, 2) + binary.BigEndian.PutUint16(topicConfigErrBytes, topicConfigErrorCode) + response = append(response, topicConfigErrBytes...) + } + + debugResponseSize(fmt.Sprintf("After topic '%s'", t.name)) + } + + // Top-level tagged fields for v5 flexible response (empty) + response = append(response, 0) // Empty tagged fields = 0 + debugResponseSize("Final response") + + return response, nil +} + +func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ([]byte, error) { + // Parse minimal DeleteTopics request + // Request format: client_id + timeout(4) + topics_array + + if len(requestBody) < 6 { // client_id_size(2) + timeout(4) + return nil, fmt.Errorf("DeleteTopics request too short") + } + + // Skip client_id + clientIDSize := binary.BigEndian.Uint16(requestBody[0:2]) + offset := 2 + int(clientIDSize) + + if len(requestBody) < offset+8 { // timeout(4) + topics_count(4) + return nil, fmt.Errorf("DeleteTopics request missing data") + } + + // Skip timeout + offset += 4 + + topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + + response := make([]byte, 0, 256) + + // NOTE: Correlation ID is handled by writeResponseWithHeader + // Do NOT include it in the response body + + // Throttle time (4 bytes, 0 = no throttling) + response = append(response, 0, 0, 0, 0) + + // Topics count (same as request) + topicsCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(topicsCountBytes, topicsCount) + response = append(response, topicsCountBytes...) 
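+ // Each topic result appended below uses this handler's layout:
+ //   name(STRING) + error_code(INT16) + error_message(NULLABLE_STRING),
+ // e.g. a successful delete of "foo": 00 03 'f' 'o' 'o' | 00 00 | FF FF.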
+ + // Process each topic (using SeaweedMQ handler) + + for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ { + if len(requestBody) < offset+2 { + break + } + + // Parse topic name + topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2]) + offset += 2 + + if len(requestBody) < offset+int(topicNameSize) { + break + } + + topicName := string(requestBody[offset : offset+int(topicNameSize)]) + offset += int(topicNameSize) + + // Response: topic_name + error_code(2) + error_message + response = append(response, byte(topicNameSize>>8), byte(topicNameSize)) + response = append(response, []byte(topicName)...) + + // Check if topic exists and delete it + var errorCode uint16 = 0 + var errorMessage string = "" + + // Use SeaweedMQ integration + if !h.seaweedMQHandler.TopicExists(topicName) { + errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION + errorMessage = "Unknown topic" + } else { + // Delete the topic from SeaweedMQ + if err := h.seaweedMQHandler.DeleteTopic(topicName); err != nil { + errorCode = 1 // UNKNOWN_SERVER_ERROR + errorMessage = err.Error() + } + } + + // Error code + response = append(response, byte(errorCode>>8), byte(errorCode)) + + // Error message (nullable string) + if errorMessage == "" { + response = append(response, 0xFF, 0xFF) // null string + } else { + errorMsgLen := uint16(len(errorMessage)) + response = append(response, byte(errorMsgLen>>8), byte(errorMsgLen)) + response = append(response, []byte(errorMessage)...) + } + } + + return response, nil +} + +// validateAPIVersion checks if we support the requested API version +func (h *Handler) validateAPIVersion(apiKey, apiVersion uint16) error { + supportedVersions := map[APIKey][2]uint16{ + APIKeyApiVersions: {0, 4}, // ApiVersions: v0-v4 (Kafka 8.0.0 compatibility) + APIKeyMetadata: {0, 7}, // Metadata: v0-v7 + APIKeyProduce: {0, 7}, // Produce: v0-v7 + APIKeyFetch: {0, 7}, // Fetch: v0-v7 + APIKeyListOffsets: {0, 2}, // ListOffsets: v0-v2 + APIKeyCreateTopics: {0, 5}, // CreateTopics: v0-v5 (updated to match implementation) + APIKeyDeleteTopics: {0, 4}, // DeleteTopics: v0-v4 + APIKeyFindCoordinator: {0, 3}, // FindCoordinator: v0-v3 (v3+ uses flexible format) + APIKeyJoinGroup: {0, 6}, // JoinGroup: cap to v6 (first flexible version) + APIKeySyncGroup: {0, 5}, // SyncGroup: v0-v5 + APIKeyOffsetCommit: {0, 2}, // OffsetCommit: v0-v2 + APIKeyOffsetFetch: {0, 5}, // OffsetFetch: v0-v5 (updated to match implementation) + APIKeyHeartbeat: {0, 4}, // Heartbeat: v0-v4 + APIKeyLeaveGroup: {0, 4}, // LeaveGroup: v0-v4 + APIKeyDescribeGroups: {0, 5}, // DescribeGroups: v0-v5 + APIKeyListGroups: {0, 4}, // ListGroups: v0-v4 + APIKeyDescribeConfigs: {0, 4}, // DescribeConfigs: v0-v4 + APIKeyInitProducerId: {0, 4}, // InitProducerId: v0-v4 + APIKeyDescribeCluster: {0, 1}, // DescribeCluster: v0-v1 (KIP-919, AdminClient compatibility) + } + + if versionRange, exists := supportedVersions[APIKey(apiKey)]; exists { + minVer, maxVer := versionRange[0], versionRange[1] + if apiVersion < minVer || apiVersion > maxVer { + return fmt.Errorf("unsupported API version %d for API key %d (supported: %d-%d)", + apiVersion, apiKey, minVer, maxVer) + } + return nil + } + + return fmt.Errorf("unsupported API key: %d", apiKey) +} + +// buildUnsupportedVersionResponse creates a proper Kafka error response +func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, apiVersion uint16) ([]byte, error) { + errorMsg := fmt.Sprintf("Unsupported version %d for API key", apiVersion) + return 
BuildErrorResponseWithMessage(correlationID, ErrorCodeUnsupportedVersion, errorMsg), nil +} + +// handleMetadata routes to the appropriate version-specific handler +func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + switch apiVersion { + case 0: + return h.HandleMetadataV0(correlationID, requestBody) + case 1: + return h.HandleMetadataV1(correlationID, requestBody) + case 2: + return h.HandleMetadataV2(correlationID, requestBody) + case 3, 4: + return h.HandleMetadataV3V4(correlationID, requestBody) + case 5, 6: + return h.HandleMetadataV5V6(correlationID, requestBody) + case 7: + return h.HandleMetadataV7(correlationID, requestBody) + default: + // For versions > 7, use the V7 handler (flexible format) + if apiVersion > 7 { + return h.HandleMetadataV7(correlationID, requestBody) + } + return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion) + } +} + +// getAPIName returns a human-readable name for Kafka API keys (for debugging) +func getAPIName(apiKey APIKey) string { + switch apiKey { + case APIKeyProduce: + return "Produce" + case APIKeyFetch: + return "Fetch" + case APIKeyListOffsets: + return "ListOffsets" + case APIKeyMetadata: + return "Metadata" + case APIKeyOffsetCommit: + return "OffsetCommit" + case APIKeyOffsetFetch: + return "OffsetFetch" + case APIKeyFindCoordinator: + return "FindCoordinator" + case APIKeyJoinGroup: + return "JoinGroup" + case APIKeyHeartbeat: + return "Heartbeat" + case APIKeyLeaveGroup: + return "LeaveGroup" + case APIKeySyncGroup: + return "SyncGroup" + case APIKeyDescribeGroups: + return "DescribeGroups" + case APIKeyListGroups: + return "ListGroups" + case APIKeyApiVersions: + return "ApiVersions" + case APIKeyCreateTopics: + return "CreateTopics" + case APIKeyDeleteTopics: + return "DeleteTopics" + case APIKeyDescribeConfigs: + return "DescribeConfigs" + case APIKeyInitProducerId: + return "InitProducerId" + case APIKeyDescribeCluster: + return "DescribeCluster" + default: + return "Unknown" + } +} + +// handleDescribeConfigs handles DescribeConfigs API requests (API key 32) +func (h *Handler) handleDescribeConfigs(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + + // Parse request to extract resources + resources, err := h.parseDescribeConfigsRequest(requestBody, apiVersion) + if err != nil { + Error("DescribeConfigs parsing error: %v", err) + return nil, fmt.Errorf("failed to parse DescribeConfigs request: %w", err) + } + + isFlexible := apiVersion >= 4 + if !isFlexible { + // Legacy (non-flexible) response for v0-3 + response := make([]byte, 0, 2048) + + // NOTE: Correlation ID is handled by writeResponseWithHeader + // Do NOT include it in the response body + + // Throttle time (0ms) + throttleBytes := make([]byte, 4) + binary.BigEndian.PutUint32(throttleBytes, 0) + response = append(response, throttleBytes...) + + // Resources array length + resourcesBytes := make([]byte, 4) + binary.BigEndian.PutUint32(resourcesBytes, uint32(len(resources))) + response = append(response, resourcesBytes...) + + // For each resource, return appropriate configs + for _, resource := range resources { + resourceResponse := h.buildDescribeConfigsResourceResponse(resource, apiVersion) + response = append(response, resourceResponse...) 
+ } + + return response, nil + } + + // Flexible response for v4+ + response := make([]byte, 0, 2048) + + // NOTE: Correlation ID is handled by writeResponseWithHeader + // Do NOT include it in the response body + + // throttle_time_ms (4 bytes) + response = append(response, 0, 0, 0, 0) + + // Results (compact array) + response = append(response, EncodeUvarint(uint32(len(resources)+1))...) + + for _, res := range resources { + // ErrorCode (int16) = 0 + response = append(response, 0, 0) + // ErrorMessage (compact nullable string) = null (0) + response = append(response, 0) + // ResourceType (int8) + response = append(response, byte(res.ResourceType)) + // ResourceName (compact string) + nameBytes := []byte(res.ResourceName) + response = append(response, EncodeUvarint(uint32(len(nameBytes)+1))...) + response = append(response, nameBytes...) + + // Build configs for this resource + var cfgs []ConfigEntry + if res.ResourceType == 2 { // Topic + cfgs = h.getTopicConfigs(res.ResourceName, res.ConfigNames) + // Ensure cleanup.policy is compact for _schemas + if res.ResourceName == "_schemas" { + replaced := false + for i := range cfgs { + if cfgs[i].Name == "cleanup.policy" { + cfgs[i].Value = "compact" + replaced = true + break + } + } + if !replaced { + cfgs = append(cfgs, ConfigEntry{Name: "cleanup.policy", Value: "compact"}) + } + } + } else if res.ResourceType == 4 { // Broker + cfgs = h.getBrokerConfigs(res.ConfigNames) + } else { + cfgs = []ConfigEntry{} + } + + // Configs (compact array) + response = append(response, EncodeUvarint(uint32(len(cfgs)+1))...) + + for _, cfg := range cfgs { + // name (compact string) + cb := []byte(cfg.Name) + response = append(response, EncodeUvarint(uint32(len(cb)+1))...) + response = append(response, cb...) + + // value (compact nullable string) + vb := []byte(cfg.Value) + if len(vb) == 0 { + response = append(response, 0) // null + } else { + response = append(response, EncodeUvarint(uint32(len(vb)+1))...) + response = append(response, vb...) 
+ } + + // readOnly (bool) + if cfg.ReadOnly { + response = append(response, 1) + } else { + response = append(response, 0) + } + + // configSource (int8): DEFAULT_CONFIG = 5 + response = append(response, byte(5)) + + // isSensitive (bool) + if cfg.Sensitive { + response = append(response, 1) + } else { + response = append(response, 0) + } + + // synonyms (compact array) - empty + response = append(response, 1) + + // config_type (int8) - STRING = 1 + response = append(response, byte(1)) + + // documentation (compact nullable string) - null + response = append(response, 0) + + // per-config tagged fields (empty) + response = append(response, 0) + } + + // Per-result tagged fields (empty) + response = append(response, 0) + } + + // Top-level tagged fields (empty) + response = append(response, 0) + + return response, nil +} + +// isFlexibleResponse determines if an API response should use flexible format (with header tagged fields) +// Based on Kafka protocol specifications: most APIs become flexible at v3+, but some differ +func isFlexibleResponse(apiKey uint16, apiVersion uint16) bool { + // Reference: kafka-go/protocol/response.go:119 and sarama/response_header.go:21 + // Flexible responses have headerVersion >= 1, which adds tagged fields after correlation ID + + switch APIKey(apiKey) { + case APIKeyProduce: + return apiVersion >= 9 + case APIKeyFetch: + return apiVersion >= 12 + case APIKeyMetadata: + // Metadata v9+ uses flexible responses (v7-8 use compact arrays/strings but NOT flexible headers) + return apiVersion >= 9 + case APIKeyOffsetCommit: + return apiVersion >= 8 + case APIKeyOffsetFetch: + return apiVersion >= 6 + case APIKeyFindCoordinator: + return apiVersion >= 3 + case APIKeyJoinGroup: + return apiVersion >= 6 + case APIKeyHeartbeat: + return apiVersion >= 4 + case APIKeyLeaveGroup: + return apiVersion >= 4 + case APIKeySyncGroup: + return apiVersion >= 4 + case APIKeyApiVersions: + // CRITICAL: AdminClient compatibility requires header version 0 (no tagged fields) + // Even though ApiVersions v3+ technically supports flexible responses, AdminClient + // expects the header to NOT include tagged fields. This is a known quirk. 
+ return false // Always use non-flexible header for ApiVersions + case APIKeyCreateTopics: + return apiVersion >= 5 + case APIKeyDeleteTopics: + return apiVersion >= 4 + case APIKeyInitProducerId: + return apiVersion >= 2 // Flexible from v2+ (KIP-360) + case APIKeyDescribeConfigs: + return apiVersion >= 4 + case APIKeyDescribeCluster: + return true // All versions (0+) are flexible + default: + // For unknown APIs, assume non-flexible (safer default) + return false + } +} + +// writeResponseWithHeader writes a Kafka response following the wire protocol: +// [Size: 4 bytes][Correlation ID: 4 bytes][Tagged Fields (if flexible)][Body] +func (h *Handler) writeResponseWithHeader(w *bufio.Writer, correlationID uint32, apiKey uint16, apiVersion uint16, responseBody []byte, timeout time.Duration) error { + // Kafka wire protocol format (from kafka-go/protocol/response.go:116-138 and sarama/response_header.go:10-27): + // [4 bytes: size = len(everything after this)] + // [4 bytes: correlation ID] + // [varint: header tagged fields (0x00 for empty) - ONLY for flexible responses with headerVersion >= 1] + // [N bytes: response body] + + // Determine if this response should be flexible + isFlexible := isFlexibleResponse(apiKey, apiVersion) + + // Calculate total size: correlation ID (4) + tagged fields (1 if flexible) + body + totalSize := 4 + len(responseBody) + if isFlexible { + totalSize += 1 // Add 1 byte for empty tagged fields (0x00) + } + + // Build complete response in memory for hex dump logging + fullResponse := make([]byte, 0, 4+totalSize) + + // Write size + sizeBuf := make([]byte, 4) + binary.BigEndian.PutUint32(sizeBuf, uint32(totalSize)) + fullResponse = append(fullResponse, sizeBuf...) + + // Write correlation ID + correlationBuf := make([]byte, 4) + binary.BigEndian.PutUint32(correlationBuf, correlationID) + fullResponse = append(fullResponse, correlationBuf...) + + // Write header-level tagged fields for flexible responses + if isFlexible { + // Empty tagged fields = 0x00 (varint 0) + fullResponse = append(fullResponse, 0x00) + } + + // Write response body + fullResponse = append(fullResponse, responseBody...) 
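+ // Worked example: a 10-byte non-flexible body with correlation ID 7 is framed as
+ //   size=14 -> 00 00 00 0E | 00 00 00 07 | <10 body bytes>
+ // A flexible response adds one 0x00 tagged-fields byte after the correlation ID,
+ // making the size 15.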
+ + // Write to connection + if _, err := w.Write(fullResponse); err != nil { + return fmt.Errorf("write response: %w", err) + } + + // Flush + if err := w.Flush(); err != nil { + return fmt.Errorf("flush response: %w", err) + } + + return nil +} + +// hexDump formats bytes as a hex dump with ASCII representation +func hexDump(data []byte) string { + var result strings.Builder + for i := 0; i < len(data); i += 16 { + // Offset + result.WriteString(fmt.Sprintf("%04x ", i)) + + // Hex bytes + for j := 0; j < 16; j++ { + if i+j < len(data) { + result.WriteString(fmt.Sprintf("%02x ", data[i+j])) + } else { + result.WriteString(" ") + } + if j == 7 { + result.WriteString(" ") + } + } + + // ASCII representation + result.WriteString(" |") + for j := 0; j < 16 && i+j < len(data); j++ { + b := data[i+j] + if b >= 32 && b < 127 { + result.WriteByte(b) + } else { + result.WriteByte('.') + } + } + result.WriteString("|\n") + } + return result.String() +} + +// writeResponseWithCorrelationID is deprecated - use writeResponseWithHeader instead +// Kept for compatibility with direct callers that don't have API info +func (h *Handler) writeResponseWithCorrelationID(w *bufio.Writer, correlationID uint32, responseBody []byte, timeout time.Duration) error { + // Assume non-flexible for backward compatibility + return h.writeResponseWithHeader(w, correlationID, 0, 0, responseBody, timeout) +} + +// writeResponseWithTimeout writes a Kafka response with timeout handling +// DEPRECATED: Use writeResponseWithCorrelationID instead +func (h *Handler) writeResponseWithTimeout(w *bufio.Writer, response []byte, timeout time.Duration) error { + // This old function expects response to include correlation ID at the start + // For backward compatibility with any remaining callers + + // Write response size (4 bytes) + responseSizeBytes := make([]byte, 4) + binary.BigEndian.PutUint32(responseSizeBytes, uint32(len(response))) + + if _, err := w.Write(responseSizeBytes); err != nil { + return fmt.Errorf("write response size: %w", err) + } + + // Write response data + if _, err := w.Write(response); err != nil { + return fmt.Errorf("write response data: %w", err) + } + + // Flush the buffer + if err := w.Flush(); err != nil { + return fmt.Errorf("flush response: %w", err) + } + + return nil +} + +// EnableSchemaManagement enables schema management with the given configuration +func (h *Handler) EnableSchemaManagement(config schema.ManagerConfig) error { + manager, err := schema.NewManagerWithHealthCheck(config) + if err != nil { + return fmt.Errorf("failed to create schema manager: %w", err) + } + + h.schemaManager = manager + h.useSchema = true + + return nil +} + +// EnableBrokerIntegration enables mq.broker integration for schematized messages +func (h *Handler) EnableBrokerIntegration(brokers []string) error { + if !h.IsSchemaEnabled() { + return fmt.Errorf("schema management must be enabled before broker integration") + } + + brokerClient := schema.NewBrokerClient(schema.BrokerClientConfig{ + Brokers: brokers, + SchemaManager: h.schemaManager, + }) + + h.brokerClient = brokerClient + return nil +} + +// DisableSchemaManagement disables schema management and broker integration +func (h *Handler) DisableSchemaManagement() { + if h.brokerClient != nil { + h.brokerClient.Close() + h.brokerClient = nil + } + h.schemaManager = nil + h.useSchema = false +} + +// SetSchemaRegistryURL sets the Schema Registry URL for delayed initialization +func (h *Handler) SetSchemaRegistryURL(url string) { + h.schemaRegistryURL = url +} + 
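+// exampleCompactLengths is an illustrative sketch (not part of the original handler
+// logic): it demonstrates the UNSIGNED_VARINT(length+1) convention relied on by the
+// flexible-format handlers above, assuming the package's EncodeUvarint helper returns
+// the varint bytes for the given value.
+func exampleCompactLengths() [][]byte {
+ return [][]byte{
+  EncodeUvarint(0),     // null compact string / null compact array
+  EncodeUvarint(0 + 1), // empty compact array (0 entries + 1)
+  EncodeUvarint(3 + 1), // compact string "foo" (3 bytes + 1)
+ }
+}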
+// SetDefaultPartitions sets the default partition count for auto-created topics
+func (h *Handler) SetDefaultPartitions(partitions int32) {
+	h.defaultPartitions = partitions
+}
+
+// GetDefaultPartitions returns the default partition count for auto-created topics
+func (h *Handler) GetDefaultPartitions() int32 {
+	if h.defaultPartitions <= 0 {
+		return 4 // Fallback default
+	}
+	return h.defaultPartitions
+}
+
+// IsSchemaEnabled returns whether schema management is enabled
+func (h *Handler) IsSchemaEnabled() bool {
+	// Try to initialize schema management if not already done
+	if !h.useSchema && h.schemaRegistryURL != "" {
+		h.tryInitializeSchemaManagement()
+	}
+	return h.useSchema && h.schemaManager != nil
+}
+
+// tryInitializeSchemaManagement attempts to initialize schema management
+// This is called lazily when schema functionality is first needed
+func (h *Handler) tryInitializeSchemaManagement() {
+	if h.useSchema || h.schemaRegistryURL == "" {
+		return // Already initialized or no URL provided
+	}
+
+	schemaConfig := schema.ManagerConfig{
+		RegistryURL: h.schemaRegistryURL,
+	}
+
+	if err := h.EnableSchemaManagement(schemaConfig); err != nil {
+		return // Leave schema management disabled; the registry may be unreachable
+	}
+}
+
+// IsBrokerIntegrationEnabled returns true if broker integration is enabled
+func (h *Handler) IsBrokerIntegrationEnabled() bool {
+	return h.IsSchemaEnabled() && h.brokerClient != nil
+}
+
+// commitOffsetToSMQ commits a consumer group offset via the consumer offset storage
+func (h *Handler) commitOffsetToSMQ(key ConsumerOffsetKey, offsetValue int64, metadata string) error {
+	// Use the consumer offset storage if it has been initialized
+	if h.consumerOffsetStorage != nil {
+		return h.consumerOffsetStorage.CommitOffset(key.ConsumerGroup, key.Topic, key.Partition, offsetValue, metadata)
+	}
+
+	// Legacy SMQ offset storage has been removed - there is no fallback
+	return fmt.Errorf("offset storage not initialized")
+}
+
+// fetchOffsetFromSMQ fetches a consumer group offset via the consumer offset storage
+func (h *Handler) fetchOffsetFromSMQ(key ConsumerOffsetKey) (int64, string, error) {
+	// Use the consumer offset storage if it has been initialized
+	if h.consumerOffsetStorage != nil {
+		return h.consumerOffsetStorage.FetchOffset(key.ConsumerGroup, key.Topic, key.Partition)
+	}
+
+	// Legacy SMQ offset storage has been removed - there is no fallback
+	return -1, "", fmt.Errorf("offset storage not initialized")
+}
+
+// DescribeConfigsResource represents a resource in a DescribeConfigs request
+type DescribeConfigsResource struct {
+	ResourceType int8 // 2 = Topic, 4 = Broker
+	ResourceName string
+	ConfigNames  []string // Empty means return all configs
+}
+
+// parseDescribeConfigsRequest parses a DescribeConfigs request body
+func (h *Handler) parseDescribeConfigsRequest(requestBody []byte, apiVersion uint16) ([]DescribeConfigsResource, error) {
+	if len(requestBody) < 1 {
+		return nil, fmt.Errorf("request too short")
+	}
+
+	offset := 0
+
+	// DescribeConfigs v4+ uses flexible protocol (compact arrays with varint)
+	isFlexible := apiVersion >= 4
+
+	var resourcesLength uint32
+	if isFlexible {
+		// Skip top-level tagged fields for DescribeConfigs v4+ flexible protocol
+		// The request body starts with the tagged fields count (usually 0x00 = empty)
+		_, consumed, err := DecodeTaggedFields(requestBody[offset:])
+		if err != nil {
+			return nil, fmt.Errorf("DescribeConfigs v%d: decode top-level tagged fields: %w", apiVersion, err)
+ } + offset += consumed + + // Resources (compact array) - Now correctly positioned after tagged fields + resourcesLength, consumed, err = DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("decode resources compact array: %w", err) + } + offset += consumed + } else { + // Regular array: length is int32 + if len(requestBody) < 4 { + return nil, fmt.Errorf("request too short for regular array") + } + resourcesLength = binary.BigEndian.Uint32(requestBody[offset : offset+4]) + offset += 4 + } + + // Validate resources length to prevent panic + if resourcesLength > 100 { // Reasonable limit + return nil, fmt.Errorf("invalid resources length: %d", resourcesLength) + } + + resources := make([]DescribeConfigsResource, 0, resourcesLength) + + for i := uint32(0); i < resourcesLength; i++ { + if offset+1 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for resource type") + } + + // Resource type (1 byte) + resourceType := int8(requestBody[offset]) + offset++ + + // Resource name (string - compact for v4+, regular for v0-3) + var resourceName string + if isFlexible { + // Compact string: length is encoded as UNSIGNED_VARINT(actualLength + 1) + name, consumed, err := DecodeFlexibleString(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("decode resource name compact string: %w", err) + } + resourceName = name + offset += consumed + } else { + // Regular string: length is int16 + if offset+2 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for resource name length") + } + nameLength := int(binary.BigEndian.Uint16(requestBody[offset : offset+2])) + offset += 2 + + // Validate name length to prevent panic + if nameLength < 0 || nameLength > 1000 { // Reasonable limit + return nil, fmt.Errorf("invalid resource name length: %d", nameLength) + } + + if offset+nameLength > len(requestBody) { + return nil, fmt.Errorf("insufficient data for resource name") + } + resourceName = string(requestBody[offset : offset+nameLength]) + offset += nameLength + } + + // Config names array (compact for v4+, regular for v0-3) + var configNames []string + if isFlexible { + // Compact array: length is encoded as UNSIGNED_VARINT(actualLength + 1) + // For nullable arrays, 0 means null, 1 means empty + configNamesCount, consumed, err := DecodeCompactArrayLength(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("decode config names compact array: %w", err) + } + offset += consumed + + // Parse each config name as compact string (if not null) + if configNamesCount > 0 { + for j := uint32(0); j < configNamesCount; j++ { + configName, consumed, err := DecodeFlexibleString(requestBody[offset:]) + if err != nil { + return nil, fmt.Errorf("decode config name[%d] compact string: %w", j, err) + } + offset += consumed + configNames = append(configNames, configName) + } + } + } else { + // Regular array: length is int32 + if offset+4 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for config names length") + } + configNamesLength := int32(binary.BigEndian.Uint32(requestBody[offset : offset+4])) + offset += 4 + + // Validate config names length to prevent panic + // Note: -1 means null/empty array in Kafka protocol + if configNamesLength < -1 || configNamesLength > 1000 { // Reasonable limit + return nil, fmt.Errorf("invalid config names length: %d", configNamesLength) + } + + // Handle null array case + if configNamesLength == -1 { + configNamesLength = 0 + } + + configNames = make([]string, 0, configNamesLength) + for j 
:= int32(0); j < configNamesLength; j++ { + if offset+2 > len(requestBody) { + return nil, fmt.Errorf("insufficient data for config name length") + } + configNameLength := int(binary.BigEndian.Uint16(requestBody[offset : offset+2])) + offset += 2 + + // Validate config name length to prevent panic + if configNameLength < 0 || configNameLength > 500 { // Reasonable limit + return nil, fmt.Errorf("invalid config name length: %d", configNameLength) + } + + if offset+configNameLength > len(requestBody) { + return nil, fmt.Errorf("insufficient data for config name") + } + configName := string(requestBody[offset : offset+configNameLength]) + offset += configNameLength + + configNames = append(configNames, configName) + } + } + + resources = append(resources, DescribeConfigsResource{ + ResourceType: resourceType, + ResourceName: resourceName, + ConfigNames: configNames, + }) + } + + return resources, nil +} + +// buildDescribeConfigsResourceResponse builds the response for a single resource +func (h *Handler) buildDescribeConfigsResourceResponse(resource DescribeConfigsResource, apiVersion uint16) []byte { + response := make([]byte, 0, 512) + + // Error code (0 = no error) + errorCodeBytes := make([]byte, 2) + binary.BigEndian.PutUint16(errorCodeBytes, 0) + response = append(response, errorCodeBytes...) + + // Error message (null string = -1 length) + errorMsgBytes := make([]byte, 2) + binary.BigEndian.PutUint16(errorMsgBytes, 0xFFFF) // -1 as uint16 + response = append(response, errorMsgBytes...) + + // Resource type + response = append(response, byte(resource.ResourceType)) + + // Resource name + nameBytes := make([]byte, 2+len(resource.ResourceName)) + binary.BigEndian.PutUint16(nameBytes[0:2], uint16(len(resource.ResourceName))) + copy(nameBytes[2:], []byte(resource.ResourceName)) + response = append(response, nameBytes...) + + // Get configs for this resource + configs := h.getConfigsForResource(resource) + + // Config entries array length + configCountBytes := make([]byte, 4) + binary.BigEndian.PutUint32(configCountBytes, uint32(len(configs))) + response = append(response, configCountBytes...) + + // Add each config entry + for _, config := range configs { + configBytes := h.buildConfigEntry(config, apiVersion) + response = append(response, configBytes...) 
+ } + + return response +} + +// ConfigEntry represents a single configuration entry +type ConfigEntry struct { + Name string + Value string + ReadOnly bool + IsDefault bool + Sensitive bool +} + +// getConfigsForResource returns appropriate configs for a resource +func (h *Handler) getConfigsForResource(resource DescribeConfigsResource) []ConfigEntry { + switch resource.ResourceType { + case 2: // Topic + return h.getTopicConfigs(resource.ResourceName, resource.ConfigNames) + case 4: // Broker + return h.getBrokerConfigs(resource.ConfigNames) + default: + return []ConfigEntry{} + } +} + +// getTopicConfigs returns topic-level configurations +func (h *Handler) getTopicConfigs(topicName string, requestedConfigs []string) []ConfigEntry { + // Default topic configs that admin clients commonly request + allConfigs := map[string]ConfigEntry{ + "cleanup.policy": { + Name: "cleanup.policy", + Value: "delete", + ReadOnly: false, + IsDefault: true, + Sensitive: false, + }, + "retention.ms": { + Name: "retention.ms", + Value: "604800000", // 7 days in milliseconds + ReadOnly: false, + IsDefault: true, + Sensitive: false, + }, + "retention.bytes": { + Name: "retention.bytes", + Value: "-1", // Unlimited + ReadOnly: false, + IsDefault: true, + Sensitive: false, + }, + "segment.ms": { + Name: "segment.ms", + Value: "86400000", // 1 day in milliseconds + ReadOnly: false, + IsDefault: true, + Sensitive: false, + }, + "max.message.bytes": { + Name: "max.message.bytes", + Value: "1048588", // ~1MB + ReadOnly: false, + IsDefault: true, + Sensitive: false, + }, + "min.insync.replicas": { + Name: "min.insync.replicas", + Value: "1", + ReadOnly: false, + IsDefault: true, + Sensitive: false, + }, + } + + // If specific configs requested, filter to those + if len(requestedConfigs) > 0 { + filteredConfigs := make([]ConfigEntry, 0, len(requestedConfigs)) + for _, configName := range requestedConfigs { + if config, exists := allConfigs[configName]; exists { + filteredConfigs = append(filteredConfigs, config) + } + } + return filteredConfigs + } + + // Return all configs + configs := make([]ConfigEntry, 0, len(allConfigs)) + for _, config := range allConfigs { + configs = append(configs, config) + } + return configs +} + +// getBrokerConfigs returns broker-level configurations +func (h *Handler) getBrokerConfigs(requestedConfigs []string) []ConfigEntry { + // Default broker configs that admin clients commonly request + allConfigs := map[string]ConfigEntry{ + "log.retention.hours": { + Name: "log.retention.hours", + Value: "168", // 7 days + ReadOnly: false, + IsDefault: true, + Sensitive: false, + }, + "log.segment.bytes": { + Name: "log.segment.bytes", + Value: "1073741824", // 1GB + ReadOnly: false, + IsDefault: true, + Sensitive: false, + }, + "num.network.threads": { + Name: "num.network.threads", + Value: "3", + ReadOnly: true, + IsDefault: true, + Sensitive: false, + }, + "num.io.threads": { + Name: "num.io.threads", + Value: "8", + ReadOnly: true, + IsDefault: true, + Sensitive: false, + }, + } + + // If specific configs requested, filter to those + if len(requestedConfigs) > 0 { + filteredConfigs := make([]ConfigEntry, 0, len(requestedConfigs)) + for _, configName := range requestedConfigs { + if config, exists := allConfigs[configName]; exists { + filteredConfigs = append(filteredConfigs, config) + } + } + return filteredConfigs + } + + // Return all configs + configs := make([]ConfigEntry, 0, len(allConfigs)) + for _, config := range allConfigs { + configs = append(configs, config) + } + return configs +} 
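+
+// As an illustration (a sketch, not part of the handler), a DescribeConfigs
+// request that names specific configs is answered from the defaults above,
+// and unknown names are silently dropped rather than reported as errors.
+// "orders" here is just a hypothetical topic name:
+//
+//	entries := h.getTopicConfigs("orders", []string{"retention.ms", "no.such.config"})
+//	// entries == []ConfigEntry{{Name: "retention.ms", Value: "604800000", IsDefault: true}}
+//
+// An empty ConfigNames slice returns every default listed in getTopicConfigs
+// or getBrokerConfigs respectively.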
+ +// buildConfigEntry builds the wire format for a single config entry +func (h *Handler) buildConfigEntry(config ConfigEntry, apiVersion uint16) []byte { + entry := make([]byte, 0, 256) + + // Config name + nameBytes := make([]byte, 2+len(config.Name)) + binary.BigEndian.PutUint16(nameBytes[0:2], uint16(len(config.Name))) + copy(nameBytes[2:], []byte(config.Name)) + entry = append(entry, nameBytes...) + + // Config value + valueBytes := make([]byte, 2+len(config.Value)) + binary.BigEndian.PutUint16(valueBytes[0:2], uint16(len(config.Value))) + copy(valueBytes[2:], []byte(config.Value)) + entry = append(entry, valueBytes...) + + // Read only flag + if config.ReadOnly { + entry = append(entry, 1) + } else { + entry = append(entry, 0) + } + + // Is default flag (only for version 0) + if apiVersion == 0 { + if config.IsDefault { + entry = append(entry, 1) + } else { + entry = append(entry, 0) + } + } + + // Config source (for versions 1-3) + if apiVersion >= 1 && apiVersion <= 3 { + // ConfigSource: 1 = DYNAMIC_TOPIC_CONFIG, 2 = DYNAMIC_BROKER_CONFIG, 4 = STATIC_BROKER_CONFIG, 5 = DEFAULT_CONFIG + configSource := int8(5) // DEFAULT_CONFIG for all our configs since they're defaults + entry = append(entry, byte(configSource)) + } + + // Sensitive flag + if config.Sensitive { + entry = append(entry, 1) + } else { + entry = append(entry, 0) + } + + // Config synonyms (for versions 1-3) + if apiVersion >= 1 && apiVersion <= 3 { + // Empty synonyms array (4 bytes for array length = 0) + synonymsLength := make([]byte, 4) + binary.BigEndian.PutUint32(synonymsLength, 0) + entry = append(entry, synonymsLength...) + } + + // Config type (for version 3 only) + if apiVersion == 3 { + configType := int8(1) // STRING type for all our configs + entry = append(entry, byte(configType)) + } + + // Config documentation (for version 3 only) + if apiVersion == 3 { + // Null documentation (length = -1) + docLength := make([]byte, 2) + binary.BigEndian.PutUint16(docLength, 0xFFFF) // -1 as uint16 + entry = append(entry, docLength...) + } + + return entry +} + +// registerSchemasViaBrokerAPI registers both key and value schemas via the broker's ConfigureTopic API +// Only the gateway leader performs the registration to avoid concurrent updates. 
+func (h *Handler) registerSchemasViaBrokerAPI(topicName string, valueRecordType *schema_pb.RecordType, keyRecordType *schema_pb.RecordType) error { + if valueRecordType == nil && keyRecordType == nil { + return nil + } + + // Check coordinator registry for multi-gateway deployments + // In single-gateway mode, coordinator registry may not be initialized - that's OK + if reg := h.GetCoordinatorRegistry(); reg != nil { + // Multi-gateway mode - check if we're the leader + isLeader := reg.IsLeader() + + if !isLeader { + // Not leader - in production multi-gateway setups, skip to avoid conflicts + // In single-gateway setups where leader election fails, log warning but proceed + // This ensures schema registration works even if distributed locking has issues + // Note: Schema registration is idempotent, so duplicate registrations are safe + } else { + } + } else { + // No coordinator registry - definitely single-gateway mode + } + + // Require SeaweedMQ integration to access broker + if h.seaweedMQHandler == nil { + return fmt.Errorf("no SeaweedMQ handler available for broker access") + } + + // Get broker addresses + brokerAddresses := h.seaweedMQHandler.GetBrokerAddresses() + if len(brokerAddresses) == 0 { + return fmt.Errorf("no broker addresses available") + } + + // Use the first available broker + brokerAddress := brokerAddresses[0] + + // Load security configuration + util.LoadSecurityConfiguration() + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.mq") + + // Get current topic configuration to preserve partition count + seaweedTopic := &schema_pb.Topic{ + Namespace: DefaultKafkaNamespace, + Name: topicName, + } + + return pb.WithBrokerGrpcClient(false, brokerAddress, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + // First get current configuration + getResp, err := client.GetTopicConfiguration(context.Background(), &mq_pb.GetTopicConfigurationRequest{ + Topic: seaweedTopic, + }) + if err != nil { + // Convert dual schemas to flat schema format + var flatSchema *schema_pb.RecordType + var keyColumns []string + if keyRecordType != nil || valueRecordType != nil { + flatSchema, keyColumns = mqschema.CombineFlatSchemaFromKeyValue(keyRecordType, valueRecordType) + } + + // If topic doesn't exist, create it with configurable default partition count + // Get schema format from topic config if available + schemaFormat := h.getTopicSchemaFormat(topicName) + _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ + Topic: seaweedTopic, + PartitionCount: h.GetDefaultPartitions(), // Use configurable default + MessageRecordType: flatSchema, + KeyColumns: keyColumns, + SchemaFormat: schemaFormat, + }) + return err + } + + // Convert dual schemas to flat schema format for update + var flatSchema *schema_pb.RecordType + var keyColumns []string + if keyRecordType != nil || valueRecordType != nil { + flatSchema, keyColumns = mqschema.CombineFlatSchemaFromKeyValue(keyRecordType, valueRecordType) + } + + // Update existing topic with new schema + // Get schema format from topic config if available + schemaFormat := h.getTopicSchemaFormat(topicName) + _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ + Topic: seaweedTopic, + PartitionCount: getResp.PartitionCount, + MessageRecordType: flatSchema, + KeyColumns: keyColumns, + Retention: getResp.Retention, + SchemaFormat: schemaFormat, + }) + return err + }) +} + +// handleInitProducerId handles InitProducerId API requests (API key 22) +// This API is used to 
initialize a producer for transactional or idempotent operations +func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + + // InitProducerId Request Format (varies by version): + // v0-v1: transactional_id(NULLABLE_STRING) + transaction_timeout_ms(INT32) + // v2+: transactional_id(NULLABLE_STRING) + transaction_timeout_ms(INT32) + producer_id(INT64) + producer_epoch(INT16) + // v4+: Uses flexible format with tagged fields + + offset := 0 + + // Parse transactional_id (NULLABLE_STRING or COMPACT_NULLABLE_STRING for flexible versions) + var transactionalId *string + if apiVersion >= 4 { + // Flexible version - use compact nullable string + if len(requestBody) < offset+1 { + return nil, fmt.Errorf("InitProducerId request too short for transactional_id") + } + + length := int(requestBody[offset]) + offset++ + + if length == 0 { + // Null string + transactionalId = nil + } else { + // Non-null string (length is encoded as length+1 in compact format) + actualLength := length - 1 + if len(requestBody) < offset+actualLength { + return nil, fmt.Errorf("InitProducerId request transactional_id too short") + } + if actualLength > 0 { + id := string(requestBody[offset : offset+actualLength]) + transactionalId = &id + offset += actualLength + } else { + // Empty string + id := "" + transactionalId = &id + } + } + } else { + // Non-flexible version - use regular nullable string + if len(requestBody) < offset+2 { + return nil, fmt.Errorf("InitProducerId request too short for transactional_id length") + } + + length := int(binary.BigEndian.Uint16(requestBody[offset : offset+2])) + offset += 2 + + if length == 0xFFFF { + // Null string (-1 as uint16) + transactionalId = nil + } else { + if len(requestBody) < offset+length { + return nil, fmt.Errorf("InitProducerId request transactional_id too short") + } + if length > 0 { + id := string(requestBody[offset : offset+length]) + transactionalId = &id + offset += length + } else { + // Empty string + id := "" + transactionalId = &id + } + } + } + _ = transactionalId // Used for logging/tracking, but not in core logic yet + + // Parse transaction_timeout_ms (INT32) + if len(requestBody) < offset+4 { + return nil, fmt.Errorf("InitProducerId request too short for transaction_timeout_ms") + } + _ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) // transactionTimeoutMs + offset += 4 + + // For v2+, there might be additional fields, but we'll ignore them for now + // as we're providing a basic implementation + + // Build response + response := make([]byte, 0, 64) + + // NOTE: Correlation ID is handled by writeResponseWithHeader + // Do NOT include it in the response body + // Note: Header tagged fields are also handled by writeResponseWithHeader for flexible versions + + // InitProducerId Response Format: + // throttle_time_ms(INT32) + error_code(INT16) + producer_id(INT64) + producer_epoch(INT16) + // + tagged_fields (for flexible versions) + + // Throttle time (4 bytes) - v1+ + if apiVersion >= 1 { + response = append(response, 0, 0, 0, 0) // No throttling + } + + // Error code (2 bytes) - SUCCESS + response = append(response, 0, 0) // No error + + // Producer ID (8 bytes) - generate a simple producer ID + // In a real implementation, this would be managed by a transaction coordinator + producerId := int64(1000) // Simple fixed producer ID for now + producerIdBytes := make([]byte, 8) + binary.BigEndian.PutUint64(producerIdBytes, uint64(producerId)) + response = append(response, 
producerIdBytes...)
+
+	// Producer epoch (2 bytes) - start with epoch 0
+	response = append(response, 0, 0) // Epoch 0
+
+	// For flexible versions (v4+), add response body tagged fields
+	if apiVersion >= 4 {
+		response = append(response, 0x00) // Empty response body tagged fields
+	}
+
+	return response, nil
+}
+
+// createTopicWithSchemaSupport creates a topic with optional schema integration.
+// When a Schema Registry URL is configured, a registered schema is required;
+// otherwise the topic is created without one.
+func (h *Handler) createTopicWithSchemaSupport(topicName string, partitions int32) error {
+	// System topics like _schemas, __consumer_offsets, etc. are always created as plain topics
+	if isSystemTopic(topicName) {
+		return h.createTopicWithDefaultFlexibleSchema(topicName, partitions)
+	}
+
+	// Check if Schema Registry URL is configured
+	if h.schemaRegistryURL != "" {
+		// Try to initialize schema management if not already done
+		if h.schemaManager == nil {
+			h.tryInitializeSchemaManagement()
+		}
+
+		// If schema manager is still nil after initialization attempt, Schema Registry is unavailable
+		if h.schemaManager == nil {
+			return fmt.Errorf("Schema Registry is configured at %s but unavailable - cannot create topic %s without schema validation", h.schemaRegistryURL, topicName)
+		}
+
+		// Schema Registry is available - try to fetch existing schema
+		keyRecordType, valueRecordType, err := h.fetchSchemaForTopic(topicName)
+		if err != nil {
+			// Check if this is a connection error vs schema not found
+			if h.isSchemaRegistryConnectionError(err) {
+				return fmt.Errorf("Schema Registry is unavailable: %w", err)
+			}
+			// Schema not found - this is an error when schema management is enforced
+			return fmt.Errorf("schema is required for topic %s but no schema found in Schema Registry", topicName)
+		}
+
+		if keyRecordType != nil || valueRecordType != nil {
+			// Create topic with schema from Schema Registry
+			return h.seaweedMQHandler.CreateTopicWithSchemas(topicName, partitions, keyRecordType, valueRecordType)
+		}
+
+		// No schemas found - this is an error when schema management is enforced
+		return fmt.Errorf("schema is required for topic %s but no schema found in Schema Registry", topicName)
+	}
+
+	// Schema Registry URL not configured - create topic without schema (backward compatibility)
+	return h.seaweedMQHandler.CreateTopic(topicName, partitions)
+}
+
+// createTopicWithDefaultFlexibleSchema creates a system topic as a plain Kafka topic.
+// Despite its name, no schema is attached: Schema Registry stores its schemas in
+// _schemas, so that topic cannot itself be schema-managed.
+func (h *Handler) createTopicWithDefaultFlexibleSchema(topicName string, partitions int32) error {
+	// System topics like _schemas must be PLAIN Kafka topics without schema management:
+	// Schema Registry uses _schemas to STORE schemas, so it cannot depend on schema
+	// management itself, and attaching one breaks Schema Registry bootstrap.
+	glog.V(0).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName)
+	return h.seaweedMQHandler.CreateTopic(topicName, partitions)
+}
+
+// fetchSchemaForTopic attempts to fetch schema information for a topic from Schema Registry
+// Returns key and value RecordTypes if schemas are found
+func (h *Handler) fetchSchemaForTopic(topicName string) (*schema_pb.RecordType, *schema_pb.RecordType, error) {
+	if h.schemaManager == nil {
+		return nil, nil, fmt.Errorf("schema manager not available")
+	}
+
+	var keyRecordType *schema_pb.RecordType
+	var valueRecordType *schema_pb.RecordType
+	var lastConnectionError error
+
+	// Try to 
fetch value schema using standard Kafka naming convention: <topic>-value + valueSubject := topicName + "-value" + cachedSchema, err := h.schemaManager.GetLatestSchema(valueSubject) + if err != nil { + // Check if this is a connection error (Schema Registry unavailable) + if h.isSchemaRegistryConnectionError(err) { + lastConnectionError = err + } + // Not found or connection error - continue to check key schema + } else if cachedSchema != nil { + + // Convert schema to RecordType + recordType, err := h.convertSchemaToRecordType(cachedSchema.Schema, cachedSchema.LatestID) + if err == nil { + valueRecordType = recordType + // Store schema configuration for later use + h.storeTopicSchemaConfig(topicName, cachedSchema.LatestID, schema.FormatAvro) + } else { + } + } + + // Try to fetch key schema (optional) + keySubject := topicName + "-key" + cachedKeySchema, keyErr := h.schemaManager.GetLatestSchema(keySubject) + if keyErr != nil { + if h.isSchemaRegistryConnectionError(keyErr) { + lastConnectionError = keyErr + } + // Not found or connection error - key schema is optional + } else if cachedKeySchema != nil { + + // Convert schema to RecordType + recordType, err := h.convertSchemaToRecordType(cachedKeySchema.Schema, cachedKeySchema.LatestID) + if err == nil { + keyRecordType = recordType + // Store key schema configuration for later use + h.storeTopicKeySchemaConfig(topicName, cachedKeySchema.LatestID, schema.FormatAvro) + } else { + } + } + + // If we encountered connection errors, fail fast + if lastConnectionError != nil && keyRecordType == nil && valueRecordType == nil { + return nil, nil, fmt.Errorf("Schema Registry is unavailable: %w", lastConnectionError) + } + + // Return error if no schemas found (but Schema Registry was reachable) + if keyRecordType == nil && valueRecordType == nil { + return nil, nil, fmt.Errorf("no schemas found for topic %s", topicName) + } + + return keyRecordType, valueRecordType, nil +} + +// isSchemaRegistryConnectionError determines if an error is due to Schema Registry being unavailable +// vs a schema not being found (404) +func (h *Handler) isSchemaRegistryConnectionError(err error) bool { + if err == nil { + return false + } + + errStr := err.Error() + + // Connection errors (network issues, DNS resolution, etc.) + if strings.Contains(errStr, "failed to fetch") && + (strings.Contains(errStr, "connection refused") || + strings.Contains(errStr, "no such host") || + strings.Contains(errStr, "timeout") || + strings.Contains(errStr, "network is unreachable")) { + return true + } + + // HTTP 5xx errors (server errors) + if strings.Contains(errStr, "schema registry error 5") { + return true + } + + // HTTP 404 errors are "schema not found", not connection errors + if strings.Contains(errStr, "schema registry error 404") { + return false + } + + // Other HTTP errors (401, 403, etc.) 
should be treated as connection/config issues + if strings.Contains(errStr, "schema registry error") { + return true + } + + return false +} + +// convertSchemaToRecordType converts a schema string to a RecordType +func (h *Handler) convertSchemaToRecordType(schemaStr string, schemaID uint32) (*schema_pb.RecordType, error) { + // Get the cached schema to determine format + cachedSchema, err := h.schemaManager.GetSchemaByID(schemaID) + if err != nil { + return nil, fmt.Errorf("failed to get cached schema: %w", err) + } + + // Create appropriate decoder and infer RecordType based on format + switch cachedSchema.Format { + case schema.FormatAvro: + // Create Avro decoder and infer RecordType + decoder, err := schema.NewAvroDecoder(schemaStr) + if err != nil { + return nil, fmt.Errorf("failed to create Avro decoder: %w", err) + } + return decoder.InferRecordType() + + case schema.FormatJSONSchema: + // Create JSON Schema decoder and infer RecordType + decoder, err := schema.NewJSONSchemaDecoder(schemaStr) + if err != nil { + return nil, fmt.Errorf("failed to create JSON Schema decoder: %w", err) + } + return decoder.InferRecordType() + + case schema.FormatProtobuf: + // For Protobuf, we need the binary descriptor, not string + // This is a limitation - Protobuf schemas in Schema Registry are typically stored as binary descriptors + return nil, fmt.Errorf("Protobuf schema conversion from string not supported - requires binary descriptor") + + default: + return nil, fmt.Errorf("unsupported schema format: %v", cachedSchema.Format) + } +} + +// isSystemTopic checks if a topic is a Kafka system topic +func isSystemTopic(topicName string) bool { + systemTopics := []string{ + "_schemas", + "__consumer_offsets", + "__transaction_state", + "_confluent-ksql-default__command_topic", + "_confluent-metrics", + } + + for _, systemTopic := range systemTopics { + if topicName == systemTopic { + return true + } + } + + // Check for topics starting with underscore (common system topic pattern) + return len(topicName) > 0 && topicName[0] == '_' +} + +// getConnectionContextFromRequest extracts the connection context from the request context +func (h *Handler) getConnectionContextFromRequest(ctx context.Context) *ConnectionContext { + if connCtx, ok := ctx.Value(connContextKey).(*ConnectionContext); ok { + return connCtx + } + return nil +} + +// getOrCreatePartitionReader gets an existing partition reader or creates a new one +// This maintains persistent readers per connection that stream forward, eliminating +// repeated offset lookups and reducing broker CPU load +func (h *Handler) getOrCreatePartitionReader(ctx context.Context, connCtx *ConnectionContext, key TopicPartitionKey, startOffset int64) *partitionReader { + // Try to get existing reader + if val, ok := connCtx.partitionReaders.Load(key); ok { + return val.(*partitionReader) + } + + // Create new reader + reader := newPartitionReader(ctx, h, connCtx, key.Topic, key.Partition, startOffset) + + // Store it (handle race condition where another goroutine created one) + if actual, loaded := connCtx.partitionReaders.LoadOrStore(key, reader); loaded { + // Another goroutine created it first, close ours and use theirs + reader.close() + return actual.(*partitionReader) + } + + return reader +} + +// cleanupPartitionReaders closes all partition readers for a connection +// Called when connection is closing +func cleanupPartitionReaders(connCtx *ConnectionContext) { + if connCtx == nil { + return + } + + connCtx.partitionReaders.Range(func(key, value 
interface{}) bool {
+		if reader, ok := value.(*partitionReader); ok {
+			reader.close()
+		}
+		return true // Continue iteration
+	})
+
+	glog.V(2).Infof("[%s] Cleaned up partition readers", connCtx.ConnectionID)
+}
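+
+// The per-connection reader cache above relies on sync.Map's LoadOrStore so
+// that concurrent Fetch requests racing to create a reader end up sharing a
+// single one. A minimal sketch of the same pattern (hypothetical helper, not
+// part of this handler):
+//
+//	func getOrCreate(m *sync.Map, key interface{}, create func() io.Closer) io.Closer {
+//		if v, ok := m.Load(key); ok {
+//			return v.(io.Closer)
+//		}
+//		created := create()
+//		if actual, loaded := m.LoadOrStore(key, created); loaded {
+//			created.Close() // lost the race; discard the duplicate
+//			return actual.(io.Closer)
+//		}
+//		return created
+//	}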
