Diffstat (limited to 'weed/mq/kafka/gateway')
-rw-r--r--  weed/mq/kafka/gateway/coordinator_registry.go       805
-rw-r--r--  weed/mq/kafka/gateway/coordinator_registry_test.go  309
-rw-r--r--  weed/mq/kafka/gateway/server.go                     300
-rw-r--r--  weed/mq/kafka/gateway/test_mock_handler.go          224
4 files changed, 1638 insertions(+), 0 deletions(-)
diff --git a/weed/mq/kafka/gateway/coordinator_registry.go b/weed/mq/kafka/gateway/coordinator_registry.go
new file mode 100644
index 000000000..af3330b03
--- /dev/null
+++ b/weed/mq/kafka/gateway/coordinator_registry.go
@@ -0,0 +1,805 @@
+package gateway
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "hash/fnv"
+ "io"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/cluster"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/filer_client"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "google.golang.org/grpc"
+)
+
+// CoordinatorRegistry manages consumer group coordinator assignments
+// Only the gateway leader maintains this registry
+type CoordinatorRegistry struct {
+ // Leader election
+ leaderLock *cluster.LiveLock
+ isLeader bool
+ leaderMutex sync.RWMutex
+ leadershipChange chan string // Notifies when leadership changes
+
+ // No in-memory assignments - read/write directly to filer
+ // assignmentsMutex still needed for coordinating file operations
+ assignmentsMutex sync.RWMutex
+
+ // Gateway registry
+ activeGateways map[string]*GatewayInfo // gatewayAddress -> info
+ gatewaysMutex sync.RWMutex
+
+ // Configuration
+ gatewayAddress string
+ lockClient *cluster.LockClient
+ filerClientAccessor *filer_client.FilerClientAccessor
+ filerDiscoveryService *filer_client.FilerDiscoveryService
+
+ // Control
+ stopChan chan struct{}
+ wg sync.WaitGroup
+}
+
+// Coordinator assignments use the shared protocol.CoordinatorAssignment type; this package defines no local assignment struct.
+
+// GatewayInfo represents an active gateway instance
+type GatewayInfo struct {
+ Address string
+ NodeID int32
+ RegisteredAt time.Time
+ LastHeartbeat time.Time
+ IsHealthy bool
+}
+
+const (
+ GatewayLeaderLockKey = "kafka-gateway-leader"
+ HeartbeatInterval = 10 * time.Second
+ GatewayTimeout = 30 * time.Second
+
+ // Filer paths for coordinator assignment persistence
+ CoordinatorAssignmentsDir = "/topics/kafka/.meta/coordinators"
+)
+
+// NewCoordinatorRegistry creates a new coordinator registry
+func NewCoordinatorRegistry(gatewayAddress string, masters []pb.ServerAddress, grpcDialOption grpc.DialOption) *CoordinatorRegistry {
+ // Create filer discovery service that will periodically refresh filers from all masters
+ filerDiscoveryService := filer_client.NewFilerDiscoveryService(masters, grpcDialOption)
+
+ // Discover a seed filer by querying each master in turn until one reports a filer
+ var seedFiler pb.ServerAddress
+ for _, master := range masters {
+ // Use the same discovery logic as filer_discovery.go
+ grpcAddr := master.ToGrpcAddress()
+ conn, err := grpc.Dial(grpcAddr, grpcDialOption)
+ if err != nil {
+ continue
+ }
+
+ client := master_pb.NewSeaweedClient(conn)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{
+ ClientType: cluster.FilerType,
+ })
+ cancel()
+ conn.Close()
+
+ if err == nil && len(resp.ClusterNodes) > 0 {
+ // Found a filer - use its HTTP address (WithFilerClient will convert to gRPC automatically)
+ seedFiler = pb.ServerAddress(resp.ClusterNodes[0].Address)
+ glog.V(1).Infof("Using filer %s as seed for distributed locking (discovered from master %s)", seedFiler, master)
+ break
+ }
+ }
+
+ if seedFiler == "" {
+ glog.Warningf("no filer discovered from masters %v; distributed locking will be unavailable until a filer is reachable", masters)
+ }
+
+ lockClient := cluster.NewLockClient(grpcDialOption, seedFiler)
+
+ registry := &CoordinatorRegistry{
+ activeGateways: make(map[string]*GatewayInfo),
+ gatewayAddress: gatewayAddress,
+ lockClient: lockClient,
+ stopChan: make(chan struct{}),
+ leadershipChange: make(chan string, 10), // Buffered channel for leadership notifications
+ filerDiscoveryService: filerDiscoveryService,
+ }
+
+ // Create filer client accessor that uses dynamic filer discovery
+ registry.filerClientAccessor = &filer_client.FilerClientAccessor{
+ GetGrpcDialOption: func() grpc.DialOption {
+ return grpcDialOption
+ },
+ GetFilers: func() []pb.ServerAddress {
+ return registry.filerDiscoveryService.GetFilers()
+ },
+ }
+
+ return registry
+}
+
+// Start begins the coordinator registry operations
+func (cr *CoordinatorRegistry) Start() error {
+ glog.V(1).Infof("Starting coordinator registry for gateway %s", cr.gatewayAddress)
+
+ // Start filer discovery service first
+ if err := cr.filerDiscoveryService.Start(); err != nil {
+ return fmt.Errorf("failed to start filer discovery service: %w", err)
+ }
+
+ // Start leader election
+ cr.startLeaderElection()
+
+ // Start heartbeat loop to keep this gateway healthy
+ cr.startHeartbeatLoop()
+
+ // Start cleanup goroutine
+ cr.startCleanupLoop()
+
+ // Register this gateway
+ cr.registerGateway(cr.gatewayAddress)
+
+ return nil
+}
+
+// Stop shuts down the coordinator registry
+func (cr *CoordinatorRegistry) Stop() error {
+ glog.V(1).Infof("Stopping coordinator registry for gateway %s", cr.gatewayAddress)
+
+ close(cr.stopChan)
+ cr.wg.Wait()
+
+ // Release leader lock if held
+ if cr.leaderLock != nil {
+ cr.leaderLock.Stop()
+ }
+
+ // Stop filer discovery service
+ if err := cr.filerDiscoveryService.Stop(); err != nil {
+ glog.Warningf("Failed to stop filer discovery service: %v", err)
+ }
+
+ return nil
+}
+
+// startLeaderElection starts the leader election process
+func (cr *CoordinatorRegistry) startLeaderElection() {
+ cr.wg.Add(1)
+ go func() {
+ defer cr.wg.Done()
+
+ // Start long-lived lock for leader election
+ cr.leaderLock = cr.lockClient.StartLongLivedLock(
+ GatewayLeaderLockKey,
+ cr.gatewayAddress,
+ cr.onLeadershipChange,
+ )
+
+ // Wait for shutdown
+ <-cr.stopChan
+
+ // The leader lock will be stopped when Stop() is called
+ }()
+}
+
+// onLeadershipChange handles leadership changes
+func (cr *CoordinatorRegistry) onLeadershipChange(newLeader string) {
+ cr.leaderMutex.Lock()
+ defer cr.leaderMutex.Unlock()
+
+ wasLeader := cr.isLeader
+ cr.isLeader = (newLeader == cr.gatewayAddress)
+
+ if cr.isLeader && !wasLeader {
+ glog.V(0).Infof("Gateway %s became the coordinator registry leader", cr.gatewayAddress)
+ cr.onBecameLeader()
+ } else if !cr.isLeader && wasLeader {
+ glog.V(0).Infof("Gateway %s lost coordinator registry leadership to %s", cr.gatewayAddress, newLeader)
+ cr.onLostLeadership()
+ }
+
+ // Notify waiting goroutines about leadership change
+ select {
+ case cr.leadershipChange <- newLeader:
+ // Notification sent
+ default:
+ // Channel buffer full; drop the notification (waiters re-check leader state, so a missed update is harmless)
+ }
+}
+
+// onBecameLeader handles becoming the leader
+func (cr *CoordinatorRegistry) onBecameLeader() {
+ // Assignments are now read directly from files - no need to load into memory
+ glog.V(1).Info("Leader election complete - coordinator assignments will be read from filer as needed")
+
+ // Clear gateway registry since it's ephemeral (gateways need to re-register)
+ cr.gatewaysMutex.Lock()
+ cr.activeGateways = make(map[string]*GatewayInfo)
+ cr.gatewaysMutex.Unlock()
+
+ // Re-register this gateway
+ cr.registerGateway(cr.gatewayAddress)
+}
+
+// onLostLeadership handles losing leadership
+func (cr *CoordinatorRegistry) onLostLeadership() {
+ // No in-memory assignments to clear - assignments are stored in filer
+ glog.V(1).Info("Lost leadership - no longer managing coordinator assignments")
+}
+
+// IsLeader returns whether this gateway is the coordinator registry leader
+func (cr *CoordinatorRegistry) IsLeader() bool {
+ cr.leaderMutex.RLock()
+ defer cr.leaderMutex.RUnlock()
+ return cr.isLeader
+}
+
+// GetLeaderAddress returns the current leader's address
+func (cr *CoordinatorRegistry) GetLeaderAddress() string {
+ if cr.leaderLock != nil {
+ return cr.leaderLock.LockOwner()
+ }
+ return ""
+}
+
+// WaitForLeader waits for a leader to be elected, with timeout
+func (cr *CoordinatorRegistry) WaitForLeader(timeout time.Duration) (string, error) {
+ // Check if there's already a leader
+ if leader := cr.GetLeaderAddress(); leader != "" {
+ return leader, nil
+ }
+
+ // Check if this instance is the leader
+ if cr.IsLeader() {
+ return cr.gatewayAddress, nil
+ }
+
+ // Wait for leadership change notification
+ deadline := time.Now().Add(timeout)
+ for {
+ select {
+ case leader := <-cr.leadershipChange:
+ if leader != "" {
+ return leader, nil
+ }
+ case <-time.After(time.Until(deadline)):
+ return "", fmt.Errorf("timeout waiting for leader election after %v", timeout)
+ }
+
+ // Double-check in case we missed a notification
+ if leader := cr.GetLeaderAddress(); leader != "" {
+ return leader, nil
+ }
+ if cr.IsLeader() {
+ return cr.gatewayAddress, nil
+ }
+
+ if time.Now().After(deadline) {
+ break
+ }
+ }
+
+ return "", fmt.Errorf("timeout waiting for leader election after %v", timeout)
+}
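+
+// Illustrative usage (a sketch, not part of this change): callers routing
+// FindCoordinator requests can wait briefly for an elected leader:
+//
+// leader, err := registry.WaitForLeader(5 * time.Second)
+// if err != nil {
+// // no leader elected in time; retry or surface a retriable error
+// }
+// _ = leader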
+
+// AssignCoordinator assigns a coordinator for a consumer group using a balanced strategy.
+// The coordinator is selected deterministically via consistent hashing of the
+// consumer group across the set of healthy gateways. This spreads groups evenly
+// and avoids hot-spotting on the first requester.
+func (cr *CoordinatorRegistry) AssignCoordinator(consumerGroup string, requestingGateway string) (*protocol.CoordinatorAssignment, error) {
+ if !cr.IsLeader() {
+ return nil, fmt.Errorf("not the coordinator registry leader")
+ }
+
+ // First check if requesting gateway is healthy without holding assignments lock
+ if !cr.isGatewayHealthy(requestingGateway) {
+ return nil, fmt.Errorf("requesting gateway %s is not healthy", requestingGateway)
+ }
+
+ // Lock assignments mutex to coordinate file operations
+ cr.assignmentsMutex.Lock()
+ defer cr.assignmentsMutex.Unlock()
+
+ // Check if coordinator already assigned by trying to load from file
+ existing, err := cr.loadCoordinatorAssignment(consumerGroup)
+ if err == nil && existing != nil {
+ // Assignment exists, check if coordinator is still healthy
+ if cr.isGatewayHealthy(existing.CoordinatorAddr) {
+ glog.V(2).Infof("Consumer group %s already has healthy coordinator %s", consumerGroup, existing.CoordinatorAddr)
+ return existing, nil
+ } else {
+ glog.V(1).Infof("Existing coordinator %s for group %s is unhealthy, reassigning", existing.CoordinatorAddr, consumerGroup)
+ // Delete the existing assignment file
+ if delErr := cr.deleteCoordinatorAssignment(consumerGroup); delErr != nil {
+ glog.Warningf("Failed to delete stale assignment for group %s: %v", consumerGroup, delErr)
+ }
+ }
+ }
+
+ // Choose a balanced coordinator via consistent hashing across healthy gateways
+ chosenAddr, nodeID, err := cr.chooseCoordinatorAddrForGroup(consumerGroup)
+ if err != nil {
+ return nil, err
+ }
+
+ assignment := &protocol.CoordinatorAssignment{
+ ConsumerGroup: consumerGroup,
+ CoordinatorAddr: chosenAddr,
+ CoordinatorNodeID: nodeID,
+ AssignedAt: time.Now(),
+ LastHeartbeat: time.Now(),
+ }
+
+ // Persist the new assignment to individual file
+ if err := cr.saveCoordinatorAssignment(consumerGroup, assignment); err != nil {
+ return nil, fmt.Errorf("failed to persist coordinator assignment for group %s: %w", consumerGroup, err)
+ }
+
+ glog.V(1).Infof("Assigned coordinator %s (node %d) for consumer group %s via consistent hashing", chosenAddr, nodeID, consumerGroup)
+ return assignment, nil
+}
+
+// GetCoordinator returns the coordinator for a consumer group
+func (cr *CoordinatorRegistry) GetCoordinator(consumerGroup string) (*protocol.CoordinatorAssignment, error) {
+ if !cr.IsLeader() {
+ return nil, fmt.Errorf("not the coordinator registry leader")
+ }
+
+ // Load assignment directly from file
+ assignment, err := cr.loadCoordinatorAssignment(consumerGroup)
+ if err != nil {
+ return nil, fmt.Errorf("no coordinator assigned for consumer group %s: %w", consumerGroup, err)
+ }
+
+ return assignment, nil
+}
+
+// RegisterGateway registers a gateway instance
+func (cr *CoordinatorRegistry) RegisterGateway(gatewayAddress string) error {
+ if !cr.IsLeader() {
+ return fmt.Errorf("not the coordinator registry leader")
+ }
+
+ cr.registerGateway(gatewayAddress)
+ return nil
+}
+
+// registerGateway internal method to register a gateway
+func (cr *CoordinatorRegistry) registerGateway(gatewayAddress string) {
+ cr.gatewaysMutex.Lock()
+ defer cr.gatewaysMutex.Unlock()
+
+ nodeID := generateDeterministicNodeID(gatewayAddress)
+
+ cr.activeGateways[gatewayAddress] = &GatewayInfo{
+ Address: gatewayAddress,
+ NodeID: nodeID,
+ RegisteredAt: time.Now(),
+ LastHeartbeat: time.Now(),
+ IsHealthy: true,
+ }
+
+ glog.V(1).Infof("Registered gateway %s with deterministic node ID %d", gatewayAddress, nodeID)
+}
+
+// HeartbeatGateway updates the heartbeat for a gateway
+func (cr *CoordinatorRegistry) HeartbeatGateway(gatewayAddress string) error {
+ if !cr.IsLeader() {
+ return fmt.Errorf("not the coordinator registry leader")
+ }
+
+ cr.gatewaysMutex.Lock()
+
+ if gateway, exists := cr.activeGateways[gatewayAddress]; exists {
+ gateway.LastHeartbeat = time.Now()
+ gateway.IsHealthy = true
+ cr.gatewaysMutex.Unlock()
+ glog.V(3).Infof("Updated heartbeat for gateway %s", gatewayAddress)
+ } else {
+ // Auto-register unknown gateway - unlock first to avoid double unlock
+ cr.gatewaysMutex.Unlock()
+ cr.registerGateway(gatewayAddress)
+ }
+
+ return nil
+}
+
+// isGatewayHealthy checks if a gateway is healthy
+func (cr *CoordinatorRegistry) isGatewayHealthy(gatewayAddress string) bool {
+ cr.gatewaysMutex.RLock()
+ defer cr.gatewaysMutex.RUnlock()
+
+ return cr.isGatewayHealthyUnsafe(gatewayAddress)
+}
+
+// isGatewayHealthyUnsafe checks if a gateway is healthy without acquiring locks
+// Caller must hold gatewaysMutex.RLock() or gatewaysMutex.Lock()
+func (cr *CoordinatorRegistry) isGatewayHealthyUnsafe(gatewayAddress string) bool {
+ gateway, exists := cr.activeGateways[gatewayAddress]
+ if !exists {
+ return false
+ }
+
+ return gateway.IsHealthy && time.Since(gateway.LastHeartbeat) < GatewayTimeout
+}
+
+// getGatewayNodeID returns the node ID for a gateway
+func (cr *CoordinatorRegistry) getGatewayNodeID(gatewayAddress string) int32 {
+ cr.gatewaysMutex.RLock()
+ defer cr.gatewaysMutex.RUnlock()
+
+ return cr.getGatewayNodeIDUnsafe(gatewayAddress)
+}
+
+// getGatewayNodeIDUnsafe returns the node ID for a gateway without acquiring locks
+// Caller must hold gatewaysMutex.RLock() or gatewaysMutex.Lock()
+func (cr *CoordinatorRegistry) getGatewayNodeIDUnsafe(gatewayAddress string) int32 {
+ if gateway, exists := cr.activeGateways[gatewayAddress]; exists {
+ return gateway.NodeID
+ }
+
+ return 1 // Fallback node ID for unregistered gateways (may collide with a hashed ID)
+}
+
+// getHealthyGatewaysSorted returns a stable-sorted list of healthy gateway addresses.
+func (cr *CoordinatorRegistry) getHealthyGatewaysSorted() []string {
+ cr.gatewaysMutex.RLock()
+ defer cr.gatewaysMutex.RUnlock()
+
+ addresses := make([]string, 0, len(cr.activeGateways))
+ for addr, info := range cr.activeGateways {
+ if info.IsHealthy && time.Since(info.LastHeartbeat) < GatewayTimeout {
+ addresses = append(addresses, addr)
+ }
+ }
+
+ sort.Strings(addresses)
+ return addresses
+}
+
+// chooseCoordinatorAddrForGroup selects a coordinator address using consistent hashing.
+func (cr *CoordinatorRegistry) chooseCoordinatorAddrForGroup(consumerGroup string) (string, int32, error) {
+ healthy := cr.getHealthyGatewaysSorted()
+ if len(healthy) == 0 {
+ return "", 0, fmt.Errorf("no healthy gateways available for coordinator assignment")
+ }
+ idx := hashStringToIndex(consumerGroup, len(healthy))
+ addr := healthy[idx]
+ return addr, cr.getGatewayNodeID(addr), nil
+}
+
+// hashStringToIndex hashes a string to an index in [0, modulo).
+func hashStringToIndex(s string, modulo int) int {
+ if modulo <= 0 {
+ return 0
+ }
+ h := fnv.New32a()
+ _, _ = h.Write([]byte(s))
+ return int(h.Sum32() % uint32(modulo))
+}
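+
+// Illustrative sketch (not part of the API surface): given the sorted healthy
+// gateway list, the same group always maps to the same index, so coordinator
+// placement stays stable until gateway membership changes:
+//
+// healthy := []string{"gw1:9092", "gw2:9092", "gw3:9092"} // sorted
+// idx := hashStringToIndex("orders-consumers", len(healthy))
+// coordinator := healthy[idx] // identical on every call for this group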
+
+// generateDeterministicNodeID generates a stable node ID based on gateway address
+func generateDeterministicNodeID(gatewayAddress string) int32 {
+ h := fnv.New32a()
+ _, _ = h.Write([]byte(gatewayAddress))
+ // Use only positive values and avoid 0
+ return int32(h.Sum32()&0x7fffffff) + 1
+}
+
+// startHeartbeatLoop starts the heartbeat loop for this gateway
+func (cr *CoordinatorRegistry) startHeartbeatLoop() {
+ cr.wg.Add(1)
+ go func() {
+ defer cr.wg.Done()
+
+ ticker := time.NewTicker(HeartbeatInterval / 2) // Tick at half the heartbeat interval so heartbeats stay well within GatewayTimeout
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-cr.stopChan:
+ return
+ case <-ticker.C:
+ if cr.IsLeader() {
+ // Send heartbeat for this gateway to keep it healthy
+ if err := cr.HeartbeatGateway(cr.gatewayAddress); err != nil {
+ glog.V(2).Infof("Failed to send heartbeat for gateway %s: %v", cr.gatewayAddress, err)
+ }
+ }
+ }
+ }
+ }()
+}
+
+// startCleanupLoop starts the cleanup loop for stale assignments and gateways
+func (cr *CoordinatorRegistry) startCleanupLoop() {
+ cr.wg.Add(1)
+ go func() {
+ defer cr.wg.Done()
+
+ ticker := time.NewTicker(HeartbeatInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-cr.stopChan:
+ return
+ case <-ticker.C:
+ if cr.IsLeader() {
+ cr.cleanupStaleEntries()
+ }
+ }
+ }
+ }()
+}
+
+// cleanupStaleEntries removes stale gateways and assignments
+func (cr *CoordinatorRegistry) cleanupStaleEntries() {
+ now := time.Now()
+
+ // First, identify stale gateways
+ var staleGateways []string
+ cr.gatewaysMutex.Lock()
+ for addr, gateway := range cr.activeGateways {
+ if now.Sub(gateway.LastHeartbeat) > GatewayTimeout {
+ staleGateways = append(staleGateways, addr)
+ }
+ }
+ // Remove stale gateways
+ for _, addr := range staleGateways {
+ glog.V(1).Infof("Removing stale gateway %s", addr)
+ delete(cr.activeGateways, addr)
+ }
+ cr.gatewaysMutex.Unlock()
+
+ // Then, identify assignments with unhealthy coordinators and reassign them
+ cr.assignmentsMutex.Lock()
+ defer cr.assignmentsMutex.Unlock()
+
+ // Get list of all consumer groups with assignments
+ consumerGroups, err := cr.listAllCoordinatorAssignments()
+ if err != nil {
+ glog.Warningf("Failed to list coordinator assignments during cleanup: %v", err)
+ return
+ }
+
+ for _, group := range consumerGroups {
+ // Load assignment from file
+ assignment, err := cr.loadCoordinatorAssignment(group)
+ if err != nil {
+ glog.Warningf("Failed to load assignment for group %s during cleanup: %v", group, err)
+ continue
+ }
+
+ // Check if coordinator is healthy
+ if !cr.isGatewayHealthy(assignment.CoordinatorAddr) {
+ glog.V(1).Infof("Coordinator %s for group %s is unhealthy, attempting reassignment", assignment.CoordinatorAddr, group)
+
+ // Try to reassign to a healthy gateway
+ newAddr, newNodeID, err := cr.chooseCoordinatorAddrForGroup(group)
+ if err != nil {
+ // No healthy gateways available, remove the assignment for now
+ glog.Warningf("No healthy gateways available for reassignment of group %s, removing assignment", group)
+ if delErr := cr.deleteCoordinatorAssignment(group); delErr != nil {
+ glog.Warningf("Failed to delete assignment for group %s: %v", group, delErr)
+ }
+ } else if newAddr != assignment.CoordinatorAddr {
+ // Reassign to the new healthy coordinator
+ newAssignment := &protocol.CoordinatorAssignment{
+ ConsumerGroup: group,
+ CoordinatorAddr: newAddr,
+ CoordinatorNodeID: newNodeID,
+ AssignedAt: time.Now(),
+ LastHeartbeat: time.Now(),
+ }
+
+ // Save new assignment to file
+ if saveErr := cr.saveCoordinatorAssignment(group, newAssignment); saveErr != nil {
+ glog.Warningf("Failed to save reassignment for group %s: %v", group, saveErr)
+ } else {
+ glog.V(0).Infof("Reassigned coordinator for group %s from unhealthy %s to healthy %s",
+ group, assignment.CoordinatorAddr, newAddr)
+ }
+ }
+ }
+ }
+}
+
+// GetStats returns registry statistics
+func (cr *CoordinatorRegistry) GetStats() map[string]interface{} {
+ // Read counts separately to avoid holding locks while calling IsLeader()
+ cr.gatewaysMutex.RLock()
+ gatewayCount := len(cr.activeGateways)
+ cr.gatewaysMutex.RUnlock()
+
+ // Count assignments from files
+ var assignmentCount int
+ if cr.IsLeader() {
+ consumerGroups, err := cr.listAllCoordinatorAssignments()
+ if err != nil {
+ glog.Warningf("Failed to count coordinator assignments: %v", err)
+ assignmentCount = -1 // Indicate error
+ } else {
+ assignmentCount = len(consumerGroups)
+ }
+ } else {
+ assignmentCount = 0 // Non-leader doesn't track assignments
+ }
+
+ return map[string]interface{}{
+ "is_leader": cr.IsLeader(),
+ "leader_address": cr.GetLeaderAddress(),
+ "active_gateways": gatewayCount,
+ "assignments": assignmentCount,
+ "gateway_address": cr.gatewayAddress,
+ }
+}
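+
+// Example shape of a GetStats result on the leader (keys match the map
+// constructed above; values are illustrative):
+//
+// map[string]interface{}{
+// "is_leader": true,
+// "leader_address": "gw1:9092",
+// "active_gateways": 3,
+// "assignments": 12,
+// "gateway_address": "gw1:9092",
+// }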
+
+// Persistence methods for coordinator assignments
+
+// saveCoordinatorAssignment saves a single coordinator assignment to its individual file
+func (cr *CoordinatorRegistry) saveCoordinatorAssignment(consumerGroup string, assignment *protocol.CoordinatorAssignment) error {
+ if !cr.IsLeader() {
+ // Only leader should save assignments
+ return nil
+ }
+
+ return cr.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // Convert assignment to JSON
+ assignmentData, err := json.Marshal(assignment)
+ if err != nil {
+ return fmt.Errorf("failed to marshal assignment for group %s: %w", consumerGroup, err)
+ }
+
+ // Save to individual file: /topics/kafka/.meta/coordinators/<consumer-group>_assignments.json
+ fileName := fmt.Sprintf("%s_assignments.json", consumerGroup)
+ return filer.SaveInsideFiler(client, CoordinatorAssignmentsDir, fileName, assignmentData)
+ })
+}
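+
+// For reference, a persisted assignment might look like the following (path
+// and shape follow the code above; field names assume
+// protocol.CoordinatorAssignment carries no custom JSON tags):
+//
+// /topics/kafka/.meta/coordinators/orders-consumers_assignments.json
+// {"ConsumerGroup":"orders-consumers","CoordinatorAddr":"gw1:9092",
+// "CoordinatorNodeID":1234,"AssignedAt":"2024-01-01T00:00:00Z",
+// "LastHeartbeat":"2024-01-01T00:00:00Z"}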
+
+// loadCoordinatorAssignment loads a single coordinator assignment from its individual file
+func (cr *CoordinatorRegistry) loadCoordinatorAssignment(consumerGroup string) (*protocol.CoordinatorAssignment, error) {
+ return cr.loadCoordinatorAssignmentWithClient(consumerGroup, cr.filerClientAccessor)
+}
+
+// loadCoordinatorAssignmentWithClient loads a single coordinator assignment using provided client
+func (cr *CoordinatorRegistry) loadCoordinatorAssignmentWithClient(consumerGroup string, clientAccessor *filer_client.FilerClientAccessor) (*protocol.CoordinatorAssignment, error) {
+ var assignment *protocol.CoordinatorAssignment
+
+ err := clientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // Load from individual file: /topics/kafka/.meta/coordinators/<consumer-group>_assignments.json
+ fileName := fmt.Sprintf("%s_assignments.json", consumerGroup)
+ data, err := filer.ReadInsideFiler(client, CoordinatorAssignmentsDir, fileName)
+ if err != nil {
+ return fmt.Errorf("assignment file not found for group %s: %w", consumerGroup, err)
+ }
+
+ // Parse JSON
+ if err := json.Unmarshal(data, &assignment); err != nil {
+ return fmt.Errorf("failed to unmarshal assignment for group %s: %w", consumerGroup, err)
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ return assignment, nil
+}
+
+// listAllCoordinatorAssignments lists all coordinator assignment files
+func (cr *CoordinatorRegistry) listAllCoordinatorAssignments() ([]string, error) {
+ var consumerGroups []string
+
+ err := cr.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.ListEntriesRequest{
+ Directory: CoordinatorAssignmentsDir,
+ }
+
+ stream, streamErr := client.ListEntries(context.Background(), request)
+ if streamErr != nil {
+ // The directory might not exist yet; treat this as an empty listing
+ return nil
+ }
+
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ }
+ return fmt.Errorf("failed to receive entry: %v", recvErr)
+ }
+
+ // Only include assignment files (ending with _assignments.json)
+ if resp.Entry != nil && !resp.Entry.IsDirectory &&
+ strings.HasSuffix(resp.Entry.Name, "_assignments.json") {
+ // Extract consumer group name by removing _assignments.json suffix
+ consumerGroup := strings.TrimSuffix(resp.Entry.Name, "_assignments.json")
+ consumerGroups = append(consumerGroups, consumerGroup)
+ }
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, fmt.Errorf("failed to list coordinator assignments: %w", err)
+ }
+
+ return consumerGroups, nil
+}
+
+// deleteCoordinatorAssignment removes a coordinator assignment file
+func (cr *CoordinatorRegistry) deleteCoordinatorAssignment(consumerGroup string) error {
+ if !cr.IsLeader() {
+ return nil
+ }
+
+ return cr.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ fileName := fmt.Sprintf("%s_assignments.json", consumerGroup)
+ filePath := fmt.Sprintf("%s/%s", CoordinatorAssignmentsDir, fileName)
+
+ _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
+ Directory: CoordinatorAssignmentsDir,
+ Name: fileName,
+ })
+
+ if err != nil {
+ return fmt.Errorf("failed to delete assignment file %s: %w", filePath, err)
+ }
+
+ return nil
+ })
+}
+
+// ReassignCoordinator manually reassigns a coordinator for a consumer group
+// This can be called when a coordinator gateway becomes unavailable
+func (cr *CoordinatorRegistry) ReassignCoordinator(consumerGroup string) (*protocol.CoordinatorAssignment, error) {
+ if !cr.IsLeader() {
+ return nil, fmt.Errorf("not the coordinator registry leader")
+ }
+
+ cr.assignmentsMutex.Lock()
+ defer cr.assignmentsMutex.Unlock()
+
+ // Check if assignment exists by loading from file
+ existing, err := cr.loadCoordinatorAssignment(consumerGroup)
+ if err != nil {
+ return nil, fmt.Errorf("no existing assignment for consumer group %s: %w", consumerGroup, err)
+ }
+
+ // Choose a new coordinator
+ newAddr, newNodeID, err := cr.chooseCoordinatorAddrForGroup(consumerGroup)
+ if err != nil {
+ return nil, fmt.Errorf("failed to choose new coordinator: %w", err)
+ }
+
+ // Create new assignment
+ newAssignment := &protocol.CoordinatorAssignment{
+ ConsumerGroup: consumerGroup,
+ CoordinatorAddr: newAddr,
+ CoordinatorNodeID: newNodeID,
+ AssignedAt: time.Now(),
+ LastHeartbeat: time.Now(),
+ }
+
+ // Persist the new assignment to individual file
+ if err := cr.saveCoordinatorAssignment(consumerGroup, newAssignment); err != nil {
+ return nil, fmt.Errorf("failed to persist coordinator reassignment for group %s: %w", consumerGroup, err)
+ }
+
+ glog.V(0).Infof("Manually reassigned coordinator for group %s from %s to %s",
+ consumerGroup, existing.CoordinatorAddr, newAddr)
+
+ return newAssignment, nil
+}
diff --git a/weed/mq/kafka/gateway/coordinator_registry_test.go b/weed/mq/kafka/gateway/coordinator_registry_test.go
new file mode 100644
index 000000000..9ce560cd1
--- /dev/null
+++ b/weed/mq/kafka/gateway/coordinator_registry_test.go
@@ -0,0 +1,309 @@
+package gateway
+
+import (
+ "testing"
+ "time"
+)
+
+func TestCoordinatorRegistry_DeterministicNodeID(t *testing.T) {
+ // Test that node IDs are deterministic and stable
+ addr1 := "gateway1:9092"
+ addr2 := "gateway2:9092"
+
+ id1a := generateDeterministicNodeID(addr1)
+ id1b := generateDeterministicNodeID(addr1)
+ id2 := generateDeterministicNodeID(addr2)
+
+ if id1a != id1b {
+ t.Errorf("Node ID should be deterministic: %d != %d", id1a, id1b)
+ }
+
+ if id1a == id2 {
+ t.Errorf("Different addresses should have different node IDs: %d == %d", id1a, id2)
+ }
+
+ if id1a <= 0 || id2 <= 0 {
+ t.Errorf("Node IDs should be positive: %d, %d", id1a, id2)
+ }
+}
+
+func TestCoordinatorRegistry_BasicOperations(t *testing.T) {
+ // Create a test registry without actual filer connection
+ registry := &CoordinatorRegistry{
+ activeGateways: make(map[string]*GatewayInfo),
+ gatewayAddress: "test-gateway:9092",
+ stopChan: make(chan struct{}),
+ leadershipChange: make(chan string, 10),
+ isLeader: true, // Simulate being leader for tests
+ }
+
+ // Test gateway registration
+ gatewayAddr := "test-gateway:9092"
+ registry.registerGateway(gatewayAddr)
+
+ if len(registry.activeGateways) != 1 {
+ t.Errorf("Expected 1 gateway, got %d", len(registry.activeGateways))
+ }
+
+ gateway, exists := registry.activeGateways[gatewayAddr]
+ if !exists {
+ t.Error("Gateway should be registered")
+ }
+
+ if gateway.NodeID <= 0 {
+ t.Errorf("Gateway should have positive node ID, got %d", gateway.NodeID)
+ }
+
+ // Test gateway health check
+ if !registry.isGatewayHealthyUnsafe(gatewayAddr) {
+ t.Error("Newly registered gateway should be healthy")
+ }
+
+ // Test node ID retrieval
+ nodeID := registry.getGatewayNodeIDUnsafe(gatewayAddr)
+ if nodeID != gateway.NodeID {
+ t.Errorf("Expected node ID %d, got %d", gateway.NodeID, nodeID)
+ }
+}
+
+func TestCoordinatorRegistry_AssignCoordinator(t *testing.T) {
+ registry := &CoordinatorRegistry{
+ activeGateways: make(map[string]*GatewayInfo),
+ gatewayAddress: "test-gateway:9092",
+ stopChan: make(chan struct{}),
+ leadershipChange: make(chan string, 10),
+ isLeader: true,
+ }
+
+ // Register a gateway
+ gatewayAddr := "test-gateway:9092"
+ registry.registerGateway(gatewayAddr)
+
+ // Test coordinator assignment when not leader
+ registry.isLeader = false
+ _, err := registry.AssignCoordinator("test-group", gatewayAddr)
+ if err == nil {
+ t.Error("Should fail when not leader")
+ }
+
+ // Test coordinator assignment when leader
+ // Note: this panics because no filer client is configured, which this unit test expects
+ registry.isLeader = true
+ func() {
+ defer func() {
+ if r := recover(); r == nil {
+ t.Error("Expected panic due to missing filer client")
+ }
+ }()
+ registry.AssignCoordinator("test-group", gatewayAddr)
+ }()
+
+ // Test getting assignment when not leader
+ registry.isLeader = false
+ _, err = registry.GetCoordinator("test-group")
+ if err == nil {
+ t.Error("Should fail when not leader")
+ }
+}
+
+func TestCoordinatorRegistry_HealthyGateways(t *testing.T) {
+ registry := &CoordinatorRegistry{
+ activeGateways: make(map[string]*GatewayInfo),
+ gatewayAddress: "test-gateway:9092",
+ stopChan: make(chan struct{}),
+ leadershipChange: make(chan string, 10),
+ isLeader: true,
+ }
+
+ // Register multiple gateways
+ gateways := []string{"gateway1:9092", "gateway2:9092", "gateway3:9092"}
+ for _, addr := range gateways {
+ registry.registerGateway(addr)
+ }
+
+ // All should be healthy initially
+ healthy := registry.getHealthyGatewaysSorted()
+ if len(healthy) != len(gateways) {
+ t.Errorf("Expected %d healthy gateways, got %d", len(gateways), len(healthy))
+ }
+
+ // Make one gateway stale
+ registry.activeGateways["gateway2:9092"].LastHeartbeat = time.Now().Add(-2 * GatewayTimeout)
+
+ healthy = registry.getHealthyGatewaysSorted()
+ if len(healthy) != len(gateways)-1 {
+ t.Errorf("Expected %d healthy gateways after one became stale, got %d", len(gateways)-1, len(healthy))
+ }
+
+ // Check that results are sorted
+ for i := 1; i < len(healthy); i++ {
+ if healthy[i-1] >= healthy[i] {
+ t.Errorf("Healthy gateways should be sorted: %v", healthy)
+ break
+ }
+ }
+}
+
+func TestCoordinatorRegistry_ConsistentHashing(t *testing.T) {
+ registry := &CoordinatorRegistry{
+ activeGateways: make(map[string]*GatewayInfo),
+ gatewayAddress: "test-gateway:9092",
+ stopChan: make(chan struct{}),
+ leadershipChange: make(chan string, 10),
+ isLeader: true,
+ }
+
+ // Register multiple gateways
+ gateways := []string{"gateway1:9092", "gateway2:9092", "gateway3:9092"}
+ for _, addr := range gateways {
+ registry.registerGateway(addr)
+ }
+
+ // Test that same group always gets same coordinator
+ group := "test-group"
+ addr1, nodeID1, err1 := registry.chooseCoordinatorAddrForGroup(group)
+ addr2, nodeID2, err2 := registry.chooseCoordinatorAddrForGroup(group)
+
+ if err1 != nil || err2 != nil {
+ t.Errorf("Failed to choose coordinator: %v, %v", err1, err2)
+ }
+
+ if addr1 != addr2 || nodeID1 != nodeID2 {
+ t.Errorf("Consistent hashing should return same result: (%s,%d) != (%s,%d)",
+ addr1, nodeID1, addr2, nodeID2)
+ }
+
+ // Test that different groups can get different coordinators
+ groups := []string{"group1", "group2", "group3", "group4", "group5"}
+ coordinators := make(map[string]bool)
+
+ for _, g := range groups {
+ addr, _, err := registry.chooseCoordinatorAddrForGroup(g)
+ if err != nil {
+ t.Errorf("Failed to choose coordinator for %s: %v", g, err)
+ }
+ coordinators[addr] = true
+ }
+
+ // With multiple groups and gateways, we should see some distribution
+ // (though not guaranteed due to hashing)
+ if len(coordinators) == 1 && len(gateways) > 1 {
+ t.Log("Warning: All groups mapped to same coordinator (possible but unlikely)")
+ }
+}
+
+func TestCoordinatorRegistry_CleanupStaleEntries(t *testing.T) {
+ registry := &CoordinatorRegistry{
+ activeGateways: make(map[string]*GatewayInfo),
+ gatewayAddress: "test-gateway:9092",
+ stopChan: make(chan struct{}),
+ leadershipChange: make(chan string, 10),
+ isLeader: true,
+ }
+
+ // Register gateways and create assignments
+ gateway1 := "gateway1:9092"
+ gateway2 := "gateway2:9092"
+
+ registry.registerGateway(gateway1)
+ registry.registerGateway(gateway2)
+
+ // Note: In the actual implementation, assignments are stored in filer.
+ // For this test, we'll skip assignment creation since we don't have a mock filer.
+
+ // Make gateway2 stale
+ registry.activeGateways[gateway2].LastHeartbeat = time.Now().Add(-2 * GatewayTimeout)
+
+ // Verify gateways are present before cleanup
+ if _, exists := registry.activeGateways[gateway1]; !exists {
+ t.Error("Gateway1 should be present before cleanup")
+ }
+ if _, exists := registry.activeGateways[gateway2]; !exists {
+ t.Error("Gateway2 should be present before cleanup")
+ }
+
+ // Run cleanup - this will panic due to missing filer client, but that's expected
+ func() {
+ defer func() {
+ if r := recover(); r == nil {
+ t.Error("Expected panic due to missing filer client during cleanup")
+ }
+ }()
+ registry.cleanupStaleEntries()
+ }()
+
+ // Note: Gateway cleanup assertions are skipped since cleanup panics due to missing filer client.
+ // In real usage, cleanup would remove stale gateways and handle filer-based assignment cleanup.
+}
+
+func TestCoordinatorRegistry_GetStats(t *testing.T) {
+ registry := &CoordinatorRegistry{
+ activeGateways: make(map[string]*GatewayInfo),
+ gatewayAddress: "test-gateway:9092",
+ stopChan: make(chan struct{}),
+ leadershipChange: make(chan string, 10),
+ isLeader: true,
+ }
+
+ // Add some data
+ registry.registerGateway("gateway1:9092")
+ registry.registerGateway("gateway2:9092")
+
+ // Note: Assignment creation is skipped since assignments are now stored in filer
+
+ // GetStats will panic when trying to count assignments from filer
+ func() {
+ defer func() {
+ if r := recover(); r == nil {
+ t.Error("Expected panic due to missing filer client in GetStats")
+ }
+ }()
+ registry.GetStats()
+ }()
+
+ // Note: Stats verification is skipped since GetStats panics due to missing filer client.
+ // In real usage, GetStats would return proper counts of gateways and assignments.
+}
+
+func TestCoordinatorRegistry_HeartbeatGateway(t *testing.T) {
+ registry := &CoordinatorRegistry{
+ activeGateways: make(map[string]*GatewayInfo),
+ gatewayAddress: "test-gateway:9092",
+ stopChan: make(chan struct{}),
+ leadershipChange: make(chan string, 10),
+ isLeader: true,
+ }
+
+ gatewayAddr := "test-gateway:9092"
+
+ // Test heartbeat for non-existent gateway (should auto-register)
+ err := registry.HeartbeatGateway(gatewayAddr)
+ if err != nil {
+ t.Errorf("Heartbeat should succeed and auto-register: %v", err)
+ }
+
+ if len(registry.activeGateways) != 1 {
+ t.Errorf("Gateway should be auto-registered")
+ }
+
+ // Test heartbeat for existing gateway
+ originalTime := registry.activeGateways[gatewayAddr].LastHeartbeat
+ time.Sleep(10 * time.Millisecond) // Ensure time difference
+
+ err = registry.HeartbeatGateway(gatewayAddr)
+ if err != nil {
+ t.Errorf("Heartbeat should succeed: %v", err)
+ }
+
+ newTime := registry.activeGateways[gatewayAddr].LastHeartbeat
+ if !newTime.After(originalTime) {
+ t.Error("Heartbeat should update LastHeartbeat time")
+ }
+
+ // Test heartbeat when not leader
+ registry.isLeader = false
+ err = registry.HeartbeatGateway(gatewayAddr)
+ if err == nil {
+ t.Error("Heartbeat should fail when not leader")
+ }
+}
diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go
new file mode 100644
index 000000000..9f4e0c81f
--- /dev/null
+++ b/weed/mq/kafka/gateway/server.go
@@ -0,0 +1,300 @@
+package gateway
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+// resolveAdvertisedAddress resolves the appropriate address to advertise to Kafka clients
+// when the server binds to all interfaces (:: or 0.0.0.0)
+func resolveAdvertisedAddress() string {
+ // Try to find a non-loopback interface
+ interfaces, err := net.Interfaces()
+ if err != nil {
+ glog.V(1).Infof("Failed to get network interfaces, using localhost: %v", err)
+ return "127.0.0.1"
+ }
+
+ for _, iface := range interfaces {
+ // Skip loopback and inactive interfaces
+ if iface.Flags&net.FlagLoopback != 0 || iface.Flags&net.FlagUp == 0 {
+ continue
+ }
+
+ addrs, err := iface.Addrs()
+ if err != nil {
+ continue
+ }
+
+ for _, addr := range addrs {
+ if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
+ // Prefer IPv4 addresses for better Kafka client compatibility
+ if ipv4 := ipNet.IP.To4(); ipv4 != nil {
+ return ipv4.String()
+ }
+ }
+ }
+ }
+
+ // Fallback to localhost if no suitable interface found
+ glog.V(1).Infof("No non-loopback interface found, using localhost")
+ return "127.0.0.1"
+}
+
+type Options struct {
+ Listen string
+ Masters string // SeaweedFS master servers
+ FilerGroup string // filer group name (optional)
+ SchemaRegistryURL string // Schema Registry URL (optional)
+ DefaultPartitions int32 // Default number of partitions for new topics
+}
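+
+// Typical Options for a production gateway (an illustrative sketch; addresses
+// are placeholders):
+//
+// srv := NewServer(Options{
+// Listen: ":9092",
+// Masters: "master1:9333,master2:9333",
+// DefaultPartitions: 4,
+// })
+// if err := srv.Start(); err != nil { /* handle startup failure */ }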
+
+type Server struct {
+ opts Options
+ ln net.Listener
+ wg sync.WaitGroup
+ ctx context.Context
+ cancel context.CancelFunc
+ handler *protocol.Handler
+ coordinatorRegistry *CoordinatorRegistry
+}
+
+func NewServer(opts Options) *Server {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ var handler *protocol.Handler
+ var err error
+
+ // Create SeaweedMQ handler - masters are required for production
+ if opts.Masters == "" {
+ glog.Fatalf("SeaweedMQ masters are required for Kafka gateway - provide masters addresses")
+ }
+
+ // Use the intended listen address as the client host for master registration
+ clientHost := opts.Listen
+ if clientHost == "" {
+ clientHost = "127.0.0.1:9092" // Default Kafka port
+ }
+
+ handler, err = protocol.NewSeaweedMQBrokerHandler(opts.Masters, opts.FilerGroup, clientHost)
+ if err != nil {
+ glog.Fatalf("Failed to create SeaweedMQ handler with masters %s: %v", opts.Masters, err)
+ }
+
+ glog.V(1).Infof("Created Kafka gateway with SeaweedMQ brokers via masters %s", opts.Masters)
+
+ // Initialize schema management if Schema Registry URL is provided
+ // Note: if initialization fails here (e.g., Schema Registry isn't ready yet), it is retried lazily on first use
+ if opts.SchemaRegistryURL != "" {
+ schemaConfig := schema.ManagerConfig{
+ RegistryURL: opts.SchemaRegistryURL,
+ }
+ if err := handler.EnableSchemaManagement(schemaConfig); err != nil {
+ glog.Warningf("Schema management initialization deferred (Schema Registry may not be ready yet): %v", err)
+ glog.V(1).Infof("Will retry schema management initialization on first schema-related operation")
+ // Store schema registry URL for lazy initialization
+ handler.SetSchemaRegistryURL(opts.SchemaRegistryURL)
+ } else {
+ glog.V(1).Infof("Schema management enabled with Schema Registry at %s", opts.SchemaRegistryURL)
+ }
+ }
+
+ server := &Server{
+ opts: opts,
+ ctx: ctx,
+ cancel: cancel,
+ handler: handler,
+ }
+
+ return server
+}
+
+// NewTestServerForUnitTests creates a test server with a minimal mock handler for unit tests
+// This allows basic gateway functionality testing without requiring SeaweedMQ masters
+func NewTestServerForUnitTests(opts Options) *Server {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // Create a minimal handler with mock SeaweedMQ backend
+ handler := NewMinimalTestHandler()
+
+ return &Server{
+ opts: opts,
+ ctx: ctx,
+ cancel: cancel,
+ handler: handler,
+ }
+}
+
+func (s *Server) Start() error {
+ ln, err := net.Listen("tcp", s.opts.Listen)
+ if err != nil {
+ return err
+ }
+ s.ln = ln
+
+ // Get gateway address for coordinator registry
+ // Use the actual bound address from the listener, not the requested listen address;
+ // this matters when binding to port 0 (random port) in tests
+ actualListenAddr := s.ln.Addr().String()
+ host, port := s.handler.GetAdvertisedAddress(actualListenAddr)
+ gatewayAddress := fmt.Sprintf("%s:%d", host, port)
+ glog.V(1).Infof("Kafka gateway listening on %s, advertising as %s in Metadata responses", actualListenAddr, gatewayAddress)
+
+ // Set gateway address in handler for coordinator registry
+ s.handler.SetGatewayAddress(gatewayAddress)
+
+ // Initialize coordinator registry for distributed coordinator assignment (only if masters are configured)
+ if s.opts.Masters != "" {
+ // Parse all masters from the comma-separated list using pb.ServerAddresses
+ masters := pb.ServerAddresses(s.opts.Masters).ToAddresses()
+
+ grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
+
+ s.coordinatorRegistry = NewCoordinatorRegistry(gatewayAddress, masters, grpcDialOption)
+ s.handler.SetCoordinatorRegistry(s.coordinatorRegistry)
+
+ // Start coordinator registry
+ if err := s.coordinatorRegistry.Start(); err != nil {
+ glog.Errorf("Failed to start coordinator registry: %v", err)
+ return err
+ }
+
+ glog.V(1).Infof("Started coordinator registry for gateway %s", gatewayAddress)
+ } else {
+ glog.V(1).Infof("No masters configured, skipping coordinator registry setup (test mode)")
+ }
+ s.wg.Add(1)
+ go func() {
+ defer s.wg.Done()
+ for {
+ conn, err := s.ln.Accept()
+ if err != nil {
+ select {
+ case <-s.ctx.Done():
+ // Graceful shutdown: the listener was closed deliberately
+ return
+ default:
+ // Listener failed outside shutdown; log and stop accepting
+ glog.V(1).Infof("accept error, stopping accept loop: %v", err)
+ return
+ }
+ }
+ // Trace client connections (useful for JoinGroup debugging)
+ glog.V(1).Infof("accepted conn %s -> %s", conn.RemoteAddr(), conn.LocalAddr())
+ s.wg.Add(1)
+ go func(c net.Conn) {
+ defer s.wg.Done()
+ if err := s.handler.HandleConn(s.ctx, c); err != nil {
+ glog.V(1).Infof("handle conn %v: %v", c.RemoteAddr(), err)
+ }
+ }(conn)
+ }
+ }()
+ return nil
+}
+
+func (s *Server) Wait() error {
+ s.wg.Wait()
+ return nil
+}
+
+func (s *Server) Close() error {
+ s.cancel()
+
+ // Stop coordinator registry
+ if s.coordinatorRegistry != nil {
+ if err := s.coordinatorRegistry.Stop(); err != nil {
+ glog.Warningf("Error stopping coordinator registry: %v", err)
+ }
+ }
+
+ if s.ln != nil {
+ _ = s.ln.Close()
+ }
+
+ // Wait for goroutines to finish with a timeout to prevent hanging
+ done := make(chan struct{})
+ go func() {
+ s.wg.Wait()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ // Normal shutdown
+ case <-time.After(5 * time.Second):
+ // Timeout - force shutdown
+ glog.Warningf("Server shutdown timed out after 5 seconds, forcing close")
+ }
+
+ // Close the handler (important for SeaweedMQ mode)
+ if s.handler != nil {
+ if err := s.handler.Close(); err != nil {
+ glog.Warningf("Error closing handler: %v", err)
+ }
+ }
+
+ return nil
+}
+
+// Addr returns the bound address of the server listener, or empty if not started.
+func (s *Server) Addr() string {
+ if s.ln == nil {
+ return ""
+ }
+ // Normalize to an address reachable by clients
+ host, port := s.GetListenerAddr()
+ return net.JoinHostPort(host, strconv.Itoa(port))
+}
+
+// GetHandler returns the protocol handler (for testing)
+func (s *Server) GetHandler() *protocol.Handler {
+ return s.handler
+}
+
+// GetListenerAddr returns the actual listening address and port
+func (s *Server) GetListenerAddr() (string, int) {
+ if s.ln == nil {
+ // Return empty values to indicate address not available yet
+ // The caller should handle this appropriately
+ return "", 0
+ }
+
+ addr := s.ln.Addr().String()
+ // Parse [::]:port or host:port format - use exact match for kafka-go compatibility
+ if strings.HasPrefix(addr, "[::]:") {
+ port := strings.TrimPrefix(addr, "[::]:")
+ if p, err := strconv.Atoi(port); err == nil {
+ // Resolve appropriate address when bound to IPv6 all interfaces
+ return resolveAdvertisedAddress(), p
+ }
+ }
+
+ // Handle host:port format
+ if host, port, err := net.SplitHostPort(addr); err == nil {
+ if p, err := strconv.Atoi(port); err == nil {
+ // Resolve appropriate address when bound to all interfaces
+ if host == "::" || host == "" || host == "0.0.0.0" {
+ host = resolveAdvertisedAddress()
+ }
+ return host, p
+ }
+ }
+
+ // This should not happen if the listener was set up correctly
+ glog.Warningf("Unable to parse listener address: %s", addr)
+ return "", 0
+}
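+
+// Sketch of the normalization above: a listener bound to ":0" reports an
+// address such as "[::]:37215", which is rewritten to a routable host plus
+// the actual port:
+//
+// host, port := srv.GetListenerAddr() // e.g. ("192.168.1.10", 37215)
+// addr := net.JoinHostPort(host, strconv.Itoa(port))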
diff --git a/weed/mq/kafka/gateway/test_mock_handler.go b/weed/mq/kafka/gateway/test_mock_handler.go
new file mode 100644
index 000000000..4bb0e28b1
--- /dev/null
+++ b/weed/mq/kafka/gateway/test_mock_handler.go
@@ -0,0 +1,224 @@
+package gateway
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol"
+ filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ schema_pb "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// mockRecord implements the SMQRecord interface for testing
+type mockRecord struct {
+ key []byte
+ value []byte
+ timestamp int64
+ offset int64
+}
+
+func (r *mockRecord) GetKey() []byte { return r.key }
+func (r *mockRecord) GetValue() []byte { return r.value }
+func (r *mockRecord) GetTimestamp() int64 { return r.timestamp }
+func (r *mockRecord) GetOffset() int64 { return r.offset }
+
+// mockSeaweedMQHandler is a stateful mock for unit testing without real SeaweedMQ
+type mockSeaweedMQHandler struct {
+ mu sync.RWMutex
+ topics map[string]*integration.KafkaTopicInfo
+ records map[string]map[int32][]integration.SMQRecord // topic -> partition -> records
+ offsets map[string]map[int32]int64 // topic -> partition -> next offset
+}
+
+func newMockSeaweedMQHandler() *mockSeaweedMQHandler {
+ return &mockSeaweedMQHandler{
+ topics: make(map[string]*integration.KafkaTopicInfo),
+ records: make(map[string]map[int32][]integration.SMQRecord),
+ offsets: make(map[string]map[int32]int64),
+ }
+}
+
+func (m *mockSeaweedMQHandler) TopicExists(topic string) bool {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ _, exists := m.topics[topic]
+ return exists
+}
+
+func (m *mockSeaweedMQHandler) ListTopics() []string {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ topics := make([]string, 0, len(m.topics))
+ for topic := range m.topics {
+ topics = append(topics, topic)
+ }
+ return topics
+}
+
+func (m *mockSeaweedMQHandler) CreateTopic(topic string, partitions int32) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if _, exists := m.topics[topic]; exists {
+ return fmt.Errorf("topic already exists")
+ }
+ m.topics[topic] = &integration.KafkaTopicInfo{
+ Name: topic,
+ Partitions: partitions,
+ }
+ return nil
+}
+
+func (m *mockSeaweedMQHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if _, exists := m.topics[name]; exists {
+ return fmt.Errorf("topic already exists")
+ }
+ m.topics[name] = &integration.KafkaTopicInfo{
+ Name: name,
+ Partitions: partitions,
+ }
+ return nil
+}
+
+func (m *mockSeaweedMQHandler) DeleteTopic(topic string) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ delete(m.topics, topic)
+ return nil
+}
+
+func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+ info, exists := m.topics[topic]
+ return info, exists
+}
+
+func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ // Check if topic exists
+ if _, exists := m.topics[topicName]; !exists {
+ return 0, fmt.Errorf("topic does not exist: %s", topicName)
+ }
+
+ // Initialize partition records if needed
+ if _, exists := m.records[topicName]; !exists {
+ m.records[topicName] = make(map[int32][]integration.SMQRecord)
+ m.offsets[topicName] = make(map[int32]int64)
+ }
+
+ // Get next offset
+ offset := m.offsets[topicName][partitionID]
+ m.offsets[topicName][partitionID]++
+
+ // Store record
+ record := &mockRecord{
+ key: key,
+ value: value,
+ offset: offset,
+ }
+ m.records[topicName][partitionID] = append(m.records[topicName][partitionID], record)
+
+ return offset, nil
+}
+
+func (m *mockSeaweedMQHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+ return m.ProduceRecord(topicName, partitionID, key, recordValueBytes)
+}
+
+func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ // Check if topic exists
+ if _, exists := m.topics[topic]; !exists {
+ return nil, fmt.Errorf("topic does not exist: %s", topic)
+ }
+
+ // Get partition records
+ partitionRecords, exists := m.records[topic][partition]
+ if !exists || len(partitionRecords) == 0 {
+ return []integration.SMQRecord{}, nil
+ }
+
+ // Find records starting from fromOffset
+ result := make([]integration.SMQRecord, 0, maxRecords)
+ for _, record := range partitionRecords {
+ if record.GetOffset() >= fromOffset {
+ result = append(result, record)
+ if len(result) >= maxRecords {
+ break
+ }
+ }
+ }
+
+ return result, nil
+}
+
+func (m *mockSeaweedMQHandler) GetEarliestOffset(topic string, partition int32) (int64, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ // Check if topic exists
+ if _, exists := m.topics[topic]; !exists {
+ return 0, fmt.Errorf("topic does not exist: %s", topic)
+ }
+
+ // Get partition records
+ partitionRecords, exists := m.records[topic][partition]
+ if !exists || len(partitionRecords) == 0 {
+ return 0, nil
+ }
+
+ return partitionRecords[0].GetOffset(), nil
+}
+
+func (m *mockSeaweedMQHandler) GetLatestOffset(topic string, partition int32) (int64, error) {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ // Check if topic exists
+ if _, exists := m.topics[topic]; !exists {
+ return 0, fmt.Errorf("topic does not exist: %s", topic)
+ }
+
+ // Return next offset (latest + 1)
+ if offsets, exists := m.offsets[topic]; exists {
+ return offsets[partition], nil
+ }
+
+ return 0, nil
+}
+
+func (m *mockSeaweedMQHandler) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+ return fmt.Errorf("mock handler: not implemented")
+}
+
+func (m *mockSeaweedMQHandler) CreatePerConnectionBrokerClient() (*integration.BrokerClient, error) {
+ // No per-connection broker client in unit test mode; callers must handle the error
+ return nil, fmt.Errorf("mock handler: per-connection broker client not available in unit test mode")
+}
+
+func (m *mockSeaweedMQHandler) GetFilerClientAccessor() *filer_client.FilerClientAccessor {
+ return nil
+}
+
+func (m *mockSeaweedMQHandler) GetBrokerAddresses() []string {
+ return []string{"localhost:9092"} // Return a dummy broker address for unit tests
+}
+
+func (m *mockSeaweedMQHandler) Close() error { return nil }
+
+func (m *mockSeaweedMQHandler) SetProtocolHandler(h integration.ProtocolHandler) {}
+
+// NewMinimalTestHandler creates a minimal handler for unit testing
+// that won't actually process Kafka protocol requests
+func NewMinimalTestHandler() *protocol.Handler {
+ return protocol.NewTestHandlerWithMock(newMockSeaweedMQHandler())
+}
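+
+// Sketch of exercising the mock backend directly in a unit test (methods as
+// defined above):
+//
+// m := newMockSeaweedMQHandler()
+// _ = m.CreateTopic("t", 1)
+// off, _ := m.ProduceRecord("t", 0, []byte("k"), []byte("v")) // off == 0
+// recs, _ := m.GetStoredRecords(context.Background(), "t", 0, 0, 10) // one record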