aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/protocol/handler.go')
-rw-r--r--weed/mq/kafka/protocol/handler.go536
1 files changed, 325 insertions, 211 deletions
diff --git a/weed/mq/kafka/protocol/handler.go b/weed/mq/kafka/protocol/handler.go
index fcfe196c2..2768793d2 100644
--- a/weed/mq/kafka/protocol/handler.go
+++ b/weed/mq/kafka/protocol/handler.go
@@ -6,6 +6,7 @@ import (
"context"
"encoding/binary"
"fmt"
+ "hash/fnv"
"io"
"net"
"os"
@@ -26,6 +27,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/mem"
)
// GetAdvertisedAddress returns the host:port that should be advertised to clients
@@ -33,25 +35,64 @@ import (
func (h *Handler) GetAdvertisedAddress(gatewayAddr string) (string, int) {
host, port := "localhost", 9093
- // Try to parse the gateway address if provided to get the port
- if gatewayAddr != "" {
- if _, gatewayPort, err := net.SplitHostPort(gatewayAddr); err == nil {
+ // First, check for environment variable override
+ if advertisedHost := os.Getenv("KAFKA_ADVERTISED_HOST"); advertisedHost != "" {
+ host = advertisedHost
+ glog.V(2).Infof("Using KAFKA_ADVERTISED_HOST: %s", advertisedHost)
+ } else if gatewayAddr != "" {
+ // Try to parse the gateway address to extract hostname and port
+ parsedHost, gatewayPort, err := net.SplitHostPort(gatewayAddr)
+ if err == nil {
+ // Successfully parsed host:port
if gatewayPortInt, err := strconv.Atoi(gatewayPort); err == nil {
- port = gatewayPortInt // Only use the port, not the host
+ port = gatewayPortInt
+ }
+ // Use the parsed host if it's not 0.0.0.0 or empty
+ if parsedHost != "" && parsedHost != "0.0.0.0" {
+ host = parsedHost
+ glog.V(2).Infof("Using host from gatewayAddr: %s", host)
+ } else {
+ // Fall back to localhost for 0.0.0.0 or ambiguous addresses
+ host = "localhost"
+ glog.V(2).Infof("gatewayAddr is 0.0.0.0, using localhost for client advertising")
+ }
+ } else {
+ // Could not parse, use as-is if it looks like a hostname
+ if gatewayAddr != "" && gatewayAddr != "0.0.0.0" {
+ host = gatewayAddr
+ glog.V(2).Infof("Using gatewayAddr directly as host (unparseable): %s", host)
}
}
- }
-
- // Override with environment variable if set, otherwise always use localhost for external clients
- if advertisedHost := os.Getenv("KAFKA_ADVERTISED_HOST"); advertisedHost != "" {
- host = advertisedHost
} else {
+ // No gateway address and no environment variable
host = "localhost"
+ glog.V(2).Infof("No gatewayAddr provided, using localhost")
}
return host, port
}
+// generateNodeID generates a deterministic node ID from a gateway address.
+// This must match the logic in gateway/coordinator_registry.go to ensure consistency
+// between Metadata and FindCoordinator responses.
+func generateNodeID(gatewayAddress string) int32 {
+ if gatewayAddress == "" {
+ return 1 // Default fallback
+ }
+ h := fnv.New32a()
+ _, _ = h.Write([]byte(gatewayAddress))
+ // Use only positive values and avoid 0
+ return int32(h.Sum32()&0x7fffffff) + 1
+}
+
+// GetNodeID returns the consistent node ID for this gateway.
+// This is used by both Metadata and FindCoordinator handlers to ensure
+// clients see the same broker/coordinator node ID across all APIs.
+func (h *Handler) GetNodeID() int32 {
+ gatewayAddr := h.GetGatewayAddress()
+ return generateNodeID(gatewayAddr)
+}
+
// TopicInfo holds basic information about a topic
type TopicInfo struct {
Name string
@@ -131,9 +172,10 @@ type SeaweedMQHandlerInterface interface {
CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error
DeleteTopic(topic string) error
GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool)
+ InvalidateTopicExistsCache(topic string)
// Ledger methods REMOVED - SMQ handles Kafka offsets natively
- ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error)
- ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
+ ProduceRecord(ctx context.Context, topicName string, partitionID int32, key, value []byte) (int64, error)
+ ProduceRecordValue(ctx context.Context, topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error)
// GetStoredRecords retrieves records from SMQ storage (optional - for advanced implementations)
// ctx is used to control the fetch timeout (should match Kafka fetch request's MaxWaitTime)
GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error)
@@ -206,11 +248,6 @@ func (h *Handler) getTopicSchemaFormat(topic string) string {
return "" // Empty string means schemaless or format unknown
}
-// stringPtr returns a pointer to the given string
-func stringPtr(s string) *string {
- return &s
-}
-
// Handler processes Kafka protocol requests from clients using SeaweedMQ
type Handler struct {
// SeaweedMQ integration
@@ -244,6 +281,11 @@ type Handler struct {
registeredSchemas map[string]bool // key: "topic:schemaID" or "topic-key:schemaID"
registeredSchemasMu sync.RWMutex
+ // RecordType inference cache to avoid recreating Avro codecs (37% CPU overhead!)
+ // Key: schema content hash or schema string
+ inferredRecordTypes map[string]*schema_pb.RecordType
+ inferredRecordTypesMu sync.RWMutex
+
filerClient filer_pb.SeaweedFilerClient
// SMQ broker addresses discovered from masters for Metadata responses
@@ -285,6 +327,7 @@ func NewTestHandlerWithMock(mockHandler SeaweedMQHandlerInterface) *Handler {
groupCoordinator: consumer.NewGroupCoordinator(),
registeredSchemas: make(map[string]bool),
topicSchemaConfigs: make(map[string]*TopicSchemaConfig),
+ inferredRecordTypes: make(map[string]*schema_pb.RecordType),
defaultPartitions: 1,
}
}
@@ -330,6 +373,8 @@ func NewSeaweedMQBrokerHandlerWithDefaults(masters string, filerGroup string, cl
groupCoordinator: consumer.NewGroupCoordinator(),
smqBrokerAddresses: nil, // Will be set by SetSMQBrokerAddresses() when server starts
registeredSchemas: make(map[string]bool),
+ topicSchemaConfigs: make(map[string]*TopicSchemaConfig),
+ inferredRecordTypes: make(map[string]*schema_pb.RecordType),
defaultPartitions: defaultPartitions,
metadataCache: metadataCache,
coordinatorCache: coordinatorCache,
@@ -365,7 +410,7 @@ func (h *Handler) Close() error {
// Close broker client if present
if h.brokerClient != nil {
if err := h.brokerClient.Close(); err != nil {
- Warning("Failed to close broker client: %v", err)
+ glog.Warningf("Failed to close broker client: %v", err)
}
}
@@ -376,17 +421,6 @@ func (h *Handler) Close() error {
return nil
}
-// StoreRecordBatch stores a record batch for later retrieval during Fetch operations
-func (h *Handler) StoreRecordBatch(topicName string, partition int32, baseOffset int64, recordBatch []byte) {
- // Record batch storage is now handled by the SeaweedMQ handler
-}
-
-// GetRecordBatch retrieves a stored record batch that contains the requested offset
-func (h *Handler) GetRecordBatch(topicName string, partition int32, offset int64) ([]byte, bool) {
- // Record batch retrieval is now handled by the SeaweedMQ handler
- return nil, false
-}
-
// SetSMQBrokerAddresses updates the SMQ broker addresses used in Metadata responses
func (h *Handler) SetSMQBrokerAddresses(brokerAddresses []string) {
h.smqBrokerAddresses = brokerAddresses
@@ -406,8 +440,9 @@ func (h *Handler) GetSMQBrokerAddresses() []string {
return h.smqBrokerAddresses
}
- // Final fallback for testing
- return []string{"localhost:17777"}
+ // No brokers configured - return empty slice
+ // This will cause proper error handling in callers
+ return []string{}
}
// GetGatewayAddress returns the current gateway address as a string (for coordinator registry)
@@ -415,8 +450,9 @@ func (h *Handler) GetGatewayAddress() string {
if h.gatewayAddress != "" {
return h.gatewayAddress
}
- // Fallback for testing
- return "localhost:9092"
+ // No gateway address configured - return empty string
+ // Callers should handle this as a configuration error
+ return ""
}
// SetGatewayAddress sets the gateway address for coordinator registry
@@ -491,7 +527,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- // CRITICAL: Create per-connection BrokerClient for isolated gRPC streams
+ // Create per-connection BrokerClient for isolated gRPC streams
// This prevents different connections from interfering with each other's Fetch requests
// In mock/unit test mode, this may not be available, so we continue without it
var connBrokerClient *integration.BrokerClient
@@ -519,7 +555,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Close the per-connection broker client
if connBrokerClient != nil {
if closeErr := connBrokerClient.Close(); closeErr != nil {
- Error("[%s] Error closing BrokerClient: %v", connectionID, closeErr)
+ glog.Errorf("[%s] Error closing BrokerClient: %v", connectionID, closeErr)
}
}
// Remove connection context from map
@@ -539,7 +575,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
consecutiveTimeouts := 0
const maxConsecutiveTimeouts = 3 // Give up after 3 timeouts in a row
- // CRITICAL: Separate control plane from data plane
+ // Separate control plane from data plane
// Control plane: Metadata, Heartbeat, JoinGroup, etc. (must be fast, never block)
// Data plane: Fetch, Produce (can be slow, may block on I/O)
//
@@ -554,7 +590,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
var wg sync.WaitGroup
// Response writer - maintains request/response order per connection
- // CRITICAL: While we process requests concurrently (control/data plane),
+ // While we process requests concurrently (control/data plane),
// we MUST track the order requests arrive and send responses in that same order.
// Solution: Track received correlation IDs in a queue, send responses in that queue order.
correlationQueue := make([]uint32, 0, 100)
@@ -575,7 +611,8 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// responseChan closed, exit
return
}
- glog.V(2).Infof("[%s] Response writer received correlation=%d from responseChan", connectionID, resp.correlationID)
+ // Only log at V(3) for debugging, not V(4) in hot path
+ glog.V(3).Infof("[%s] Response writer received correlation=%d", connectionID, resp.correlationID)
correlationQueueMu.Lock()
pendingResponses[resp.correlationID] = resp
@@ -585,22 +622,18 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
readyResp, exists := pendingResponses[expectedID]
if !exists {
// Response not ready yet, stop sending
- glog.V(3).Infof("[%s] Response writer: waiting for correlation=%d (nextToSend=%d, queueLen=%d)", connectionID, expectedID, nextToSend, len(correlationQueue))
break
}
// Send this response
if readyResp.err != nil {
- Error("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err)
+ glog.Errorf("[%s] Error processing correlation=%d: %v", connectionID, readyResp.correlationID, readyResp.err)
} else {
- glog.V(2).Infof("[%s] Response writer: about to write correlation=%d (%d bytes)", connectionID, readyResp.correlationID, len(readyResp.response))
if writeErr := h.writeResponseWithHeader(w, readyResp.correlationID, readyResp.apiKey, readyResp.apiVersion, readyResp.response, timeoutConfig.WriteTimeout); writeErr != nil {
- glog.Errorf("[%s] Response writer: WRITE ERROR correlation=%d: %v - EXITING", connectionID, readyResp.correlationID, writeErr)
- Error("[%s] Write error correlation=%d: %v", connectionID, readyResp.correlationID, writeErr)
+ glog.Errorf("[%s] Response writer WRITE ERROR correlation=%d: %v - EXITING", connectionID, readyResp.correlationID, writeErr)
correlationQueueMu.Unlock()
return
}
- glog.V(2).Infof("[%s] Response writer: successfully wrote correlation=%d", connectionID, readyResp.correlationID)
}
// Remove from pending and advance
@@ -627,9 +660,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Channel closed, exit
return
}
- glog.V(2).Infof("[%s] Control plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey)
-
- // CRITICAL: Wrap request processing with panic recovery to prevent deadlocks
+ // Removed V(4) logging from hot path - only log errors and important events
+
+ // Wrap request processing with panic recovery to prevent deadlocks
// If processRequestSync panics, we MUST still send a response to avoid blocking the response writer
var response []byte
var err error
@@ -643,7 +676,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
response, err = h.processRequestSync(req)
}()
- glog.V(2).Infof("[%s] Control plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID)
select {
case responseChan <- &kafkaResponse{
correlationID: req.correlationID,
@@ -652,12 +684,12 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
response: response,
err: err,
}:
- glog.V(2).Infof("[%s] Control plane sent correlation=%d to responseChan", connectionID, req.correlationID)
+ // Response sent successfully - no logging here
case <-ctx.Done():
// Connection closed, stop processing
return
case <-time.After(5 * time.Second):
- glog.Errorf("[%s] DEADLOCK: Control plane timeout sending correlation=%d to responseChan (buffer full?)", connectionID, req.correlationID)
+ glog.Warningf("[%s] Control plane: timeout sending response correlation=%d", connectionID, req.correlationID)
}
case <-ctx.Done():
// Context cancelled, drain remaining requests before exiting
@@ -686,7 +718,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}
default:
// Channel empty, safe to exit
- glog.V(2).Infof("[%s] Control plane: drain complete, exiting", connectionID)
+ glog.V(4).Infof("[%s] Control plane: drain complete, exiting", connectionID)
return
}
}
@@ -705,9 +737,9 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Channel closed, exit
return
}
- glog.V(2).Infof("[%s] Data plane processing correlation=%d, apiKey=%d", connectionID, req.correlationID, req.apiKey)
+ // Removed V(4) logging from hot path - only log errors and important events
- // CRITICAL: Wrap request processing with panic recovery to prevent deadlocks
+ // Wrap request processing with panic recovery to prevent deadlocks
// If processRequestSync panics, we MUST still send a response to avoid blocking the response writer
var response []byte
var err error
@@ -721,7 +753,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
response, err = h.processRequestSync(req)
}()
- glog.V(2).Infof("[%s] Data plane completed correlation=%d, sending to responseChan", connectionID, req.correlationID)
// Use select with context to avoid sending on closed channel
select {
case responseChan <- &kafkaResponse{
@@ -731,12 +762,12 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
response: response,
err: err,
}:
- glog.V(2).Infof("[%s] Data plane sent correlation=%d to responseChan", connectionID, req.correlationID)
+ // Response sent successfully - no logging here
case <-ctx.Done():
// Connection closed, stop processing
return
case <-time.After(5 * time.Second):
- glog.Errorf("[%s] DEADLOCK: Data plane timeout sending correlation=%d to responseChan (buffer full?)", connectionID, req.correlationID)
+ glog.Warningf("[%s] Data plane: timeout sending response correlation=%d", connectionID, req.correlationID)
}
case <-ctx.Done():
// Context cancelled, drain remaining requests before exiting
@@ -748,7 +779,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
return
}
// Process remaining requests with a short timeout
- glog.V(3).Infof("[%s] Data plane: processing drained request correlation=%d", connectionID, req.correlationID)
response, err := h.processRequestSync(req)
select {
case responseChan <- &kafkaResponse{
@@ -758,7 +788,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
response: response,
err: err,
}:
- glog.V(3).Infof("[%s] Data plane: sent drained response correlation=%d", connectionID, req.correlationID)
+ // Response sent - no logging
case <-time.After(1 * time.Second):
glog.Warningf("[%s] Data plane: timeout sending drained response correlation=%d, discarding", connectionID, req.correlationID)
return
@@ -774,7 +804,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}()
defer func() {
- // CRITICAL: Close channels in correct order to avoid panics
+ // Close channels in correct order to avoid panics
// 1. Close input channels to stop accepting new requests
close(controlChan)
close(dataChan)
@@ -785,48 +815,27 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}()
for {
- // Check if context is cancelled
+ // OPTIMIZATION: Consolidated context/deadline check - avoid redundant select statements
+ // Check context once at the beginning of the loop
select {
case <-ctx.Done():
return ctx.Err()
default:
}
- // Set a read deadline for the connection based on context or default timeout
+ // Set read deadline based on context or default timeout
+ // OPTIMIZATION: Calculate deadline once per iteration, not multiple times
var readDeadline time.Time
- var timeoutDuration time.Duration
-
if deadline, ok := ctx.Deadline(); ok {
readDeadline = deadline
- timeoutDuration = time.Until(deadline)
} else {
- // Use configurable read timeout instead of hardcoded 5 seconds
- timeoutDuration = timeoutConfig.ReadTimeout
- readDeadline = time.Now().Add(timeoutDuration)
+ readDeadline = time.Now().Add(timeoutConfig.ReadTimeout)
}
if err := conn.SetReadDeadline(readDeadline); err != nil {
return fmt.Errorf("set read deadline: %w", err)
}
- // Check context before reading
- select {
- case <-ctx.Done():
- // Give a small delay to ensure proper cleanup
- time.Sleep(100 * time.Millisecond)
- return ctx.Err()
- default:
- // If context is close to being cancelled, set a very short timeout
- if deadline, ok := ctx.Deadline(); ok {
- timeUntilDeadline := time.Until(deadline)
- if timeUntilDeadline < 2*time.Second && timeUntilDeadline > 0 {
- shortDeadline := time.Now().Add(500 * time.Millisecond)
- if err := conn.SetReadDeadline(shortDeadline); err == nil {
- }
- }
- }
- }
-
// Read message size (4 bytes)
var sizeBytes [4]byte
if _, err := io.ReadFull(r, sizeBytes[:]); err != nil {
@@ -834,9 +843,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
return nil
}
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
- // CRITICAL FIX: Track consecutive timeouts to detect CLOSE_WAIT connections
- // When remote peer closes, connection enters CLOSE_WAIT and reads keep timing out
- // After several consecutive timeouts with no data, assume connection is dead
+ // Track consecutive timeouts to detect stale connections
consecutiveTimeouts++
if consecutiveTimeouts >= maxConsecutiveTimeouts {
return nil
@@ -852,7 +859,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Successfully read the message size
size := binary.BigEndian.Uint32(sizeBytes[:])
- // Debug("Read message size: %d bytes", size)
if size == 0 || size > 1024*1024 { // 1MB limit
// Use standardized error for message size limit
// Send error response for message too large
@@ -867,12 +873,15 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}
// Read the message
- messageBuf := make([]byte, size)
+ // OPTIMIZATION: Use buffer pool to reduce GC pressure (was 1MB/sec at 1000 req/s)
+ messageBuf := mem.Allocate(int(size))
+ defer mem.Free(messageBuf)
if _, err := io.ReadFull(r, messageBuf); err != nil {
_ = HandleTimeoutError(err, "read") // errorCode
return fmt.Errorf("read message: %w", err)
}
+
// Parse at least the basic header to get API key and correlation ID
if len(messageBuf) < 8 {
return fmt.Errorf("message too short")
@@ -881,9 +890,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
apiKey := binary.BigEndian.Uint16(messageBuf[0:2])
apiVersion := binary.BigEndian.Uint16(messageBuf[2:4])
correlationID := binary.BigEndian.Uint32(messageBuf[4:8])
-
- // Debug("Parsed header - API Key: %d (%s), Version: %d, Correlation: %d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID)
-
+
// Validate API version against what we support
if err := h.validateAPIVersion(apiKey, apiVersion); err != nil {
glog.Errorf("API VERSION VALIDATION FAILED: Key=%d (%s), Version=%d, error=%v", apiKey, getAPIName(APIKey(apiKey)), apiVersion, err)
@@ -892,8 +899,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
if writeErr != nil {
return fmt.Errorf("build error response: %w", writeErr)
}
- // CRITICAL: Send error response through response queue to maintain sequential ordering
- // This prevents deadlocks in the response writer which expects all correlation IDs in sequence
+ // Send error response through response queue to maintain sequential ordering
select {
case responseChan <- &kafkaResponse{
correlationID: correlationID,
@@ -909,10 +915,6 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}
}
- // CRITICAL DEBUG: Log that validation passed
- glog.V(4).Infof("API VERSION VALIDATION PASSED: Key=%d (%s), Version=%d, Correlation=%d - proceeding to header parsing",
- apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID)
-
// Extract request body - special handling for ApiVersions requests
var requestBody []byte
if apiKey == uint16(APIKeyApiVersions) && apiVersion >= 3 {
@@ -951,29 +953,25 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Parse header using flexible version utilities for other APIs
header, parsedRequestBody, parseErr := ParseRequestHeader(messageBuf)
if parseErr != nil {
- // CRITICAL: Log the parsing error for debugging
- glog.Errorf("REQUEST HEADER PARSING FAILED: API=%d (%s) v%d, correlation=%d, error=%v, msgLen=%d",
- apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID, parseErr, len(messageBuf))
+ glog.Errorf("Request header parsing failed: API=%d (%s) v%d, correlation=%d, error=%v",
+ apiKey, getAPIName(APIKey(apiKey)), apiVersion, correlationID, parseErr)
// Fall back to basic header parsing if flexible version parsing fails
// Basic header parsing fallback (original logic)
bodyOffset := 8
if len(messageBuf) < bodyOffset+2 {
- glog.Errorf("FALLBACK PARSING FAILED: missing client_id length, msgLen=%d", len(messageBuf))
return fmt.Errorf("invalid header: missing client_id length")
}
clientIDLen := int16(binary.BigEndian.Uint16(messageBuf[bodyOffset : bodyOffset+2]))
bodyOffset += 2
if clientIDLen >= 0 {
if len(messageBuf) < bodyOffset+int(clientIDLen) {
- glog.Errorf("FALLBACK PARSING FAILED: client_id truncated, clientIDLen=%d, msgLen=%d", clientIDLen, len(messageBuf))
return fmt.Errorf("invalid header: client_id truncated")
}
bodyOffset += int(clientIDLen)
}
requestBody = messageBuf[bodyOffset:]
- glog.V(2).Infof("FALLBACK PARSING SUCCESS: API=%d (%s) v%d, bodyLen=%d", apiKey, getAPIName(APIKey(apiKey)), apiVersion, len(requestBody))
} else {
// Use the successfully parsed request body
requestBody = parsedRequestBody
@@ -1001,7 +999,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
}
}
- // CRITICAL: Route request to appropriate processor
+ // Route request to appropriate processor
// Control plane: Fast, never blocks (Metadata, Heartbeat, etc.)
// Data plane: Can be slow (Fetch, Produce)
@@ -1019,13 +1017,15 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
// Route to appropriate channel based on API key
var targetChan chan *kafkaRequest
+ if apiKey == 2 { // ListOffsets
+ }
if isDataPlaneAPI(apiKey) {
targetChan = dataChan
} else {
targetChan = controlChan
}
- // CRITICAL: Only add to correlation queue AFTER successful channel send
+ // Only add to correlation queue AFTER successful channel send
// If we add before and the channel blocks, the correlation ID is in the queue
// but the request never gets processed, causing response writer deadlock
select {
@@ -1038,7 +1038,7 @@ func (h *Handler) HandleConn(ctx context.Context, conn net.Conn) error {
return ctx.Err()
case <-time.After(10 * time.Second):
// Channel full for too long - this shouldn't happen with proper backpressure
- glog.Errorf("[%s] CRITICAL: Failed to queue correlation=%d after 10s timeout - channel full!", connectionID, correlationID)
+ glog.Errorf("[%s] Failed to queue correlation=%d - channel full (10s timeout)", connectionID, correlationID)
return fmt.Errorf("request queue full: correlation=%d", correlationID)
}
}
@@ -1050,6 +1050,13 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
requestStart := time.Now()
apiName := getAPIName(APIKey(req.apiKey))
+
+ // Only log high-volume requests at V(2), not V(4)
+ if glog.V(2) {
+ glog.V(2).Infof("[API] %s (key=%d, ver=%d, corr=%d)",
+ apiName, req.apiKey, req.apiVersion, req.correlationID)
+ }
+
var response []byte
var err error
@@ -1070,7 +1077,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
response, err = h.handleDeleteTopics(req.correlationID, req.requestBody)
case APIKeyProduce:
- response, err = h.handleProduce(req.correlationID, req.apiVersion, req.requestBody)
+ response, err = h.handleProduce(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
case APIKeyFetch:
response, err = h.handleFetch(req.ctx, req.correlationID, req.apiVersion, req.requestBody)
@@ -1112,7 +1119,7 @@ func (h *Handler) processRequestSync(req *kafkaRequest) ([]byte, error) {
response, err = h.handleInitProducerId(req.correlationID, req.apiVersion, req.requestBody)
default:
- Warning("Unsupported API key: %d (%s) v%d - Correlation: %d", req.apiKey, apiName, req.apiVersion, req.correlationID)
+ glog.Warningf("Unsupported API key: %d (%s) v%d - Correlation: %d", req.apiKey, apiName, req.apiVersion, req.correlationID)
err = fmt.Errorf("unsupported API key: %d (version %d)", req.apiKey, req.apiVersion)
}
@@ -1171,7 +1178,7 @@ func (h *Handler) handleApiVersions(correlationID uint32, apiVersion uint16) ([]
// Error code (2 bytes) - always fixed-length
response = append(response, 0, 0) // No error
- // API Keys Array - CRITICAL FIX: Use correct encoding based on version
+ // API Keys Array - use correct encoding based on version
if apiVersion >= 3 {
// FLEXIBLE FORMAT: Compact array with varint length - THIS FIXES THE ADMINCLIENT BUG!
response = append(response, CompactArrayLength(uint32(len(SupportedApiKeys)))...)
@@ -1221,11 +1228,16 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
// NOTE: Correlation ID is handled by writeResponseWithCorrelationID
// Do NOT include it in the response body
+ // Get consistent node ID for this gateway
+ nodeID := h.GetNodeID()
+ nodeIDBytes := make([]byte, 4)
+ binary.BigEndian.PutUint32(nodeIDBytes, uint32(nodeID))
+
// Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1)
// Broker 0: node_id(4) + host(STRING) + port(4)
- response = append(response, 0, 0, 0, 1) // node_id = 1 (consistent with partitions)
+ response = append(response, nodeIDBytes...) // Use consistent node ID
// Get advertised address for client connections
host, port := h.GetAdvertisedAddress(h.GetGatewayAddress())
@@ -1248,7 +1260,7 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
- glog.V(0).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics)
+ glog.V(3).Infof("[METADATA v0] Requested topics: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
@@ -1258,6 +1270,26 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
+ } else {
+ // Topic doesn't exist according to current cache, check broker directly
+ // This handles the race condition where producers just created topics
+ // and consumers are requesting metadata before cache TTL expires
+ glog.V(3).Infof("[METADATA v0] Topic %s not in cache, checking broker directly", name)
+ h.seaweedMQHandler.InvalidateTopicExistsCache(name)
+ if h.seaweedMQHandler.TopicExists(name) {
+ glog.V(3).Infof("[METADATA v0] Topic %s found on broker after cache refresh", name)
+ topicsToReturn = append(topicsToReturn, name)
+ } else {
+ glog.V(3).Infof("[METADATA v0] Topic %s not found, auto-creating with default partitions", name)
+ // Auto-create topic (matches Kafka's auto.create.topics.enable=true)
+ if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil {
+ glog.V(2).Infof("[METADATA v0] Failed to auto-create topic %s: %v", name, err)
+ // Don't add to topicsToReturn - client will get error
+ } else {
+ glog.V(2).Infof("[METADATA v0] Successfully auto-created topic %s", name)
+ topicsToReturn = append(topicsToReturn, name)
+ }
+ }
}
}
}
@@ -1300,15 +1332,15 @@ func (h *Handler) HandleMetadataV0(correlationID uint32, requestBody []byte) ([]
binary.BigEndian.PutUint32(partitionIDBytes, uint32(partitionID))
response = append(response, partitionIDBytes...)
- response = append(response, 0, 0, 0, 1) // leader = 1 (this broker)
+ response = append(response, nodeIDBytes...) // leader = this broker
- // replicas: array length(4) + one broker id (1)
- response = append(response, 0, 0, 0, 1)
+ // replicas: array length(4) + one broker id (this broker)
response = append(response, 0, 0, 0, 1)
+ response = append(response, nodeIDBytes...)
- // isr: array length(4) + one broker id (1)
- response = append(response, 0, 0, 0, 1)
+ // isr: array length(4) + one broker id (this broker)
response = append(response, 0, 0, 0, 1)
+ response = append(response, nodeIDBytes...)
}
}
@@ -1323,7 +1355,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
- glog.V(0).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics)
+ glog.V(3).Infof("[METADATA v1] Requested topics: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
@@ -1333,6 +1365,22 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
+ } else {
+ // Topic doesn't exist according to current cache, check broker directly
+ glog.V(3).Infof("[METADATA v1] Topic %s not in cache, checking broker directly", name)
+ h.seaweedMQHandler.InvalidateTopicExistsCache(name)
+ if h.seaweedMQHandler.TopicExists(name) {
+ glog.V(3).Infof("[METADATA v1] Topic %s found on broker after cache refresh", name)
+ topicsToReturn = append(topicsToReturn, name)
+ } else {
+ glog.V(3).Infof("[METADATA v1] Topic %s not found, auto-creating with default partitions", name)
+ if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil {
+ glog.V(2).Infof("[METADATA v1] Failed to auto-create topic %s: %v", name, err)
+ } else {
+ glog.V(2).Infof("[METADATA v1] Successfully auto-created topic %s", name)
+ topicsToReturn = append(topicsToReturn, name)
+ }
+ }
}
}
}
@@ -1343,11 +1391,16 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
// NOTE: Correlation ID is handled by writeResponseWithHeader
// Do NOT include it in the response body
+ // Get consistent node ID for this gateway
+ nodeID := h.GetNodeID()
+ nodeIDBytes := make([]byte, 4)
+ binary.BigEndian.PutUint32(nodeIDBytes, uint32(nodeID))
+
// Brokers array length (4 bytes) - 1 broker (this gateway)
response = append(response, 0, 0, 0, 1)
// Broker 0: node_id(4) + host(STRING) + port(4) + rack(STRING)
- response = append(response, 0, 0, 0, 1) // node_id = 1
+ response = append(response, nodeIDBytes...) // Use consistent node ID
// Get advertised address for client connections
host, port := h.GetAdvertisedAddress(h.GetGatewayAddress())
@@ -1372,7 +1425,7 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
response = append(response, 0, 0) // empty string
// ControllerID (4 bytes) - v1 addition
- response = append(response, 0, 0, 0, 1) // controller_id = 1
+ response = append(response, nodeIDBytes...) // controller_id = this broker
// Topics array length (4 bytes)
topicsCountBytes := make([]byte, 4)
@@ -1414,15 +1467,15 @@ func (h *Handler) HandleMetadataV1(correlationID uint32, requestBody []byte) ([]
binary.BigEndian.PutUint32(partitionIDBytes, uint32(partitionID))
response = append(response, partitionIDBytes...)
- response = append(response, 0, 0, 0, 1) // leader_id = 1
+ response = append(response, nodeIDBytes...) // leader_id = this broker
- // replicas: array length(4) + one broker id (1)
- response = append(response, 0, 0, 0, 1)
+ // replicas: array length(4) + one broker id (this broker)
response = append(response, 0, 0, 0, 1)
+ response = append(response, nodeIDBytes...)
- // isr: array length(4) + one broker id (1)
- response = append(response, 0, 0, 0, 1)
+ // isr: array length(4) + one broker id (this broker)
response = append(response, 0, 0, 0, 1)
+ response = append(response, nodeIDBytes...)
}
}
@@ -1436,7 +1489,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
- glog.V(0).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics)
+ glog.V(3).Infof("[METADATA v2] Requested topics: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
@@ -1446,6 +1499,22 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
+ } else {
+ // Topic doesn't exist according to current cache, check broker directly
+ glog.V(3).Infof("[METADATA v2] Topic %s not in cache, checking broker directly", name)
+ h.seaweedMQHandler.InvalidateTopicExistsCache(name)
+ if h.seaweedMQHandler.TopicExists(name) {
+ glog.V(3).Infof("[METADATA v2] Topic %s found on broker after cache refresh", name)
+ topicsToReturn = append(topicsToReturn, name)
+ } else {
+ glog.V(3).Infof("[METADATA v2] Topic %s not found, auto-creating with default partitions", name)
+ if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil {
+ glog.V(2).Infof("[METADATA v2] Failed to auto-create topic %s: %v", name, err)
+ } else {
+ glog.V(2).Infof("[METADATA v2] Successfully auto-created topic %s", name)
+ topicsToReturn = append(topicsToReturn, name)
+ }
+ }
}
}
}
@@ -1462,7 +1531,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]
// Get advertised address for client connections
host, port := h.GetAdvertisedAddress(h.GetGatewayAddress())
- nodeID := int32(1) // Single gateway node
+ nodeID := h.GetNodeID() // Get consistent node ID for this gateway
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING)
binary.Write(&buf, binary.BigEndian, nodeID)
@@ -1490,7 +1559,7 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]
buf.WriteString(clusterID)
// ControllerID (4 bytes) - v1+ addition
- binary.Write(&buf, binary.BigEndian, int32(1))
+ binary.Write(&buf, binary.BigEndian, nodeID)
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
@@ -1520,15 +1589,15 @@ func (h *Handler) HandleMetadataV2(correlationID uint32, requestBody []byte) ([]
for partitionID := int32(0); partitionID < partitionCount; partitionID++ {
binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex
- binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
+ binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID
// ReplicaNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
- binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+ binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1
// IsrNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
- binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+ binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1
}
}
@@ -1544,7 +1613,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
- glog.V(0).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics)
+ glog.V(3).Infof("[METADATA v3/v4] Requested topics: %v (empty=all)", requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
@@ -1554,6 +1623,22 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
for _, name := range requestedTopics {
if h.seaweedMQHandler.TopicExists(name) {
topicsToReturn = append(topicsToReturn, name)
+ } else {
+ // Topic doesn't exist according to current cache, check broker directly
+ glog.V(3).Infof("[METADATA v3/v4] Topic %s not in cache, checking broker directly", name)
+ h.seaweedMQHandler.InvalidateTopicExistsCache(name)
+ if h.seaweedMQHandler.TopicExists(name) {
+ glog.V(3).Infof("[METADATA v3/v4] Topic %s found on broker after cache refresh", name)
+ topicsToReturn = append(topicsToReturn, name)
+ } else {
+ glog.V(3).Infof("[METADATA v3/v4] Topic %s not found, auto-creating with default partitions", name)
+ if err := h.createTopicWithSchemaSupport(name, h.GetDefaultPartitions()); err != nil {
+ glog.V(2).Infof("[METADATA v3/v4] Failed to auto-create topic %s: %v", name, err)
+ } else {
+ glog.V(2).Infof("[METADATA v3/v4] Successfully auto-created topic %s", name)
+ topicsToReturn = append(topicsToReturn, name)
+ }
+ }
}
}
}
@@ -1573,7 +1658,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
// Get advertised address for client connections
host, port := h.GetAdvertisedAddress(h.GetGatewayAddress())
- nodeID := int32(1) // Single gateway node
+ nodeID := h.GetNodeID() // Get consistent node ID for this gateway
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING)
binary.Write(&buf, binary.BigEndian, nodeID)
@@ -1601,7 +1686,7 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
buf.WriteString(clusterID)
// ControllerID (4 bytes) - v1+ addition
- binary.Write(&buf, binary.BigEndian, int32(1))
+ binary.Write(&buf, binary.BigEndian, nodeID)
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
@@ -1631,20 +1716,28 @@ func (h *Handler) HandleMetadataV3V4(correlationID uint32, requestBody []byte) (
for partitionID := int32(0); partitionID < partitionCount; partitionID++ {
binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex
- binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
+ binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID
// ReplicaNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
- binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+ binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1
// IsrNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
- binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+ binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1
}
}
response := buf.Bytes()
+ // Detailed logging for Metadata response
+ maxDisplay := len(response)
+ if maxDisplay > 50 {
+ maxDisplay = 50
+ }
+ if len(response) > 100 {
+ }
+
return response, nil
}
@@ -1655,7 +1748,7 @@ func (h *Handler) HandleMetadataV5V6(correlationID uint32, requestBody []byte) (
// HandleMetadataV7 implements Metadata API v7 with LeaderEpoch field (REGULAR FORMAT, NOT FLEXIBLE)
func (h *Handler) HandleMetadataV7(correlationID uint32, requestBody []byte) ([]byte, error) {
- // CRITICAL: Metadata v7 uses REGULAR arrays/strings (like v5/v6), NOT compact format
+ // Metadata v7 uses REGULAR arrays/strings (like v5/v6), NOT compact format
// Only v9+ uses compact format (flexible responses)
return h.handleMetadataV5ToV8(correlationID, requestBody, 7)
}
@@ -1671,7 +1764,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
// Parse requested topics (empty means all)
requestedTopics := h.parseMetadataTopics(requestBody)
- glog.V(0).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics)
+ glog.V(3).Infof("[METADATA v%d] Requested topics: %v (empty=all)", apiVersion, requestedTopics)
// Determine topics to return using SeaweedMQ handler
var topicsToReturn []string
@@ -1688,24 +1781,45 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
for _, topic := range requestedTopics {
if isSystemTopic(topic) {
// Always try to auto-create system topics during metadata requests
- glog.V(0).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic)
+ glog.V(3).Infof("[METADATA v%d] Ensuring system topic %s exists during metadata request", apiVersion, topic)
if !h.seaweedMQHandler.TopicExists(topic) {
- glog.V(0).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic)
+ glog.V(3).Infof("[METADATA v%d] Auto-creating system topic %s during metadata request", apiVersion, topic)
if err := h.createTopicWithSchemaSupport(topic, 1); err != nil {
glog.V(0).Infof("[METADATA v%d] Failed to auto-create system topic %s: %v", apiVersion, topic, err)
// Continue without adding to topicsToReturn - client will get UNKNOWN_TOPIC_OR_PARTITION
} else {
- glog.V(0).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic)
+ glog.V(3).Infof("[METADATA v%d] Successfully auto-created system topic %s", apiVersion, topic)
}
} else {
- glog.V(0).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic)
+ glog.V(3).Infof("[METADATA v%d] System topic %s already exists", apiVersion, topic)
}
topicsToReturn = append(topicsToReturn, topic)
} else if h.seaweedMQHandler.TopicExists(topic) {
topicsToReturn = append(topicsToReturn, topic)
+ } else {
+ // Topic doesn't exist according to current cache, but let's check broker directly
+ // This handles the race condition where producers just created topics
+ // and consumers are requesting metadata before cache TTL expires
+ glog.V(3).Infof("[METADATA v%d] Topic %s not in cache, checking broker directly", apiVersion, topic)
+ // Force cache invalidation to do fresh broker check
+ h.seaweedMQHandler.InvalidateTopicExistsCache(topic)
+ if h.seaweedMQHandler.TopicExists(topic) {
+ glog.V(3).Infof("[METADATA v%d] Topic %s found on broker after cache refresh", apiVersion, topic)
+ topicsToReturn = append(topicsToReturn, topic)
+ } else {
+ glog.V(3).Infof("[METADATA v%d] Topic %s not found on broker, auto-creating with default partitions", apiVersion, topic)
+ // Auto-create non-system topics with default partitions (matches Kafka behavior)
+ if err := h.createTopicWithSchemaSupport(topic, h.GetDefaultPartitions()); err != nil {
+ glog.V(2).Infof("[METADATA v%d] Failed to auto-create topic %s: %v", apiVersion, topic, err)
+ // Don't add to topicsToReturn - client will get UNKNOWN_TOPIC_OR_PARTITION
+ } else {
+ glog.V(2).Infof("[METADATA v%d] Successfully auto-created topic %s", apiVersion, topic)
+ topicsToReturn = append(topicsToReturn, topic)
+ }
+ }
}
}
- glog.V(0).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics)
+ glog.V(3).Infof("[METADATA v%d] Returning topics: %v (requested: %v)", apiVersion, topicsToReturn, requestedTopics)
}
var buf bytes.Buffer
@@ -1714,6 +1828,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
// NOTE: Correlation ID is handled by writeResponseWithCorrelationID
// Do NOT include it in the response body
+
// ThrottleTimeMs (4 bytes) - v3+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // No throttling
@@ -1723,7 +1838,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
// Get advertised address for client connections
host, port := h.GetAdvertisedAddress(h.GetGatewayAddress())
- nodeID := int32(1) // Single gateway node
+ nodeID := h.GetNodeID() // Get consistent node ID for this gateway
// Broker: node_id(4) + host(STRING) + port(4) + rack(STRING) + cluster_id(NULLABLE_STRING)
binary.Write(&buf, binary.BigEndian, nodeID)
@@ -1751,7 +1866,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
buf.WriteString(clusterID)
// ControllerID (4 bytes) - v1+ addition
- binary.Write(&buf, binary.BigEndian, int32(1))
+ binary.Write(&buf, binary.BigEndian, nodeID)
// Topics array (4 bytes length + topics)
binary.Write(&buf, binary.BigEndian, int32(len(topicsToReturn)))
@@ -1781,7 +1896,7 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
for partitionID := int32(0); partitionID < partitionCount; partitionID++ {
binary.Write(&buf, binary.BigEndian, int16(0)) // ErrorCode
binary.Write(&buf, binary.BigEndian, partitionID) // PartitionIndex
- binary.Write(&buf, binary.BigEndian, int32(1)) // LeaderID
+ binary.Write(&buf, binary.BigEndian, nodeID) // LeaderID
// LeaderEpoch (4 bytes) - v7+ addition
if apiVersion >= 7 {
@@ -1790,11 +1905,11 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
// ReplicaNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 replica
- binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+ binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1
// IsrNodes array (4 bytes length + nodes)
binary.Write(&buf, binary.BigEndian, int32(1)) // 1 ISR node
- binary.Write(&buf, binary.BigEndian, int32(1)) // NodeID 1
+ binary.Write(&buf, binary.BigEndian, nodeID) // NodeID 1
// OfflineReplicas array (4 bytes length + nodes) - v5+ addition
binary.Write(&buf, binary.BigEndian, int32(0)) // No offline replicas
@@ -1808,6 +1923,14 @@ func (h *Handler) handleMetadataV5ToV8(correlationID uint32, requestBody []byte,
response := buf.Bytes()
+ // Detailed logging for Metadata response
+ maxDisplay := len(response)
+ if maxDisplay > 50 {
+ maxDisplay = 50
+ }
+ if len(response) > 100 {
+ }
+
return response, nil
}
@@ -1871,6 +1994,12 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
// Parse minimal request to understand what's being asked (header already stripped)
offset := 0
+
+ maxBytes := len(requestBody)
+ if maxBytes > 64 {
+ maxBytes = 64
+ }
+
// v1+ has replica_id(4)
if apiVersion >= 1 {
if len(requestBody) < offset+4 {
@@ -1974,9 +2103,7 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
responseOffset = earliestOffset
}
responseTimestamp = 0 // No specific timestamp for earliest
- if strings.HasPrefix(string(topicName), "_schemas") {
- glog.Infof("SCHEMA REGISTRY LISTOFFSETS EARLIEST: topic=%s partition=%d returning offset=%d", string(topicName), partitionID, responseOffset)
- }
+
case -1: // latest offset
// Get the actual latest offset from SMQ
if h.seaweedMQHandler == nil {
@@ -2015,13 +2142,21 @@ func (h *Handler) handleListOffsets(correlationID uint32, apiVersion uint16, req
actualTopicsCount++
}
- // CRITICAL FIX: Update the topics count in the response header with the actual count
+ // Update the topics count in the response header with the actual count
// This prevents ErrIncompleteResponse when request parsing fails mid-way
if actualTopicsCount != topicsCount {
binary.BigEndian.PutUint32(response[topicsCountOffset:topicsCountOffset+4], actualTopicsCount)
+ } else {
}
+ if len(response) > 0 {
+ respPreview := len(response)
+ if respPreview > 32 {
+ respPreview = 32
+ }
+ }
return response, nil
+
}
func (h *Handler) handleCreateTopics(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
@@ -2207,7 +2342,7 @@ func (h *Handler) handleCreateTopicsV2To4(correlationID uint32, requestBody []by
} else {
// Use schema-aware topic creation
if err := h.createTopicWithSchemaSupport(t.name, int32(t.partitions)); err != nil {
- errCode = 1 // UNKNOWN_SERVER_ERROR
+ errCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
}
}
eb := make([]byte, 2)
@@ -2337,7 +2472,7 @@ func (h *Handler) handleCreateTopicsV0V1(correlationID uint32, requestBody []byt
} else {
// Create the topic in SeaweedMQ with schema support
if err := h.createTopicWithSchemaSupport(topicName, int32(numPartitions)); err != nil {
- errorCode = 1 // UNKNOWN_SERVER_ERROR
+ errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
}
}
@@ -2612,7 +2747,7 @@ func (h *Handler) handleCreateTopicsV2Plus(correlationID uint32, apiVersion uint
} else {
// Use corrected values for error checking and topic creation with schema support
if err := h.createTopicWithSchemaSupport(t.name, int32(actualPartitions)); err != nil {
- errCode = 1 // UNKNOWN_SERVER_ERROR
+ errCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
}
}
eb := make([]byte, 2)
@@ -2744,7 +2879,7 @@ func (h *Handler) handleDeleteTopics(correlationID uint32, requestBody []byte) (
} else {
// Delete the topic from SeaweedMQ
if err := h.seaweedMQHandler.DeleteTopic(topicName); err != nil {
- errorCode = 1 // UNKNOWN_SERVER_ERROR
+ errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
errorMessage = err.Error()
}
}
@@ -2809,26 +2944,36 @@ func (h *Handler) buildUnsupportedVersionResponse(correlationID uint32, apiKey,
// handleMetadata routes to the appropriate version-specific handler
func (h *Handler) handleMetadata(correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
+
+ var response []byte
+ var err error
+
switch apiVersion {
case 0:
- return h.HandleMetadataV0(correlationID, requestBody)
+ response, err = h.HandleMetadataV0(correlationID, requestBody)
case 1:
- return h.HandleMetadataV1(correlationID, requestBody)
+ response, err = h.HandleMetadataV1(correlationID, requestBody)
case 2:
- return h.HandleMetadataV2(correlationID, requestBody)
+ response, err = h.HandleMetadataV2(correlationID, requestBody)
case 3, 4:
- return h.HandleMetadataV3V4(correlationID, requestBody)
+ response, err = h.HandleMetadataV3V4(correlationID, requestBody)
case 5, 6:
- return h.HandleMetadataV5V6(correlationID, requestBody)
+ response, err = h.HandleMetadataV5V6(correlationID, requestBody)
case 7:
- return h.HandleMetadataV7(correlationID, requestBody)
+ response, err = h.HandleMetadataV7(correlationID, requestBody)
default:
// For versions > 7, use the V7 handler (flexible format)
if apiVersion > 7 {
- return h.HandleMetadataV7(correlationID, requestBody)
+ response, err = h.HandleMetadataV7(correlationID, requestBody)
+ } else {
+ err = fmt.Errorf("metadata version %d not implemented yet", apiVersion)
}
- return nil, fmt.Errorf("metadata version %d not implemented yet", apiVersion)
}
+
+ if err != nil {
+ } else {
+ }
+ return response, err
}
// getAPIName returns a human-readable name for Kafka API keys (for debugging)
@@ -2883,7 +3028,7 @@ func (h *Handler) handleDescribeConfigs(correlationID uint32, apiVersion uint16,
// Parse request to extract resources
resources, err := h.parseDescribeConfigsRequest(requestBody, apiVersion)
if err != nil {
- Error("DescribeConfigs parsing error: %v", err)
+ glog.Errorf("DescribeConfigs parsing error: %v", err)
return nil, fmt.Errorf("failed to parse DescribeConfigs request: %w", err)
}
@@ -3049,7 +3194,7 @@ func isFlexibleResponse(apiKey uint16, apiVersion uint16) bool {
case APIKeySyncGroup:
return apiVersion >= 4
case APIKeyApiVersions:
- // CRITICAL: AdminClient compatibility requires header version 0 (no tagged fields)
+ // AdminClient compatibility requires header version 0 (no tagged fields)
// Even though ApiVersions v3+ technically supports flexible responses, AdminClient
// expects the header to NOT include tagged fields. This is a known quirk.
return false // Always use non-flexible header for ApiVersions
@@ -3122,40 +3267,6 @@ func (h *Handler) writeResponseWithHeader(w *bufio.Writer, correlationID uint32,
return nil
}
-// hexDump formats bytes as a hex dump with ASCII representation
-func hexDump(data []byte) string {
- var result strings.Builder
- for i := 0; i < len(data); i += 16 {
- // Offset
- result.WriteString(fmt.Sprintf("%04x ", i))
-
- // Hex bytes
- for j := 0; j < 16; j++ {
- if i+j < len(data) {
- result.WriteString(fmt.Sprintf("%02x ", data[i+j]))
- } else {
- result.WriteString(" ")
- }
- if j == 7 {
- result.WriteString(" ")
- }
- }
-
- // ASCII representation
- result.WriteString(" |")
- for j := 0; j < 16 && i+j < len(data); j++ {
- b := data[i+j]
- if b >= 32 && b < 127 {
- result.WriteByte(b)
- } else {
- result.WriteByte('.')
- }
- }
- result.WriteString("|\n")
- }
- return result.String()
-}
-
// writeResponseWithCorrelationID is deprecated - use writeResponseWithHeader instead
// Kept for compatibility with direct callers that don't have API info
func (h *Handler) writeResponseWithCorrelationID(w *bufio.Writer, correlationID uint32, responseBody []byte, timeout time.Duration) error {
@@ -3319,12 +3430,6 @@ func (h *Handler) parseDescribeConfigsRequest(requestBody []byte, apiVersion uin
var resourcesLength uint32
if isFlexible {
- // Debug: log the first 8 bytes of the request body
- debugBytes := requestBody[offset:]
- if len(debugBytes) > 8 {
- debugBytes = debugBytes[:8]
- }
-
// FIX: Skip top-level tagged fields for DescribeConfigs v4+ flexible protocol
// The request body starts with tagged fields count (usually 0x00 = empty)
_, consumed, err := DecodeTaggedFields(requestBody[offset:])
@@ -3825,6 +3930,12 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16,
// v2+: transactional_id(NULLABLE_STRING) + transaction_timeout_ms(INT32) + producer_id(INT64) + producer_epoch(INT16)
// v4+: Uses flexible format with tagged fields
+
+ maxBytes := len(requestBody)
+ if maxBytes > 64 {
+ maxBytes = 64
+ }
+
offset := 0
// Parse transactional_id (NULLABLE_STRING or COMPACT_NULLABLE_STRING for flexible versions)
@@ -3930,6 +4041,10 @@ func (h *Handler) handleInitProducerId(correlationID uint32, apiVersion uint16,
response = append(response, 0x00) // Empty response body tagged fields
}
+ respPreview := len(response)
+ if respPreview > 32 {
+ respPreview = 32
+ }
return response, nil
}
@@ -3982,11 +4097,10 @@ func (h *Handler) createTopicWithSchemaSupport(topicName string, partitions int3
// createTopicWithDefaultFlexibleSchema creates a topic with a flexible default schema
// that can handle both Avro and JSON messages when schema management is enabled
func (h *Handler) createTopicWithDefaultFlexibleSchema(topicName string, partitions int32) error {
- // CRITICAL FIX: System topics like _schemas should be PLAIN Kafka topics without schema management
+ // System topics like _schemas should be PLAIN Kafka topics without schema management
// Schema Registry uses _schemas to STORE schemas, so it can't have schema management itself
- // This was causing issues with Schema Registry bootstrap
- glog.V(0).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName)
+ glog.V(1).Infof("Creating system topic %s as PLAIN topic (no schema management)", topicName)
return h.seaweedMQHandler.CreateTopic(topicName, partitions)
}
@@ -4191,5 +4305,5 @@ func cleanupPartitionReaders(connCtx *ConnectionContext) {
return true // Continue iteration
})
- glog.V(2).Infof("[%s] Cleaned up partition readers", connCtx.ConnectionID)
+ glog.V(4).Infof("[%s] Cleaned up partition readers", connCtx.ConnectionID)
}