diff options
Diffstat (limited to 'weed/mq/kafka/protocol/handler.go')
| -rw-r--r-- | weed/mq/kafka/protocol/handler.go | 536 |
1 files changed, 325 insertions, 211 deletions
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go index fcfe196c2..2768793d2 100644 --- a/weed/mq/kafka/protocol/handler.go +++ b/weed/mq/kafka/protocol/handler.go @@ -6,6 +6,7 @@ import ( "context" "encoding/binary" "fmt" + "hash/fnv" "io" "net" "os" @@ -26,6 +27,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/mem" ) // GetAdvertisedAddress returns the host:port that should be advertised to clients @@ -33,25 +35,64 @@ import ( func (h *Handler) GetAdvertisedAddress(gatewayAddr string) (string, int) { host, port := "localhost", 9093 - // Try to parse the gateway address if provided to get the port - if gatewayAddr != "" { - if _, gatewayPort, err := net.SplitHostPort(gatewayAddr); err == nil { + // First, check for environment variable override + if advertisedHost := os.Getenv("KAFKA_ADVERTISED_HOST"); advertisedHost != "" { + host = advertisedHost + glog.V(2).Infof("Using KAFKA_ADVERTISED_HOST: %s", advertisedHost) + } else if gatewayAddr != "" { + // Try to parse the gateway address to extract hostname and port + parsedHost, gatewayPort, err := net.SplitHostPort(gatewayAddr) + if err == nil { + // Successfully parsed host:port if gatewayPortInt, err := strconv.Atoi(gatewayPort); err == nil { - port = gatewayPortInt // Only use the port, not the host + port = gatewayPortInt + } + // Use the parsed host if it's not 0.0.0.0 or empty + if parsedHost != "" && parsedHost != "0.0.0.0" { + host = parsedHost + glog.V(2).Infof("Using host from gatewayAddr: %s", host) + } else { + // Fall back to localhost for 0.0.0.0 or ambiguous addresses + host = "localhost" + glog.V(2).Infof("gatewayAddr is 0.0.0.0, using localhost for client advertising") + } + } else { + // Could not parse, use as-is if it looks like a hostname + if gatewayAddr != "" && gatewayAddr != "0.0.0.0" { + host = gatewayAddr + glog.V(2).Infof("Using gatewayAddr directly as host (unparseable): %s", host) } } - } - - // Override with environment variable if set, otherwise always use localhost for external clients - if advertisedHost := os.Getenv("KAFKA_ADVERTISED_HOST"); advertisedHost != "" { - host = advertisedHost } else { + // No gateway address and no environment variable host = "localhost" + glog.V(2).Infof("No gatewayAddr provided, using localhost") } return host, port } +// generateNodeID generates a deterministic node ID from a gateway address. +// This must match the logic in gateway/coordinator_registry.go to ensure consistency +// between Metadata and FindCoordinator responses. +func generateNodeID(gatewayAddress string) int32 { + if gatewayAddress == "" { + return 1 // Default fallback + } + h := fnv.New32a() + _, _ = h.Write([]byte(gatewayAddress)) + // Use only positive values and avoid 0 + return int32(h.Sum32()&0x7fffffff) + 1 +} + +// GetNodeID returns the consistent node ID for this gateway. +// This is used by both Metadata and FindCoordinator handlers to ensure +// clients see the same broker/coordinator node ID across all APIs. +func (h *Handler) GetNodeID() int32 { + gatewayAddr := h.GetGatewayAddress() + return generateNodeID(gatewayAddr) +} + // TopicInfo holds basic information about a topic type TopicInfo struct { Name string @@ -131,9 +172,10 @@ type SeaweedMQHandlerInterface interface { CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error DeleteTopic(topic string) error GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool) + InvalidateTopicExistsCache(topic string) // Ledger methods REMOVED - SMQ handles Kafka offsets natively - ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) - ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) + ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error) + ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) // GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations) // ctx is used to control the fetch timeout (should match Kafka fetch request's MaxWaitTime) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) @@ -206,11 +248,6 @@ func (h *Handler) getTopicSchemaFormat(topic string) string { return "" // Empty string means schemaless or format unknown } -// stringPtr returns a pointer to the given string -func stringPtr(s string) *string { - return &s -} - // Handler processes Kafka protocol requests from clients using SeaweedMQ type Handler struct { // SeaweedMQ integration @@ -244,6 +281,11 @@ type Handler struct { registeredSchemas map[string]bool // key: "topic:schemaID" or "topic-key:schemaID" registeredSchemasMu sync.RWMutex + // RecordType inference cache to avoid recreating Avro codecs (37% CPU overhead!) + // Key: schema content hash or schema string + inferredRecordTypes map[string]*schema_pb.RecordType + inferredRecordTypesMu sync.RWMutex + filerClient filer_pb.SeaweedFilerClient // SMQ broker addresses discovered from masters for Metadata responses @@ -285,6 +327,7 @@ func NewTestHandlerWithMock(mockHandler SeaweedMQHandlerInterface) *Handler { groupCoordinator: consumer.NewGroupCoordinator(), registeredSchemas: make(map[string]bool), topicSchemaConfigs: make(map[string]*TopicSchemaConfig), + inferredRecordTypes: make(map[string]*schema_pb.RecordType), defaultPartitions: 1, } } @@ -330,6 +373,8 @@ func NewSeaweedMQBrokerHandlerWithDefaults(masters string, filerGroup string, cl groupCoordinator: consumer.NewGroupCoordinator(), smqBrokerAddresses: nil, // Will be set by SetSMQBrokerAddresses() when server starts registeredSchemas: make(map[string]bool), + topicSchemaConfigs: make(map[string]*TopicSchemaConfig), + inferredRecordTypes: make(map[string]*schema_pb.RecordType), defaultPartitions: defaultPartitions, metadataCache: metadataCache, coordinatorCache: coordinatorCache, @@ -365,7 +410,7 @@ func (h *Handler) Close() error { // Close broker client if present if h.brokerClient != nil { if err := h.brokerClient.Close(); err != nil { - Warning("Failed to close broker client: %v", err) + glog.Warningf("Failed to close broker client: %v", err) } } @@ -376,17 +421,6 @@ func (h *Handler) Close() error { return nil } -// StoreRecordBatch stores a record batch for later retrieval during Fetch operations -func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) { - // Record batch storage is now handled by the SeaweedMQ handler -} - -// GetRecordBatch retrieves a stored record batch that contains the requested offset -func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) { - // Record batch retrieval is now handled by the SeaweedMQ handler - return nil, false -} - // SetSMQBrokerAddresses updates the SMQ broker addresses used in Metadata responses func (h *Handler) SetSMQBrokerAddresses(brokerAddresses []string) { h.smqBrokerAddresses = brokerAddresses @@ -406,8 +440,9 @@ func (h *Handler) GetSMQBrokerAddresses() []string { return h.smqBrokerAddresses } - // Final fallback for testing - return []string{"localhost:17777"} + // No brokers configured - return empty slice + // This will cause proper error handling in callers + return []string{} } // GetGatewayAddress returns the current gateway address as a string (for coordinator registry) @@ -415,8 +450,9 @@ func (h *Handler) GetGatewayAddress() string { if h.gatewayAddress != "" { return h.gatewayAddress } - // Fallback for testing - return "localhost:9092" + // No gateway address configured - return empty string + // Callers should handle this as a configuration error + return "" } // SetGatewayAddress sets the gateway address for coordinator registry @@ -491,7 +527,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - // CRITICAL: Create per-connection BrokerClient for isolated gRPC streams + // Create per-connection BrokerClient for isolated gRPC streams // This prevents different connections from interfering with each other's Fetch requests // In mock/unit test mode, this may not be available, so we continue without it var connBrokerClient *integration.BrokerClient @@ -519,7 +555,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Close the per-connection broker client if connBrokerClient != nil { if closeErr := connBrokerClient.Close(); closeErr != nil { - Error("[%s] Error closing BrokerClient: %v", connectionID, closeErr) + glog.Errorf("[%s] Error closing BrokerClient: %v", connectionID, closeErr) } } // Remove connection context from map @@ -539,7 +575,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { consecutiveTimeouts := 0 const maxConsecutiveTimeouts = 3 // Give up after 3 timeouts in a row - // CRITICAL: Separate control plane from data plane + // Separate control plane from data plane // Control plane: Metadata, Heartbeat, JoinGroup, etc. (must be fast, never block) // Data plane: Fetch, Produce (can be slow, may block on I/O) // @@ -554,7 +590,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { var wg sync.WaitGroup // Response writer - maintains request/response order per connection - // CRITICAL: While we process requests concurrently (control/data plane), + // While we process requests concurrently (control/data plane), // we MUST track the order requests arrive and send responses in that same order. // Solution: Track received correlation IDs in a queue, send responses in that queue order. correlationQueue := make([]uint32, 0, 100) @@ -575,7 +611,8 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // responseChan closed, exit return } - glog.V(2).Infof("[%s] Response writer received correlation=%d from responseChan", connectionID, resp.correlationID) + // Only log at V(3) for debugging, not V(4) in hot path + glog.V(3).Infof("[%s] Response writer received correlation=%d", connectionID, resp.correlationID) correlationQueueMu.Lock() pendingResponses[resp.correlationID] = resp @@ -585,22 +622,18 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { readyResp, exists := pendingResponses[expectedID] if !exists { // Response not ready yet, stop sending - glog.V(3).Infof("[%s] Response writer: waiting for correlation=%d (nextToSend=%d, queueLen=%d)", connectionID, expectedID, nextToSend, len(correlationQueue)) break } // Send this response if readyResp.err != nil { - Error("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err) + glog.Errorf("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err) } else { - glog.V(2).Infof("[%s] Response writer: about to write correlation=%d (%d bytes)", connectionID, readyResp.correlationID, len(readyResp.response)) if writeErr := h.writeResponseWithHeader(w, readyResp.correlationID, readyResp.apiKey, readyResp.apiVersion, readyResp.response, timeoutConfig.WriteTimeout); writeErr != nil { - glog.Errorf("[%s] Response writer: WRITE ERROR correlation=%d: %v - EXITING", connectionID, readyResp.correlationID, writeErr) - Error("[%s] Write error correlation=%d: %v", connectionID, readyResp.correlationID, writeErr) + glog.Errorf("[%s] Response writer WRITE ERROR correlation=%d: %v - EXITING", connectionID, readyResp.correlationID, writeErr) correlationQueueMu.Unlock() return } - glog.V(2).Infof("[%s] Response writer: successfully wrote correlation=%d", connectionID, readyResp.correlationID) } // Remove from pending and advance @@ -627,9 +660,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Channel closed, exit return } - glog.V(2).Infof("[%s] Control plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) - - // CRITICAL: Wrap request processing with panic recovery to prevent deadlocks + // Removed V(4) logging from hot path - only log errors and important events + + // Wrap request processing with panic recovery to prevent deadlocks // If processRequestSync panics, we MUST still send a response to avoid blocking the response writer var response []byte var err error @@ -643,7 +676,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { response, err = h.processRequestSync(req) }() - glog.V(2).Infof("[%s] Control plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID) select { case responseChan <- &kafkaResponse{ correlationID: req.correlationID, @@ -652,12 +684,12 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { response: response, err: err, }: - glog.V(2).Infof("[%s] Control plane sent correlation=%d to responseChan", connectionID, req.correlationID) + // Response sent successfully - no logging here case <-ctx.Done(): // Connection closed, stop processing return case <-time.After(5 * time.Second): - glog.Errorf("[%s] DEADLOCK: Control plane timeout sending correlation=%d to responseChan (buffer full?)", connectionID, req.correlationID) + glog.Warningf("[%s] Control plane: timeout sending response correlation=%d", connectionID, req.correlationID) } case <-ctx.Done(): // Context cancelled, drain remaining requests before exiting @@ -686,7 +718,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { } default: // Channel empty, safe to exit - glog.V(2).Infof("[%s] Control plane: drain complete, exiting", connectionID) + glog.V(4).Infof("[%s] Control plane: drain complete, exiting", connectionID) return } } @@ -705,9 +737,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Channel closed, exit return } - glog.V(2).Infof("[%s] Data plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey) + // Removed V(4) logging from hot path - only log errors and important events - // CRITICAL: Wrap request processing with panic recovery to prevent deadlocks + // Wrap request processing with panic recovery to prevent deadlocks // If processRequestSync panics, we MUST still send a response to avoid blocking the response writer var response []byte var err error @@ -721,7 +753,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { response, err = h.processRequestSync(req) }() - glog.V(2).Infof("[%s] Data plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID) // Use select with context to avoid sending on closed channel select { case responseChan <- &kafkaResponse{ @@ -731,12 +762,12 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { response: response, err: err, }: - glog.V(2).Infof("[%s] Data plane sent correlation=%d to responseChan", connectionID, req.correlationID) + // Response sent successfully - no logging here case <-ctx.Done(): // Connection closed, stop processing return case <-time.After(5 * time.Second): - glog.Errorf("[%s] DEADLOCK: Data plane timeout sending correlation=%d to responseChan (buffer full?)", connectionID, req.correlationID) + glog.Warningf("[%s] Data plane: timeout sending response correlation=%d", connectionID, req.correlationID) } case <-ctx.Done(): // Context cancelled, drain remaining requests before exiting @@ -748,7 +779,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { return } // Process remaining requests with a short timeout - glog.V(3).Infof("[%s] Data plane: processing drained request correlation=%d", connectionID, req.correlationID) response, err := h.processRequestSync(req) select { case responseChan <- &kafkaResponse{ @@ -758,7 +788,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { response: response, err: err, }: - glog.V(3).Infof("[%s] Data plane: sent drained response correlation=%d", connectionID, req.correlationID) + // Response sent - no logging case <-time.After(1 * time.Second): glog.Warningf("[%s] Data plane: timeout sending drained response correlation=%d, discarding", connectionID, req.correlationID) return @@ -774,7 +804,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { }() defer func() { - // CRITICAL: Close channels in correct order to avoid panics + // Close channels in correct order to avoid panics // 1. Close input channels to stop accepting new requests close(controlChan) close(dataChan) @@ -785,48 +815,27 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { }() for { - // Check if context is cancelled + // OPTIMIZATION: Consolidated context/deadline check - avoid redundant select statements + // Check context once at the beginning of the loop select { case <-ctx.Done(): return ctx.Err() default: } - // Set a read deadline for the connection based on context or default timeout + // Set read deadline based on context or default timeout + // OPTIMIZATION: Calculate deadline once per iteration, not multiple times var readDeadline time.Time - var timeoutDuration time.Duration - if deadline, ok := ctx.Deadline(); ok { readDeadline = deadline - timeoutDuration = time.Until(deadline) } else { - // Use configurable read timeout instead of hardcoded 5 seconds - timeoutDuration = timeoutConfig.ReadTimeout - readDeadline = time.Now().Add(timeoutDuration) + readDeadline = time.Now().Add(timeoutConfig.ReadTimeout) } if err := conn.SetReadDeadline(readDeadline); err != nil { return fmt.Errorf("set read deadline: %w", err) } - // Check context before reading - select { - case <-ctx.Done(): - // Give a small delay to ensure proper cleanup - time.Sleep(100 * time.Millisecond) - return ctx.Err() - default: - // If context is close to being cancelled, set a very short timeout - if deadline, ok := ctx.Deadline(); ok { - timeUntilDeadline := time.Until(deadline) - if timeUntilDeadline < 2*time.Second && timeUntilDeadline > 0 { - shortDeadline := time.Now().Add(500 * time.Millisecond) - if err := conn.SetReadDeadline(shortDeadline); err == nil { - } - } - } - } - // Read message size (4 bytes) var sizeBytes [4]byte if _, err := io.ReadFull(r, sizeBytes[:]); err != nil { @@ -834,9 +843,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { return nil } if netErr, ok := err.(net.Error); ok && netErr.Timeout() { - // CRITICAL FIX: Track consecutive timeouts to detect CLOSE_WAIT connections - // When remote peer closes, connection enters CLOSE_WAIT and reads keep timing out - // After several consecutive timeouts with no data, assume connection is dead + // Track consecutive timeouts to detect stale connections consecutiveTimeouts++ if consecutiveTimeouts >= maxConsecutiveTimeouts { return nil @@ -852,7 +859,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Successfully read the message size size := binary.BigEndian.Uint32(sizeBytes[:]) - // Debug("Read message size: %d bytes", size) if size == 0 || size > 1024*1024 { // 1MB limit // Use standardized error for message size limit // Send error response for message too large @@ -867,12 +873,15 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { } // Read the message - messageBuf := make([]byte, size) + // OPTIMIZATION: Use buffer pool to reduce GC pressure (was 1MB/sec at 1000 req/s) + messageBuf := mem.Allocate(int(size)) + defer mem.Free(messageBuf) if _, err := io.ReadFull(r, messageBuf); err != nil { _ = HandleTimeoutError(err, "read") // errorCode return fmt.Errorf("read message: %w", err) } + // Parse at least the basic header to get API key and correlation ID if len(messageBuf) < 8 { return fmt.Errorf("message too short") @@ -881,9 +890,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { apiKey := binary.BigEndian.Uint16(messageBuf[0:2]) apiVersion := binary.BigEndian.Uint16(messageBuf[2:4]) correlationID := binary.BigEndian.Uint32(messageBuf[4:8]) - - // Debug("Parsed header - API Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID) - + // Validate API version against what we support if err := h.validateAPIVersion(apiKey, apiVersion); err != nil { glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err) @@ -892,8 +899,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { if writeErr != nil { return fmt.Errorf("build error response: %w", writeErr) } - // CRITICAL: Send error response through response queue to maintain sequential ordering - // This prevents deadlocks in the response writer which expects all correlation IDs in sequence + // Send error response through response queue to maintain sequential ordering select { case responseChan <- &kafkaResponse{ correlationID: correlationID, @@ -909,10 +915,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { } } - // CRITICAL DEBUG: Log that validation passed - glog.V(4).Infof("API VERSION VALIDATION PASSED: Key=%d (%s), Version=%d, Correlation=%d - proceeding to header parsing", - apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID) - // Extract request body - special handling for ApiVersions requests var requestBody []byte if apiKey == uint16(APIKeyApiVersions) && apiVersion >= 3 { @@ -951,29 +953,25 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Parse header using flexible version utilities for other APIs header, parsedRequestBody, parseErr := ParseRequestHeader(messageBuf) if parseErr != nil { - // CRITICAL: Log the parsing error for debugging - glog.Errorf("REQUEST HEADER PARSING FAILED: API=%d (%s) v%d, correlation=%d, error=%v, msgLen=%d", - apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID, parseErr, len(messageBuf)) + glog.Errorf("Request header parsing failed: API=%d (%s) v%d, correlation=%d, error=%v", + apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID, parseErr) // Fall back to basic header parsing if flexible version parsing fails // Basic header parsing fallback (original logic) bodyOffset := 8 if len(messageBuf) < bodyOffset+2 { - glog.Errorf("FALLBACK PARSING FAILED: missing client_id length, msgLen=%d", len(messageBuf)) return fmt.Errorf("invalid header: missing client_id length") } clientIDLen := int16(binary.BigEndian.Uint16(messageBuf[bodyOffset : bodyOffset+2])) bodyOffset += 2 if clientIDLen >= 0 { if len(messageBuf) < bodyOffset+int(clientIDLen) { - glog.Errorf("FALLBACK PARSING FAILED: client_id truncated, clientIDLen=%d, msgLen=%d", clientIDLen, len(messageBuf)) return fmt.Errorf("invalid header: client_id truncated") } bodyOffset += int(clientIDLen) } requestBody = messageBuf[bodyOffset:] - glog.V(2).Infof("FALLBACK PARSING SUCCESS: API=%d (%s) v%d, bodyLen=%d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, len(requestBody)) } else { // Use the successfully parsed request body requestBody = parsedRequestBody @@ -1001,7 +999,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { } } - // CRITICAL: Route request to appropriate processor + // Route request to appropriate processor // Control plane: Fast, never blocks (Metadata, Heartbeat, etc.) // Data plane: Can be slow (Fetch, Produce) @@ -1019,13 +1017,15 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { // Route to appropriate channel based on API key var targetChan chan *kafkaRequest + if apiKey == 2 { // ListOffsets + } if isDataPlaneAPI(apiKey) { targetChan = dataChan } else { targetChan = controlChan } - // CRITICAL: Only add to correlation queue AFTER successful channel send + // Only add to correlation queue AFTER successful channel send // If we add before and the channel blocks, the correlation ID is in the queue // but the request never gets processed, causing response writer deadlock select { @@ -1038,7 +1038,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error { return ctx.Err() case <-time.After(10 * time.Second): // Channel full for too long - this shouldn't happen with proper backpressure - glog.Errorf("[%s] CRITICAL: Failed to queue correlation=%d after 10s timeout - channel full!", connectionID, correlationID) + glog.Errorf("[%s] Failed to queue correlation=%d - channel full (10s timeout)", connectionID, correlationID) return fmt.Errorf("request queue full: correlation=%d", correlationID) } } @@ -1050,6 +1050,13 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { requestStart := time.Now() apiName := getAPIName(APIKey(req.apiKey)) + + // Only log high-volume requests at V(2), not V(4) + if glog.V(2) { + glog.V(2).Infof("[API] %s (key=%d, ver=%d, corr=%d)", + apiName, req.apiKey, req.apiVersion, req.correlationID) + } + var response []byte var err error @@ -1070,7 +1077,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { response, err = h.handleDeleteTopics(req.correlationID, req.requestBody) case APIKeyProduce: - response, err = h.handleProduce(req.correlationID, req.apiVersion, req.requestBody) + response, err = h.handleProduce(req.ctx, req.correlationID, req.apiVersion, req.requestBody) case APIKeyFetch: response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody) @@ -1112,7 +1119,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) { response, err = h.handleInitProducerId(req.correlationID, req.apiVersion, req.requestBody) default: - Warning("Unsupported API key: %d (%s) v%d - Correlation: %d", req.apiKey, apiName, req.apiVersion, req.correlationID) + glog.Warningf("Unsupported API key: %d (%s) v%d - Correlation: %d", req.apiKey, apiName, req.apiVersion, req.correlationID) err = fmt.Errorf("unsupported API key: %d (version %d)", req.apiKey, req.apiVersion) } @@ -1171,7 +1178,7 @@ func (h *Handler) handleApiVersions(correlationID uint32, apiVersion uint16) ([] // Error code (2 bytes) - always fixed-length response = append(response, 0, 0) // No error - // API Keys Array - CRITICAL FIX: Use correct encoding based on version + // API Keys Array - use correct encoding based on version if apiVersion >= 3 { // FLEXIBLE FORMAT: Compact array with varint length - THIS FIXES THE ADMINCLIENT BUG! response = append(response, CompactArrayLength(uint32(len(SupportedApiKeys)))...) @@ -1221,11 +1228,16 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([] // NOTE: Correlation ID is handled by writeResponseWithCorrelationID // Do NOT include it in the response body + // Get consistent node ID for this gateway + nodeID := h.GetNodeID() + nodeIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(nodeIDBytes, uint32(nodeID)) + // Brokers array length (4 bytes) - 1 broker (this gateway) response = append(response, 0, 0, 0, 1) // Broker 0: node_id(4) + host(STRING) + port(4) - response = append(response, 0, 0, 0, 1) // node_id = 1 (consistent with partitions) + response = append(response, nodeIDBytes...) // Use consistent node ID // Get advertised address for client connections host, port := h.GetAdvertisedAddress(h.GetGatewayAddress()) @@ -1248,7 +1260,7 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([] // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - glog.V(0).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics) + glog.V(3).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics) // Determine topics to return using SeaweedMQ handler var topicsToReturn []string @@ -1258,6 +1270,26 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([] for _, name := range requestedTopics { if h.seaweedMQHandler.TopicExists(name) { topicsToReturn = append(topicsToReturn, name) + } else { + // Topic doesn't exist according to current cache, check broker directly + // This handles the race condition where producers just created topics + // and consumers are requesting metadata before cache TTL expires + glog.V(3).Infof("[METADATA v0] Topic %s not in cache, checking broker directly", name) + h.seaweedMQHandler.InvalidateTopicExistsCache(name) + if h.seaweedMQHandler.TopicExists(name) { + glog.V(3).Infof("[METADATA v0] Topic %s found on broker after cache refresh", name) + topicsToReturn = append(topicsToReturn, name) + } else { + glog.V(3).Infof("[METADATA v0] Topic %s not found, auto-creating with default partitions", name) + // Auto-create topic (matches Kafka's auto.create.topics.enable=true) + if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil { + glog.V(2).Infof("[METADATA v0] Failed to auto-create topic %s: %v", name, err) + // Don't add to topicsToReturn - client will get error + } else { + glog.V(2).Infof("[METADATA v0] Successfully auto-created topic %s", name) + topicsToReturn = append(topicsToReturn, name) + } + } } } } @@ -1300,15 +1332,15 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([] binary.BigEndian.PutUint32(partitionIDBytes, uint32(partitionID)) response = append(response, partitionIDBytes...) - response = append(response, 0, 0, 0, 1) // leader = 1 (this broker) + response = append(response, nodeIDBytes...) // leader = this broker - // replicas: array length(4) + one broker id (1) - response = append(response, 0, 0, 0, 1) + // replicas: array length(4) + one broker id (this broker) response = append(response, 0, 0, 0, 1) + response = append(response, nodeIDBytes...) - // isr: array length(4) + one broker id (1) - response = append(response, 0, 0, 0, 1) + // isr: array length(4) + one broker id (this broker) response = append(response, 0, 0, 0, 1) + response = append(response, nodeIDBytes...) } } @@ -1323,7 +1355,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - glog.V(0).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics) + glog.V(3).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics) // Determine topics to return using SeaweedMQ handler var topicsToReturn []string @@ -1333,6 +1365,22 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] for _, name := range requestedTopics { if h.seaweedMQHandler.TopicExists(name) { topicsToReturn = append(topicsToReturn, name) + } else { + // Topic doesn't exist according to current cache, check broker directly + glog.V(3).Infof("[METADATA v1] Topic %s not in cache, checking broker directly", name) + h.seaweedMQHandler.InvalidateTopicExistsCache(name) + if h.seaweedMQHandler.TopicExists(name) { + glog.V(3).Infof("[METADATA v1] Topic %s found on broker after cache refresh", name) + topicsToReturn = append(topicsToReturn, name) + } else { + glog.V(3).Infof("[METADATA v1] Topic %s not found, auto-creating with default partitions", name) + if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil { + glog.V(2).Infof("[METADATA v1] Failed to auto-create topic %s: %v", name, err) + } else { + glog.V(2).Infof("[METADATA v1] Successfully auto-created topic %s", name) + topicsToReturn = append(topicsToReturn, name) + } + } } } } @@ -1343,11 +1391,16 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] // NOTE: Correlation ID is handled by writeResponseWithHeader // Do NOT include it in the response body + // Get consistent node ID for this gateway + nodeID := h.GetNodeID() + nodeIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(nodeIDBytes, uint32(nodeID)) + // Brokers array length (4 bytes) - 1 broker (this gateway) response = append(response, 0, 0, 0, 1) // Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING) - response = append(response, 0, 0, 0, 1) // node_id = 1 + response = append(response, nodeIDBytes...) // Use consistent node ID // Get advertised address for client connections host, port := h.GetAdvertisedAddress(h.GetGatewayAddress()) @@ -1372,7 +1425,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] response = append(response, 0, 0) // empty string // ControllerID (4 bytes) - v1 addition - response = append(response, 0, 0, 0, 1) // controller_id = 1 + response = append(response, nodeIDBytes...) // controller_id = this broker // Topics array length (4 bytes) topicsCountBytes := make([]byte, 4) @@ -1414,15 +1467,15 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([] binary.BigEndian.PutUint32(partitionIDBytes, uint32(partitionID)) response = append(response, partitionIDBytes...) - response = append(response, 0, 0, 0, 1) // leader_id = 1 + response = append(response, nodeIDBytes...) // leader_id = this broker - // replicas: array length(4) + one broker id (1) - response = append(response, 0, 0, 0, 1) + // replicas: array length(4) + one broker id (this broker) response = append(response, 0, 0, 0, 1) + response = append(response, nodeIDBytes...) - // isr: array length(4) + one broker id (1) - response = append(response, 0, 0, 0, 1) + // isr: array length(4) + one broker id (this broker) response = append(response, 0, 0, 0, 1) + response = append(response, nodeIDBytes...) } } @@ -1436,7 +1489,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([] // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - glog.V(0).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics) + glog.V(3).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics) // Determine topics to return using SeaweedMQ handler var topicsToReturn []string @@ -1446,6 +1499,22 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([] for _, name := range requestedTopics { if h.seaweedMQHandler.TopicExists(name) { topicsToReturn = append(topicsToReturn, name) + } else { + // Topic doesn't exist according to current cache, check broker directly + glog.V(3).Infof("[METADATA v2] Topic %s not in cache, checking broker directly", name) + h.seaweedMQHandler.InvalidateTopicExistsCache(name) + if h.seaweedMQHandler.TopicExists(name) { + glog.V(3).Infof("[METADATA v2] Topic %s found on broker after cache refresh", name) + topicsToReturn = append(topicsToReturn, name) + } else { + glog.V(3).Infof("[METADATA v2] Topic %s not found, auto-creating with default partitions", name) + if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil { + glog.V(2).Infof("[METADATA v2] Failed to auto-create topic %s: %v", name, err) + } else { + glog.V(2).Infof("[METADATA v2] Successfully auto-created topic %s", name) + topicsToReturn = append(topicsToReturn, name) + } + } } } } @@ -1462,7 +1531,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([] // Get advertised address for client connections host, port := h.GetAdvertisedAddress(h.GetGatewayAddress()) - nodeID := int32(1) // Single gateway node + nodeID := h.GetNodeID() // Get consistent node ID for this gateway // Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING) binary.Write(&buf, binary.BigEndian, nodeID) @@ -1490,7 +1559,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([] buf.WriteString(clusterID) // ControllerID (4 bytes) - v1+ addition - binary.Write(&buf, binary.BigEndian, int32(1)) + binary.Write(&buf, binary.BigEndian, nodeID) // Topics array (4 bytes length + topics) binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn))) @@ -1520,15 +1589,15 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([] for partitionID := int32(0); partitionID < partitionCount; partitionID++ { binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex - binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID // ReplicaNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // IsrNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 } } @@ -1544,7 +1613,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - glog.V(0).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics) + glog.V(3).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics) // Determine topics to return using SeaweedMQ handler var topicsToReturn []string @@ -1554,6 +1623,22 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( for _, name := range requestedTopics { if h.seaweedMQHandler.TopicExists(name) { topicsToReturn = append(topicsToReturn, name) + } else { + // Topic doesn't exist according to current cache, check broker directly + glog.V(3).Infof("[METADATA v3/v4] Topic %s not in cache, checking broker directly", name) + h.seaweedMQHandler.InvalidateTopicExistsCache(name) + if h.seaweedMQHandler.TopicExists(name) { + glog.V(3).Infof("[METADATA v3/v4] Topic %s found on broker after cache refresh", name) + topicsToReturn = append(topicsToReturn, name) + } else { + glog.V(3).Infof("[METADATA v3/v4] Topic %s not found, auto-creating with default partitions", name) + if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil { + glog.V(2).Infof("[METADATA v3/v4] Failed to auto-create topic %s: %v", name, err) + } else { + glog.V(2).Infof("[METADATA v3/v4] Successfully auto-created topic %s", name) + topicsToReturn = append(topicsToReturn, name) + } + } } } } @@ -1573,7 +1658,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( // Get advertised address for client connections host, port := h.GetAdvertisedAddress(h.GetGatewayAddress()) - nodeID := int32(1) // Single gateway node + nodeID := h.GetNodeID() // Get consistent node ID for this gateway // Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING) binary.Write(&buf, binary.BigEndian, nodeID) @@ -1601,7 +1686,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( buf.WriteString(clusterID) // ControllerID (4 bytes) - v1+ addition - binary.Write(&buf, binary.BigEndian, int32(1)) + binary.Write(&buf, binary.BigEndian, nodeID) // Topics array (4 bytes length + topics) binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn))) @@ -1631,20 +1716,28 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) ( for partitionID := int32(0); partitionID < partitionCount; partitionID++ { binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex - binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID // ReplicaNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // IsrNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 } } response := buf.Bytes() + // Detailed logging for Metadata response + maxDisplay := len(response) + if maxDisplay > 50 { + maxDisplay = 50 + } + if len(response) > 100 { + } + return response, nil } @@ -1655,7 +1748,7 @@ func (h *Handler) HandleMetadataV5V6(correlationID uint32, requestBody []byte) ( // HandleMetadataV7 implements Metadata API v7 with LeaderEpoch field (REGULAR FORMAT, NOT FLEXIBLE) func (h *Handler) HandleMetadataV7(correlationID uint32, requestBody []byte) ([]byte, error) { - // CRITICAL: Metadata v7 uses REGULAR arrays/strings (like v5/v6), NOT compact format + // Metadata v7 uses REGULAR arrays/strings (like v5/v6), NOT compact format // Only v9+ uses compact format (flexible responses) return h.handleMetadataV5ToV8(correlationID, requestBody, 7) } @@ -1671,7 +1764,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // Parse requested topics (empty means all) requestedTopics := h.parseMetadataTopics(requestBody) - glog.V(0).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics) + glog.V(3).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics) // Determine topics to return using SeaweedMQ handler var topicsToReturn []string @@ -1688,24 +1781,45 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, for _, topic := range requestedTopics { if isSystemTopic(topic) { // Always try to auto-create system topics during metadata requests - glog.V(0).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic) + glog.V(3).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic) if !h.seaweedMQHandler.TopicExists(topic) { - glog.V(0).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic) + glog.V(3).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic) if err := h.createTopicWithSchemaSupport(topic, 1); err != nil { glog.V(0).Infof("[METADATA v%d] Failed to auto-create system topic %s: %v", apiVersion, topic, err) // Continue without adding to topicsToReturn - client will get UNKNOWN_TOPIC_OR_PARTITION } else { - glog.V(0).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic) + glog.V(3).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic) } } else { - glog.V(0).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic) + glog.V(3).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic) } topicsToReturn = append(topicsToReturn, topic) } else if h.seaweedMQHandler.TopicExists(topic) { topicsToReturn = append(topicsToReturn, topic) + } else { + // Topic doesn't exist according to current cache, but let's check broker directly + // This handles the race condition where producers just created topics + // and consumers are requesting metadata before cache TTL expires + glog.V(3).Infof("[METADATA v%d] Topic %s not in cache, checking broker directly", apiVersion, topic) + // Force cache invalidation to do fresh broker check + h.seaweedMQHandler.InvalidateTopicExistsCache(topic) + if h.seaweedMQHandler.TopicExists(topic) { + glog.V(3).Infof("[METADATA v%d] Topic %s found on broker after cache refresh", apiVersion, topic) + topicsToReturn = append(topicsToReturn, topic) + } else { + glog.V(3).Infof("[METADATA v%d] Topic %s not found on broker, auto-creating with default partitions", apiVersion, topic) + // Auto-create non-system topics with default partitions (matches Kafka behavior) + if err := h.createTopicWithSchemaSupport(topic, h.GetDefaultPartitions()); err != nil { + glog.V(2).Infof("[METADATA v%d] Failed to auto-create topic %s: %v", apiVersion, topic, err) + // Don't add to topicsToReturn - client will get UNKNOWN_TOPIC_OR_PARTITION + } else { + glog.V(2).Infof("[METADATA v%d] Successfully auto-created topic %s", apiVersion, topic) + topicsToReturn = append(topicsToReturn, topic) + } + } } } - glog.V(0).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics) + glog.V(3).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics) } var buf bytes.Buffer @@ -1714,6 +1828,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // NOTE: Correlation ID is handled by writeResponseWithCorrelationID // Do NOT include it in the response body + // ThrottleTimeMs (4 bytes) - v3+ addition binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling @@ -1723,7 +1838,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // Get advertised address for client connections host, port := h.GetAdvertisedAddress(h.GetGatewayAddress()) - nodeID := int32(1) // Single gateway node + nodeID := h.GetNodeID() // Get consistent node ID for this gateway // Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING) binary.Write(&buf, binary.BigEndian, nodeID) @@ -1751,7 +1866,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, buf.WriteString(clusterID) // ControllerID (4 bytes) - v1+ addition - binary.Write(&buf, binary.BigEndian, int32(1)) + binary.Write(&buf, binary.BigEndian, nodeID) // Topics array (4 bytes length + topics) binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn))) @@ -1781,7 +1896,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, for partitionID := int32(0); partitionID < partitionCount; partitionID++ { binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex - binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID + binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID // LeaderEpoch (4 bytes) - v7+ addition if apiVersion >= 7 { @@ -1790,11 +1905,11 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, // ReplicaNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // IsrNodes array (4 bytes length + nodes) binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node - binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1 + binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1 // OfflineReplicas array (4 bytes length + nodes) - v5+ addition binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas @@ -1808,6 +1923,14 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte, response := buf.Bytes() + // Detailed logging for Metadata response + maxDisplay := len(response) + if maxDisplay > 50 { + maxDisplay = 50 + } + if len(response) > 100 { + } + return response, nil } @@ -1871,6 +1994,12 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req // Parse minimal request to understand what's being asked (header already stripped) offset := 0 + + maxBytes := len(requestBody) + if maxBytes > 64 { + maxBytes = 64 + } + // v1+ has replica_id(4) if apiVersion >= 1 { if len(requestBody) < offset+4 { @@ -1974,9 +2103,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req responseOffset = earliestOffset } responseTimestamp = 0 // No specific timestamp for earliest - if strings.HasPrefix(string(topicName), "_schemas") { - glog.Infof("SCHEMA REGISTRY LISTOFFSETS EARLIEST: topic=%s partition=%d returning offset=%d", string(topicName), partitionID, responseOffset) - } + case -1: // latest offset // Get the actual latest offset from SMQ if h.seaweedMQHandler == nil { @@ -2015,13 +2142,21 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req actualTopicsCount++ } - // CRITICAL FIX: Update the topics count in the response header with the actual count + // Update the topics count in the response header with the actual count // This prevents ErrIncompleteResponse when request parsing fails mid-way if actualTopicsCount != topicsCount { binary.BigEndian.PutUint32(response[topicsCountOffset:topicsCountOffset+4], actualTopicsCount) + } else { } + if len(response) > 0 { + respPreview := len(response) + if respPreview > 32 { + respPreview = 32 + } + } return response, nil + } func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { @@ -2207,7 +2342,7 @@ func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []by } else { // Use schema-aware topic creation if err := h.createTopicWithSchemaSupport(t.name, int32(t.partitions)); err != nil { - errCode = 1 // UNKNOWN_SERVER_ERROR + errCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } } eb := make([]byte, 2) @@ -2337,7 +2472,7 @@ func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byt } else { // Create the topic in SeaweedMQ with schema support if err := h.createTopicWithSchemaSupport(topicName, int32(numPartitions)); err != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } } @@ -2612,7 +2747,7 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint } else { // Use corrected values for error checking and topic creation with schema support if err := h.createTopicWithSchemaSupport(t.name, int32(actualPartitions)); err != nil { - errCode = 1 // UNKNOWN_SERVER_ERROR + errCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) } } eb := make([]byte, 2) @@ -2744,7 +2879,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) ( } else { // Delete the topic from SeaweedMQ if err := h.seaweedMQHandler.DeleteTopic(topicName); err != nil { - errorCode = 1 // UNKNOWN_SERVER_ERROR + errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16) errorMessage = err.Error() } } @@ -2809,26 +2944,36 @@ func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey, // handleMetadata routes to the appropriate version-specific handler func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) { + + var response []byte + var err error + switch apiVersion { case 0: - return h.HandleMetadataV0(correlationID, requestBody) + response, err = h.HandleMetadataV0(correlationID, requestBody) case 1: - return h.HandleMetadataV1(correlationID, requestBody) + response, err = h.HandleMetadataV1(correlationID, requestBody) case 2: - return h.HandleMetadataV2(correlationID, requestBody) + response, err = h.HandleMetadataV2(correlationID, requestBody) case 3, 4: - return h.HandleMetadataV3V4(correlationID, requestBody) + response, err = h.HandleMetadataV3V4(correlationID, requestBody) case 5, 6: - return h.HandleMetadataV5V6(correlationID, requestBody) + response, err = h.HandleMetadataV5V6(correlationID, requestBody) case 7: - return h.HandleMetadataV7(correlationID, requestBody) + response, err = h.HandleMetadataV7(correlationID, requestBody) default: // For versions > 7, use the V7 handler (flexible format) if apiVersion > 7 { - return h.HandleMetadataV7(correlationID, requestBody) + response, err = h.HandleMetadataV7(correlationID, requestBody) + } else { + err = fmt.Errorf("metadata version %d not implemented yet", apiVersion) } - return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion) } + + if err != nil { + } else { + } + return response, err } // getAPIName returns a human-readable name for Kafka API keys (for debugging) @@ -2883,7 +3028,7 @@ func (h *Handler) handleDescribeConfigs(correlationID uint32, apiVersion uint16, // Parse request to extract resources resources, err := h.parseDescribeConfigsRequest(requestBody, apiVersion) if err != nil { - Error("DescribeConfigs parsing error: %v", err) + glog.Errorf("DescribeConfigs parsing error: %v", err) return nil, fmt.Errorf("failed to parse DescribeConfigs request: %w", err) } @@ -3049,7 +3194,7 @@ func isFlexibleResponse(apiKey uint16, apiVersion uint16) bool { case APIKeySyncGroup: return apiVersion >= 4 case APIKeyApiVersions: - // CRITICAL: AdminClient compatibility requires header version 0 (no tagged fields) + // AdminClient compatibility requires header version 0 (no tagged fields) // Even though ApiVersions v3+ technically supports flexible responses, AdminClient // expects the header to NOT include tagged fields. This is a known quirk. return false // Always use non-flexible header for ApiVersions @@ -3122,40 +3267,6 @@ func (h *Handler) writeResponseWithHeader(w *bufio.Writer, correlationID uint32, return nil } -// hexDump formats bytes as a hex dump with ASCII representation -func hexDump(data []byte) string { - var result strings.Builder - for i := 0; i < len(data); i += 16 { - // Offset - result.WriteString(fmt.Sprintf("%04x ", i)) - - // Hex bytes - for j := 0; j < 16; j++ { - if i+j < len(data) { - result.WriteString(fmt.Sprintf("%02x ", data[i+j])) - } else { - result.WriteString(" ") - } - if j == 7 { - result.WriteString(" ") - } - } - - // ASCII representation - result.WriteString(" |") - for j := 0; j < 16 && i+j < len(data); j++ { - b := data[i+j] - if b >= 32 && b < 127 { - result.WriteByte(b) - } else { - result.WriteByte('.') - } - } - result.WriteString("|\n") - } - return result.String() -} - // writeResponseWithCorrelationID is deprecated - use writeResponseWithHeader instead // Kept for compatibility with direct callers that don't have API info func (h *Handler) writeResponseWithCorrelationID(w *bufio.Writer, correlationID uint32, responseBody []byte, timeout time.Duration) error { @@ -3319,12 +3430,6 @@ func (h *Handler) parseDescribeConfigsRequest(requestBody []byte, apiVersion uin var resourcesLength uint32 if isFlexible { - // Debug: log the first 8 bytes of the request body - debugBytes := requestBody[offset:] - if len(debugBytes) > 8 { - debugBytes = debugBytes[:8] - } - // FIX: Skip top-level tagged fields for DescribeConfigs v4+ flexible protocol // The request body starts with tagged fields count (usually 0x00 = empty) _, consumed, err := DecodeTaggedFields(requestBody[offset:]) @@ -3825,6 +3930,12 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16, // v2+: transactional_id(NULLABLE_STRING) + transaction_timeout_ms(INT32) + producer_id(INT64) + producer_epoch(INT16) // v4+: Uses flexible format with tagged fields + + maxBytes := len(requestBody) + if maxBytes > 64 { + maxBytes = 64 + } + offset := 0 // Parse transactional_id (NULLABLE_STRING or COMPACT_NULLABLE_STRING for flexible versions) @@ -3930,6 +4041,10 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16, response = append(response, 0x00) // Empty response body tagged fields } + respPreview := len(response) + if respPreview > 32 { + respPreview = 32 + } return response, nil } @@ -3982,11 +4097,10 @@ func (h *Handler) createTopicWithSchemaSupport(topicName string, partitions int3 // createTopicWithDefaultFlexibleSchema creates a topic with a flexible default schema // that can handle both Avro and JSON messages when schema management is enabled func (h *Handler) createTopicWithDefaultFlexibleSchema(topicName string, partitions int32) error { - // CRITICAL FIX: System topics like _schemas should be PLAIN Kafka topics without schema management + // System topics like _schemas should be PLAIN Kafka topics without schema management // Schema Registry uses _schemas to STORE schemas, so it can't have schema management itself - // This was causing issues with Schema Registry bootstrap - glog.V(0).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName) + glog.V(1).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName) return h.seaweedMQHandler.CreateTopic(topicName, partitions) } @@ -4191,5 +4305,5 @@ func cleanupPartitionReaders(connCtx *ConnectionContext) { return true // Continue iteration }) - glog.V(2).Infof("[%s] Cleaned up partition readers", connCtx.ConnectionID) + glog.V(4).Infof("[%s] Cleaned up partition readers", connCtx.ConnectionID) } |
