diff options
Diffstat (limited to 'weed/mq/kafka/gateway')
| -rw-r--r-- | weed/mq/kafka/gateway/coordinator_registry.go | 805 | ||||
| -rw-r--r-- | weed/mq/kafka/gateway/coordinator_registry_test.go | 309 | ||||
| -rw-r--r-- | weed/mq/kafka/gateway/server.go | 300 | ||||
| -rw-r--r-- | weed/mq/kafka/gateway/test_mock_handler.go | 224 |
4 files changed, 1638 insertions, 0 deletions
diff --git a/weed/mq/kafka/gateway/coordinator_registry.go b/weed/mq/kafka/gateway/coordinator_registry.go new file mode 100644 index 000000000..af3330b03 --- /dev/null +++ b/weed/mq/kafka/gateway/coordinator_registry.go @@ -0,0 +1,805 @@ +package gateway + +import ( + "context" + "encoding/json" + "fmt" + "hash/fnv" + "io" + "sort" + "strings" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/filer_client" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "google.golang.org/grpc" +) + +// CoordinatorRegistry manages consumer group coordinator assignments +// Only the gateway leader maintains this registry +type CoordinatorRegistry struct { + // Leader election + leaderLock *cluster.LiveLock + isLeader bool + leaderMutex sync.RWMutex + leadershipChange chan string // Notifies when leadership changes + + // No in-memory assignments - read/write directly to filer + // assignmentsMutex still needed for coordinating file operations + assignmentsMutex sync.RWMutex + + // Gateway registry + activeGateways map[string]*GatewayInfo // gatewayAddress -> info + gatewaysMutex sync.RWMutex + + // Configuration + gatewayAddress string + lockClient *cluster.LockClient + filerClientAccessor *filer_client.FilerClientAccessor + filerDiscoveryService *filer_client.FilerDiscoveryService + + // Control + stopChan chan struct{} + wg sync.WaitGroup +} + +// Remove local CoordinatorAssignment - use protocol.CoordinatorAssignment instead + +// GatewayInfo represents an active gateway instance +type GatewayInfo struct { + Address string + NodeID int32 + RegisteredAt time.Time + LastHeartbeat time.Time + IsHealthy bool +} + +const ( + GatewayLeaderLockKey = "kafka-gateway-leader" + 
HeartbeatInterval = 10 * time.Second + GatewayTimeout = 30 * time.Second + + // Filer paths for coordinator assignment persistence + CoordinatorAssignmentsDir = "/topics/kafka/.meta/coordinators" +) + +// NewCoordinatorRegistry creates a new coordinator registry +func NewCoordinatorRegistry(gatewayAddress string, masters []pb.ServerAddress, grpcDialOption grpc.DialOption) *CoordinatorRegistry { + // Create filer discovery service that will periodically refresh filers from all masters + filerDiscoveryService := filer_client.NewFilerDiscoveryService(masters, grpcDialOption) + + // Manually discover filers from each master until we find one + var seedFiler pb.ServerAddress + for _, master := range masters { + // Use the same discovery logic as filer_discovery.go + grpcAddr := master.ToGrpcAddress() + conn, err := grpc.Dial(grpcAddr, grpcDialOption) + if err != nil { + continue + } + + client := master_pb.NewSeaweedClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{ + ClientType: cluster.FilerType, + }) + cancel() + conn.Close() + + if err == nil && len(resp.ClusterNodes) > 0 { + // Found a filer - use its HTTP address (WithFilerClient will convert to gRPC automatically) + seedFiler = pb.ServerAddress(resp.ClusterNodes[0].Address) + glog.V(1).Infof("Using filer %s as seed for distributed locking (discovered from master %s)", seedFiler, master) + break + } + } + + lockClient := cluster.NewLockClient(grpcDialOption, seedFiler) + + registry := &CoordinatorRegistry{ + activeGateways: make(map[string]*GatewayInfo), + gatewayAddress: gatewayAddress, + lockClient: lockClient, + stopChan: make(chan struct{}), + leadershipChange: make(chan string, 10), // Buffered channel for leadership notifications + filerDiscoveryService: filerDiscoveryService, + } + + // Create filer client accessor that uses dynamic filer discovery + registry.filerClientAccessor = 
&filer_client.FilerClientAccessor{ + GetGrpcDialOption: func() grpc.DialOption { + return grpcDialOption + }, + GetFilers: func() []pb.ServerAddress { + return registry.filerDiscoveryService.GetFilers() + }, + } + + return registry +} + +// Start begins the coordinator registry operations +func (cr *CoordinatorRegistry) Start() error { + glog.V(1).Infof("Starting coordinator registry for gateway %s", cr.gatewayAddress) + + // Start filer discovery service first + if err := cr.filerDiscoveryService.Start(); err != nil { + return fmt.Errorf("failed to start filer discovery service: %w", err) + } + + // Start leader election + cr.startLeaderElection() + + // Start heartbeat loop to keep this gateway healthy + cr.startHeartbeatLoop() + + // Start cleanup goroutine + cr.startCleanupLoop() + + // Register this gateway + cr.registerGateway(cr.gatewayAddress) + + return nil +} + +// Stop shuts down the coordinator registry +func (cr *CoordinatorRegistry) Stop() error { + glog.V(1).Infof("Stopping coordinator registry for gateway %s", cr.gatewayAddress) + + close(cr.stopChan) + cr.wg.Wait() + + // Release leader lock if held + if cr.leaderLock != nil { + cr.leaderLock.Stop() + } + + // Stop filer discovery service + if err := cr.filerDiscoveryService.Stop(); err != nil { + glog.Warningf("Failed to stop filer discovery service: %v", err) + } + + return nil +} + +// startLeaderElection starts the leader election process +func (cr *CoordinatorRegistry) startLeaderElection() { + cr.wg.Add(1) + go func() { + defer cr.wg.Done() + + // Start long-lived lock for leader election + cr.leaderLock = cr.lockClient.StartLongLivedLock( + GatewayLeaderLockKey, + cr.gatewayAddress, + cr.onLeadershipChange, + ) + + // Wait for shutdown + <-cr.stopChan + + // The leader lock will be stopped when Stop() is called + }() +} + +// onLeadershipChange handles leadership changes +func (cr *CoordinatorRegistry) onLeadershipChange(newLeader string) { + cr.leaderMutex.Lock() + defer 
cr.leaderMutex.Unlock() + + wasLeader := cr.isLeader + cr.isLeader = (newLeader == cr.gatewayAddress) + + if cr.isLeader && !wasLeader { + glog.V(0).Infof("Gateway %s became the coordinator registry leader", cr.gatewayAddress) + cr.onBecameLeader() + } else if !cr.isLeader && wasLeader { + glog.V(0).Infof("Gateway %s lost coordinator registry leadership to %s", cr.gatewayAddress, newLeader) + cr.onLostLeadership() + } + + // Notify waiting goroutines about leadership change + select { + case cr.leadershipChange <- newLeader: + // Notification sent + default: + // Channel full, skip notification (shouldn't happen with buffered channel) + } +} + +// onBecameLeader handles becoming the leader +func (cr *CoordinatorRegistry) onBecameLeader() { + // Assignments are now read directly from files - no need to load into memory + glog.V(1).Info("Leader election complete - coordinator assignments will be read from filer as needed") + + // Clear gateway registry since it's ephemeral (gateways need to re-register) + cr.gatewaysMutex.Lock() + cr.activeGateways = make(map[string]*GatewayInfo) + cr.gatewaysMutex.Unlock() + + // Re-register this gateway + cr.registerGateway(cr.gatewayAddress) +} + +// onLostLeadership handles losing leadership +func (cr *CoordinatorRegistry) onLostLeadership() { + // No in-memory assignments to clear - assignments are stored in filer + glog.V(1).Info("Lost leadership - no longer managing coordinator assignments") +} + +// IsLeader returns whether this gateway is the coordinator registry leader +func (cr *CoordinatorRegistry) IsLeader() bool { + cr.leaderMutex.RLock() + defer cr.leaderMutex.RUnlock() + return cr.isLeader +} + +// GetLeaderAddress returns the current leader's address +func (cr *CoordinatorRegistry) GetLeaderAddress() string { + if cr.leaderLock != nil { + return cr.leaderLock.LockOwner() + } + return "" +} + +// WaitForLeader waits for a leader to be elected, with timeout +func (cr *CoordinatorRegistry) WaitForLeader(timeout 
time.Duration) (string, error) { + // Check if there's already a leader + if leader := cr.GetLeaderAddress(); leader != "" { + return leader, nil + } + + // Check if this instance is the leader + if cr.IsLeader() { + return cr.gatewayAddress, nil + } + + // Wait for leadership change notification + deadline := time.Now().Add(timeout) + for { + select { + case leader := <-cr.leadershipChange: + if leader != "" { + return leader, nil + } + case <-time.After(time.Until(deadline)): + return "", fmt.Errorf("timeout waiting for leader election after %v", timeout) + } + + // Double-check in case we missed a notification + if leader := cr.GetLeaderAddress(); leader != "" { + return leader, nil + } + if cr.IsLeader() { + return cr.gatewayAddress, nil + } + + if time.Now().After(deadline) { + break + } + } + + return "", fmt.Errorf("timeout waiting for leader election after %v", timeout) +} + +// AssignCoordinator assigns a coordinator for a consumer group using a balanced strategy. +// The coordinator is selected deterministically via consistent hashing of the +// consumer group across the set of healthy gateways. This spreads groups evenly +// and avoids hot-spotting on the first requester. 
+func (cr *CoordinatorRegistry) AssignCoordinator(consumerGroup string, requestingGateway string) (*protocol.CoordinatorAssignment, error) { + if !cr.IsLeader() { + return nil, fmt.Errorf("not the coordinator registry leader") + } + + // First check if requesting gateway is healthy without holding assignments lock + if !cr.isGatewayHealthy(requestingGateway) { + return nil, fmt.Errorf("requesting gateway %s is not healthy", requestingGateway) + } + + // Lock assignments mutex to coordinate file operations + cr.assignmentsMutex.Lock() + defer cr.assignmentsMutex.Unlock() + + // Check if coordinator already assigned by trying to load from file + existing, err := cr.loadCoordinatorAssignment(consumerGroup) + if err == nil && existing != nil { + // Assignment exists, check if coordinator is still healthy + if cr.isGatewayHealthy(existing.CoordinatorAddr) { + glog.V(2).Infof("Consumer group %s already has healthy coordinator %s", consumerGroup, existing.CoordinatorAddr) + return existing, nil + } else { + glog.V(1).Infof("Existing coordinator %s for group %s is unhealthy, reassigning", existing.CoordinatorAddr, consumerGroup) + // Delete the existing assignment file + if delErr := cr.deleteCoordinatorAssignment(consumerGroup); delErr != nil { + glog.Warningf("Failed to delete stale assignment for group %s: %v", consumerGroup, delErr) + } + } + } + + // Choose a balanced coordinator via consistent hashing across healthy gateways + chosenAddr, nodeID, err := cr.chooseCoordinatorAddrForGroup(consumerGroup) + if err != nil { + return nil, err + } + + assignment := &protocol.CoordinatorAssignment{ + ConsumerGroup: consumerGroup, + CoordinatorAddr: chosenAddr, + CoordinatorNodeID: nodeID, + AssignedAt: time.Now(), + LastHeartbeat: time.Now(), + } + + // Persist the new assignment to individual file + if err := cr.saveCoordinatorAssignment(consumerGroup, assignment); err != nil { + return nil, fmt.Errorf("failed to persist coordinator assignment for group %s: %w", 
consumerGroup, err) + } + + glog.V(1).Infof("Assigned coordinator %s (node %d) for consumer group %s via consistent hashing", chosenAddr, nodeID, consumerGroup) + return assignment, nil +} + +// GetCoordinator returns the coordinator for a consumer group +func (cr *CoordinatorRegistry) GetCoordinator(consumerGroup string) (*protocol.CoordinatorAssignment, error) { + if !cr.IsLeader() { + return nil, fmt.Errorf("not the coordinator registry leader") + } + + // Load assignment directly from file + assignment, err := cr.loadCoordinatorAssignment(consumerGroup) + if err != nil { + return nil, fmt.Errorf("no coordinator assigned for consumer group %s: %w", consumerGroup, err) + } + + return assignment, nil +} + +// RegisterGateway registers a gateway instance +func (cr *CoordinatorRegistry) RegisterGateway(gatewayAddress string) error { + if !cr.IsLeader() { + return fmt.Errorf("not the coordinator registry leader") + } + + cr.registerGateway(gatewayAddress) + return nil +} + +// registerGateway internal method to register a gateway +func (cr *CoordinatorRegistry) registerGateway(gatewayAddress string) { + cr.gatewaysMutex.Lock() + defer cr.gatewaysMutex.Unlock() + + nodeID := generateDeterministicNodeID(gatewayAddress) + + cr.activeGateways[gatewayAddress] = &GatewayInfo{ + Address: gatewayAddress, + NodeID: nodeID, + RegisteredAt: time.Now(), + LastHeartbeat: time.Now(), + IsHealthy: true, + } + + glog.V(1).Infof("Registered gateway %s with deterministic node ID %d", gatewayAddress, nodeID) +} + +// HeartbeatGateway updates the heartbeat for a gateway +func (cr *CoordinatorRegistry) HeartbeatGateway(gatewayAddress string) error { + if !cr.IsLeader() { + return fmt.Errorf("not the coordinator registry leader") + } + + cr.gatewaysMutex.Lock() + + if gateway, exists := cr.activeGateways[gatewayAddress]; exists { + gateway.LastHeartbeat = time.Now() + gateway.IsHealthy = true + cr.gatewaysMutex.Unlock() + glog.V(3).Infof("Updated heartbeat for gateway %s", gatewayAddress) 
+ } else { + // Auto-register unknown gateway - unlock first to avoid double unlock + cr.gatewaysMutex.Unlock() + cr.registerGateway(gatewayAddress) + } + + return nil +} + +// isGatewayHealthy checks if a gateway is healthy +func (cr *CoordinatorRegistry) isGatewayHealthy(gatewayAddress string) bool { + cr.gatewaysMutex.RLock() + defer cr.gatewaysMutex.RUnlock() + + return cr.isGatewayHealthyUnsafe(gatewayAddress) +} + +// isGatewayHealthyUnsafe checks if a gateway is healthy without acquiring locks +// Caller must hold gatewaysMutex.RLock() or gatewaysMutex.Lock() +func (cr *CoordinatorRegistry) isGatewayHealthyUnsafe(gatewayAddress string) bool { + gateway, exists := cr.activeGateways[gatewayAddress] + if !exists { + return false + } + + return gateway.IsHealthy && time.Since(gateway.LastHeartbeat) < GatewayTimeout +} + +// getGatewayNodeID returns the node ID for a gateway +func (cr *CoordinatorRegistry) getGatewayNodeID(gatewayAddress string) int32 { + cr.gatewaysMutex.RLock() + defer cr.gatewaysMutex.RUnlock() + + return cr.getGatewayNodeIDUnsafe(gatewayAddress) +} + +// getGatewayNodeIDUnsafe returns the node ID for a gateway without acquiring locks +// Caller must hold gatewaysMutex.RLock() or gatewaysMutex.Lock() +func (cr *CoordinatorRegistry) getGatewayNodeIDUnsafe(gatewayAddress string) int32 { + if gateway, exists := cr.activeGateways[gatewayAddress]; exists { + return gateway.NodeID + } + + return 1 // Default node ID +} + +// getHealthyGatewaysSorted returns a stable-sorted list of healthy gateway addresses. 
+func (cr *CoordinatorRegistry) getHealthyGatewaysSorted() []string { + cr.gatewaysMutex.RLock() + defer cr.gatewaysMutex.RUnlock() + + addresses := make([]string, 0, len(cr.activeGateways)) + for addr, info := range cr.activeGateways { + if info.IsHealthy && time.Since(info.LastHeartbeat) < GatewayTimeout { + addresses = append(addresses, addr) + } + } + + sort.Strings(addresses) + return addresses +} + +// chooseCoordinatorAddrForGroup selects a coordinator address using consistent hashing. +func (cr *CoordinatorRegistry) chooseCoordinatorAddrForGroup(consumerGroup string) (string, int32, error) { + healthy := cr.getHealthyGatewaysSorted() + if len(healthy) == 0 { + return "", 0, fmt.Errorf("no healthy gateways available for coordinator assignment") + } + idx := hashStringToIndex(consumerGroup, len(healthy)) + addr := healthy[idx] + return addr, cr.getGatewayNodeID(addr), nil +} + +// hashStringToIndex hashes a string to an index in [0, modulo). +func hashStringToIndex(s string, modulo int) int { + if modulo <= 0 { + return 0 + } + h := fnv.New32a() + _, _ = h.Write([]byte(s)) + return int(h.Sum32() % uint32(modulo)) +} + +// generateDeterministicNodeID generates a stable node ID based on gateway address +func generateDeterministicNodeID(gatewayAddress string) int32 { + h := fnv.New32a() + _, _ = h.Write([]byte(gatewayAddress)) + // Use only positive values and avoid 0 + return int32(h.Sum32()&0x7fffffff) + 1 +} + +// startHeartbeatLoop starts the heartbeat loop for this gateway +func (cr *CoordinatorRegistry) startHeartbeatLoop() { + cr.wg.Add(1) + go func() { + defer cr.wg.Done() + + ticker := time.NewTicker(HeartbeatInterval / 2) // Send heartbeats more frequently than timeout + defer ticker.Stop() + + for { + select { + case <-cr.stopChan: + return + case <-ticker.C: + if cr.IsLeader() { + // Send heartbeat for this gateway to keep it healthy + if err := cr.HeartbeatGateway(cr.gatewayAddress); err != nil { + glog.V(2).Infof("Failed to send heartbeat for 
gateway %s: %v", cr.gatewayAddress, err) + } + } + } + } + }() +} + +// startCleanupLoop starts the cleanup loop for stale assignments and gateways +func (cr *CoordinatorRegistry) startCleanupLoop() { + cr.wg.Add(1) + go func() { + defer cr.wg.Done() + + ticker := time.NewTicker(HeartbeatInterval) + defer ticker.Stop() + + for { + select { + case <-cr.stopChan: + return + case <-ticker.C: + if cr.IsLeader() { + cr.cleanupStaleEntries() + } + } + } + }() +} + +// cleanupStaleEntries removes stale gateways and assignments +func (cr *CoordinatorRegistry) cleanupStaleEntries() { + now := time.Now() + + // First, identify stale gateways + var staleGateways []string + cr.gatewaysMutex.Lock() + for addr, gateway := range cr.activeGateways { + if now.Sub(gateway.LastHeartbeat) > GatewayTimeout { + staleGateways = append(staleGateways, addr) + } + } + // Remove stale gateways + for _, addr := range staleGateways { + glog.V(1).Infof("Removing stale gateway %s", addr) + delete(cr.activeGateways, addr) + } + cr.gatewaysMutex.Unlock() + + // Then, identify assignments with unhealthy coordinators and reassign them + cr.assignmentsMutex.Lock() + defer cr.assignmentsMutex.Unlock() + + // Get list of all consumer groups with assignments + consumerGroups, err := cr.listAllCoordinatorAssignments() + if err != nil { + glog.Warningf("Failed to list coordinator assignments during cleanup: %v", err) + return + } + + for _, group := range consumerGroups { + // Load assignment from file + assignment, err := cr.loadCoordinatorAssignment(group) + if err != nil { + glog.Warningf("Failed to load assignment for group %s during cleanup: %v", group, err) + continue + } + + // Check if coordinator is healthy + if !cr.isGatewayHealthy(assignment.CoordinatorAddr) { + glog.V(1).Infof("Coordinator %s for group %s is unhealthy, attempting reassignment", assignment.CoordinatorAddr, group) + + // Try to reassign to a healthy gateway + newAddr, newNodeID, err := cr.chooseCoordinatorAddrForGroup(group) + 
if err != nil { + // No healthy gateways available, remove the assignment for now + glog.Warningf("No healthy gateways available for reassignment of group %s, removing assignment", group) + if delErr := cr.deleteCoordinatorAssignment(group); delErr != nil { + glog.Warningf("Failed to delete assignment for group %s: %v", group, delErr) + } + } else if newAddr != assignment.CoordinatorAddr { + // Reassign to the new healthy coordinator + newAssignment := &protocol.CoordinatorAssignment{ + ConsumerGroup: group, + CoordinatorAddr: newAddr, + CoordinatorNodeID: newNodeID, + AssignedAt: time.Now(), + LastHeartbeat: time.Now(), + } + + // Save new assignment to file + if saveErr := cr.saveCoordinatorAssignment(group, newAssignment); saveErr != nil { + glog.Warningf("Failed to save reassignment for group %s: %v", group, saveErr) + } else { + glog.V(0).Infof("Reassigned coordinator for group %s from unhealthy %s to healthy %s", + group, assignment.CoordinatorAddr, newAddr) + } + } + } + } +} + +// GetStats returns registry statistics +func (cr *CoordinatorRegistry) GetStats() map[string]interface{} { + // Read counts separately to avoid holding locks while calling IsLeader() + cr.gatewaysMutex.RLock() + gatewayCount := len(cr.activeGateways) + cr.gatewaysMutex.RUnlock() + + // Count assignments from files + var assignmentCount int + if cr.IsLeader() { + consumerGroups, err := cr.listAllCoordinatorAssignments() + if err != nil { + glog.Warningf("Failed to count coordinator assignments: %v", err) + assignmentCount = -1 // Indicate error + } else { + assignmentCount = len(consumerGroups) + } + } else { + assignmentCount = 0 // Non-leader doesn't track assignments + } + + return map[string]interface{}{ + "is_leader": cr.IsLeader(), + "leader_address": cr.GetLeaderAddress(), + "active_gateways": gatewayCount, + "assignments": assignmentCount, + "gateway_address": cr.gatewayAddress, + } +} + +// Persistence methods for coordinator assignments + +// saveCoordinatorAssignment saves 
a single coordinator assignment to its individual file +func (cr *CoordinatorRegistry) saveCoordinatorAssignment(consumerGroup string, assignment *protocol.CoordinatorAssignment) error { + if !cr.IsLeader() { + // Only leader should save assignments + return nil + } + + return cr.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Convert assignment to JSON + assignmentData, err := json.Marshal(assignment) + if err != nil { + return fmt.Errorf("failed to marshal assignment for group %s: %w", consumerGroup, err) + } + + // Save to individual file: /topics/kafka/.meta/coordinators/<consumer-group>_assignments.json + fileName := fmt.Sprintf("%s_assignments.json", consumerGroup) + return filer.SaveInsideFiler(client, CoordinatorAssignmentsDir, fileName, assignmentData) + }) +} + +// loadCoordinatorAssignment loads a single coordinator assignment from its individual file +func (cr *CoordinatorRegistry) loadCoordinatorAssignment(consumerGroup string) (*protocol.CoordinatorAssignment, error) { + return cr.loadCoordinatorAssignmentWithClient(consumerGroup, cr.filerClientAccessor) +} + +// loadCoordinatorAssignmentWithClient loads a single coordinator assignment using provided client +func (cr *CoordinatorRegistry) loadCoordinatorAssignmentWithClient(consumerGroup string, clientAccessor *filer_client.FilerClientAccessor) (*protocol.CoordinatorAssignment, error) { + var assignment *protocol.CoordinatorAssignment + + err := clientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Load from individual file: /topics/kafka/.meta/coordinators/<consumer-group>_assignments.json + fileName := fmt.Sprintf("%s_assignments.json", consumerGroup) + data, err := filer.ReadInsideFiler(client, CoordinatorAssignmentsDir, fileName) + if err != nil { + return fmt.Errorf("assignment file not found for group %s: %w", consumerGroup, err) + } + + // Parse JSON + if err := json.Unmarshal(data, &assignment); err != nil { 
+ return fmt.Errorf("failed to unmarshal assignment for group %s: %w", consumerGroup, err) + } + + return nil + }) + + if err != nil { + return nil, err + } + + return assignment, nil +} + +// listAllCoordinatorAssignments lists all coordinator assignment files +func (cr *CoordinatorRegistry) listAllCoordinatorAssignments() ([]string, error) { + var consumerGroups []string + + err := cr.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + request := &filer_pb.ListEntriesRequest{ + Directory: CoordinatorAssignmentsDir, + } + + stream, streamErr := client.ListEntries(context.Background(), request) + if streamErr != nil { + // Directory might not exist yet, that's okay + return nil + } + + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } + return fmt.Errorf("failed to receive entry: %v", recvErr) + } + + // Only include assignment files (ending with _assignments.json) + if resp.Entry != nil && !resp.Entry.IsDirectory && + strings.HasSuffix(resp.Entry.Name, "_assignments.json") { + // Extract consumer group name by removing _assignments.json suffix + consumerGroup := strings.TrimSuffix(resp.Entry.Name, "_assignments.json") + consumerGroups = append(consumerGroups, consumerGroup) + } + } + + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to list coordinator assignments: %w", err) + } + + return consumerGroups, nil +} + +// deleteCoordinatorAssignment removes a coordinator assignment file +func (cr *CoordinatorRegistry) deleteCoordinatorAssignment(consumerGroup string) error { + if !cr.IsLeader() { + return nil + } + + return cr.filerClientAccessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + fileName := fmt.Sprintf("%s_assignments.json", consumerGroup) + filePath := fmt.Sprintf("%s/%s", CoordinatorAssignmentsDir, fileName) + + _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{ + Directory: 
CoordinatorAssignmentsDir, + Name: fileName, + }) + + if err != nil { + return fmt.Errorf("failed to delete assignment file %s: %w", filePath, err) + } + + return nil + }) +} + +// ReassignCoordinator manually reassigns a coordinator for a consumer group +// This can be called when a coordinator gateway becomes unavailable +func (cr *CoordinatorRegistry) ReassignCoordinator(consumerGroup string) (*protocol.CoordinatorAssignment, error) { + if !cr.IsLeader() { + return nil, fmt.Errorf("not the coordinator registry leader") + } + + cr.assignmentsMutex.Lock() + defer cr.assignmentsMutex.Unlock() + + // Check if assignment exists by loading from file + existing, err := cr.loadCoordinatorAssignment(consumerGroup) + if err != nil { + return nil, fmt.Errorf("no existing assignment for consumer group %s: %w", consumerGroup, err) + } + + // Choose a new coordinator + newAddr, newNodeID, err := cr.chooseCoordinatorAddrForGroup(consumerGroup) + if err != nil { + return nil, fmt.Errorf("failed to choose new coordinator: %w", err) + } + + // Create new assignment + newAssignment := &protocol.CoordinatorAssignment{ + ConsumerGroup: consumerGroup, + CoordinatorAddr: newAddr, + CoordinatorNodeID: newNodeID, + AssignedAt: time.Now(), + LastHeartbeat: time.Now(), + } + + // Persist the new assignment to individual file + if err := cr.saveCoordinatorAssignment(consumerGroup, newAssignment); err != nil { + return nil, fmt.Errorf("failed to persist coordinator reassignment for group %s: %w", consumerGroup, err) + } + + glog.V(0).Infof("Manually reassigned coordinator for group %s from %s to %s", + consumerGroup, existing.CoordinatorAddr, newAddr) + + return newAssignment, nil +} diff --git a/weed/mq/kafka/gateway/coordinator_registry_test.go b/weed/mq/kafka/gateway/coordinator_registry_test.go new file mode 100644 index 000000000..9ce560cd1 --- /dev/null +++ b/weed/mq/kafka/gateway/coordinator_registry_test.go @@ -0,0 +1,309 @@ +package gateway + +import ( + "testing" + "time" +) + 
+func TestCoordinatorRegistry_DeterministicNodeID(t *testing.T) { + // Test that node IDs are deterministic and stable + addr1 := "gateway1:9092" + addr2 := "gateway2:9092" + + id1a := generateDeterministicNodeID(addr1) + id1b := generateDeterministicNodeID(addr1) + id2 := generateDeterministicNodeID(addr2) + + if id1a != id1b { + t.Errorf("Node ID should be deterministic: %d != %d", id1a, id1b) + } + + if id1a == id2 { + t.Errorf("Different addresses should have different node IDs: %d == %d", id1a, id2) + } + + if id1a <= 0 || id2 <= 0 { + t.Errorf("Node IDs should be positive: %d, %d", id1a, id2) + } +} + +func TestCoordinatorRegistry_BasicOperations(t *testing.T) { + // Create a test registry without actual filer connection + registry := &CoordinatorRegistry{ + activeGateways: make(map[string]*GatewayInfo), + gatewayAddress: "test-gateway:9092", + stopChan: make(chan struct{}), + leadershipChange: make(chan string, 10), + isLeader: true, // Simulate being leader for tests + } + + // Test gateway registration + gatewayAddr := "test-gateway:9092" + registry.registerGateway(gatewayAddr) + + if len(registry.activeGateways) != 1 { + t.Errorf("Expected 1 gateway, got %d", len(registry.activeGateways)) + } + + gateway, exists := registry.activeGateways[gatewayAddr] + if !exists { + t.Error("Gateway should be registered") + } + + if gateway.NodeID <= 0 { + t.Errorf("Gateway should have positive node ID, got %d", gateway.NodeID) + } + + // Test gateway health check + if !registry.isGatewayHealthyUnsafe(gatewayAddr) { + t.Error("Newly registered gateway should be healthy") + } + + // Test node ID retrieval + nodeID := registry.getGatewayNodeIDUnsafe(gatewayAddr) + if nodeID != gateway.NodeID { + t.Errorf("Expected node ID %d, got %d", gateway.NodeID, nodeID) + } +} + +func TestCoordinatorRegistry_AssignCoordinator(t *testing.T) { + registry := &CoordinatorRegistry{ + activeGateways: make(map[string]*GatewayInfo), + gatewayAddress: "test-gateway:9092", + stopChan: 
make(chan struct{}), + leadershipChange: make(chan string, 10), + isLeader: true, + } + + // Register a gateway + gatewayAddr := "test-gateway:9092" + registry.registerGateway(gatewayAddr) + + // Test coordinator assignment when not leader + registry.isLeader = false + _, err := registry.AssignCoordinator("test-group", gatewayAddr) + if err == nil { + t.Error("Should fail when not leader") + } + + // Test coordinator assignment when leader + // Note: This will panic due to no filer client, but we expect this in unit tests + registry.isLeader = true + func() { + defer func() { + if r := recover(); r == nil { + t.Error("Expected panic due to missing filer client") + } + }() + registry.AssignCoordinator("test-group", gatewayAddr) + }() + + // Test getting assignment when not leader + registry.isLeader = false + _, err = registry.GetCoordinator("test-group") + if err == nil { + t.Error("Should fail when not leader") + } +} + +func TestCoordinatorRegistry_HealthyGateways(t *testing.T) { + registry := &CoordinatorRegistry{ + activeGateways: make(map[string]*GatewayInfo), + gatewayAddress: "test-gateway:9092", + stopChan: make(chan struct{}), + leadershipChange: make(chan string, 10), + isLeader: true, + } + + // Register multiple gateways + gateways := []string{"gateway1:9092", "gateway2:9092", "gateway3:9092"} + for _, addr := range gateways { + registry.registerGateway(addr) + } + + // All should be healthy initially + healthy := registry.getHealthyGatewaysSorted() + if len(healthy) != len(gateways) { + t.Errorf("Expected %d healthy gateways, got %d", len(gateways), len(healthy)) + } + + // Make one gateway stale + registry.activeGateways["gateway2:9092"].LastHeartbeat = time.Now().Add(-2 * GatewayTimeout) + + healthy = registry.getHealthyGatewaysSorted() + if len(healthy) != len(gateways)-1 { + t.Errorf("Expected %d healthy gateways after one became stale, got %d", len(gateways)-1, len(healthy)) + } + + // Check that results are sorted + for i := 1; i < len(healthy); 
i++ { + if healthy[i-1] >= healthy[i] { + t.Errorf("Healthy gateways should be sorted: %v", healthy) + break + } + } +} + +func TestCoordinatorRegistry_ConsistentHashing(t *testing.T) { + registry := &CoordinatorRegistry{ + activeGateways: make(map[string]*GatewayInfo), + gatewayAddress: "test-gateway:9092", + stopChan: make(chan struct{}), + leadershipChange: make(chan string, 10), + isLeader: true, + } + + // Register multiple gateways + gateways := []string{"gateway1:9092", "gateway2:9092", "gateway3:9092"} + for _, addr := range gateways { + registry.registerGateway(addr) + } + + // Test that same group always gets same coordinator + group := "test-group" + addr1, nodeID1, err1 := registry.chooseCoordinatorAddrForGroup(group) + addr2, nodeID2, err2 := registry.chooseCoordinatorAddrForGroup(group) + + if err1 != nil || err2 != nil { + t.Errorf("Failed to choose coordinator: %v, %v", err1, err2) + } + + if addr1 != addr2 || nodeID1 != nodeID2 { + t.Errorf("Consistent hashing should return same result: (%s,%d) != (%s,%d)", + addr1, nodeID1, addr2, nodeID2) + } + + // Test that different groups can get different coordinators + groups := []string{"group1", "group2", "group3", "group4", "group5"} + coordinators := make(map[string]bool) + + for _, g := range groups { + addr, _, err := registry.chooseCoordinatorAddrForGroup(g) + if err != nil { + t.Errorf("Failed to choose coordinator for %s: %v", g, err) + } + coordinators[addr] = true + } + + // With multiple groups and gateways, we should see some distribution + // (though not guaranteed due to hashing) + if len(coordinators) == 1 && len(gateways) > 1 { + t.Log("Warning: All groups mapped to same coordinator (possible but unlikely)") + } +} + +func TestCoordinatorRegistry_CleanupStaleEntries(t *testing.T) { + registry := &CoordinatorRegistry{ + activeGateways: make(map[string]*GatewayInfo), + gatewayAddress: "test-gateway:9092", + stopChan: make(chan struct{}), + leadershipChange: make(chan string, 10), + isLeader: 
true, + } + + // Register gateways and create assignments + gateway1 := "gateway1:9092" + gateway2 := "gateway2:9092" + + registry.registerGateway(gateway1) + registry.registerGateway(gateway2) + + // Note: In the actual implementation, assignments are stored in filer. + // For this test, we'll skip assignment creation since we don't have a mock filer. + + // Make gateway2 stale + registry.activeGateways[gateway2].LastHeartbeat = time.Now().Add(-2 * GatewayTimeout) + + // Verify gateways are present before cleanup + if _, exists := registry.activeGateways[gateway1]; !exists { + t.Error("Gateway1 should be present before cleanup") + } + if _, exists := registry.activeGateways[gateway2]; !exists { + t.Error("Gateway2 should be present before cleanup") + } + + // Run cleanup - this will panic due to missing filer client, but that's expected + func() { + defer func() { + if r := recover(); r == nil { + t.Error("Expected panic due to missing filer client during cleanup") + } + }() + registry.cleanupStaleEntries() + }() + + // Note: Gateway cleanup assertions are skipped since cleanup panics due to missing filer client. + // In real usage, cleanup would remove stale gateways and handle filer-based assignment cleanup. 
+} + +func TestCoordinatorRegistry_GetStats(t *testing.T) { + registry := &CoordinatorRegistry{ + activeGateways: make(map[string]*GatewayInfo), + gatewayAddress: "test-gateway:9092", + stopChan: make(chan struct{}), + leadershipChange: make(chan string, 10), + isLeader: true, + } + + // Add some data + registry.registerGateway("gateway1:9092") + registry.registerGateway("gateway2:9092") + + // Note: Assignment creation is skipped since assignments are now stored in filer + + // GetStats will panic when trying to count assignments from filer + func() { + defer func() { + if r := recover(); r == nil { + t.Error("Expected panic due to missing filer client in GetStats") + } + }() + registry.GetStats() + }() + + // Note: Stats verification is skipped since GetStats panics due to missing filer client. + // In real usage, GetStats would return proper counts of gateways and assignments. +} + +func TestCoordinatorRegistry_HeartbeatGateway(t *testing.T) { + registry := &CoordinatorRegistry{ + activeGateways: make(map[string]*GatewayInfo), + gatewayAddress: "test-gateway:9092", + stopChan: make(chan struct{}), + leadershipChange: make(chan string, 10), + isLeader: true, + } + + gatewayAddr := "test-gateway:9092" + + // Test heartbeat for non-existent gateway (should auto-register) + err := registry.HeartbeatGateway(gatewayAddr) + if err != nil { + t.Errorf("Heartbeat should succeed and auto-register: %v", err) + } + + if len(registry.activeGateways) != 1 { + t.Errorf("Gateway should be auto-registered") + } + + // Test heartbeat for existing gateway + originalTime := registry.activeGateways[gatewayAddr].LastHeartbeat + time.Sleep(10 * time.Millisecond) // Ensure time difference + + err = registry.HeartbeatGateway(gatewayAddr) + if err != nil { + t.Errorf("Heartbeat should succeed: %v", err) + } + + newTime := registry.activeGateways[gatewayAddr].LastHeartbeat + if !newTime.After(originalTime) { + t.Error("Heartbeat should update LastHeartbeat time") + } + + // Test heartbeat 
when not leader + registry.isLeader = false + err = registry.HeartbeatGateway(gatewayAddr) + if err == nil { + t.Error("Heartbeat should fail when not leader") + } +} diff --git a/weed/mq/kafka/gateway/server.go b/weed/mq/kafka/gateway/server.go new file mode 100644 index 000000000..9f4e0c81f --- /dev/null +++ b/weed/mq/kafka/gateway/server.go @@ -0,0 +1,300 @@ +package gateway + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema" + "github.com/seaweedfs/seaweedfs/weed/pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// resolveAdvertisedAddress resolves the appropriate address to advertise to Kafka clients +// when the server binds to all interfaces (:: or 0.0.0.0) +func resolveAdvertisedAddress() string { + // Try to find a non-loopback interface + interfaces, err := net.Interfaces() + if err != nil { + glog.V(1).Infof("Failed to get network interfaces, using localhost: %v", err) + return "127.0.0.1" + } + + for _, iface := range interfaces { + // Skip loopback and inactive interfaces + if iface.Flags&net.FlagLoopback != 0 || iface.Flags&net.FlagUp == 0 { + continue + } + + addrs, err := iface.Addrs() + if err != nil { + continue + } + + for _, addr := range addrs { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + // Prefer IPv4 addresses for better Kafka client compatibility + if ipv4 := ipNet.IP.To4(); ipv4 != nil { + return ipv4.String() + } + } + } + } + + // Fallback to localhost if no suitable interface found + glog.V(1).Infof("No non-loopback interface found, using localhost") + return "127.0.0.1" +} + +type Options struct { + Listen string + Masters string // SeaweedFS master servers + FilerGroup string // filer group name (optional) + SchemaRegistryURL string // Schema Registry URL (optional) + DefaultPartitions 
// Server is the Kafka gateway server. It owns the TCP listener, the protocol
// handler that serves each accepted connection, and the coordinator registry
// that Start creates when masters are configured.
type Server struct {
	opts                Options               // configuration passed to the constructor
	ln                  net.Listener          // nil until Start binds opts.Listen
	wg                  sync.WaitGroup        // counts the accept loop and every per-connection goroutine
	ctx                 context.Context       // handed to HandleConn; cancelled by Close
	cancel              context.CancelFunc    // cancels ctx
	handler             *protocol.Handler     // serves accepted connections
	coordinatorRegistry *CoordinatorRegistry  // set by Start only when opts.Masters is non-empty
}
return server +} + +// NewTestServerForUnitTests creates a test server with a minimal mock handler for unit tests +// This allows basic gateway functionality testing without requiring SeaweedMQ masters +func NewTestServerForUnitTests(opts Options) *Server { + ctx, cancel := context.WithCancel(context.Background()) + + // Create a minimal handler with mock SeaweedMQ backend + handler := NewMinimalTestHandler() + + return &Server{ + opts: opts, + ctx: ctx, + cancel: cancel, + handler: handler, + } +} + +func (s *Server) Start() error { + ln, err := net.Listen("tcp", s.opts.Listen) + if err != nil { + return err + } + s.ln = ln + + // Get gateway address for coordinator registry + // CRITICAL FIX: Use the actual bound address from listener, not the requested listen address + // This is important when using port 0 (random port) for testing + actualListenAddr := s.ln.Addr().String() + host, port := s.handler.GetAdvertisedAddress(actualListenAddr) + gatewayAddress := fmt.Sprintf("%s:%d", host, port) + glog.V(1).Infof("Kafka gateway listening on %s, advertising as %s in Metadata responses", actualListenAddr, gatewayAddress) + + // Set gateway address in handler for coordinator registry + s.handler.SetGatewayAddress(gatewayAddress) + + // Initialize coordinator registry for distributed coordinator assignment (only if masters are configured) + if s.opts.Masters != "" { + // Parse all masters from the comma-separated list using pb.ServerAddresses + masters := pb.ServerAddresses(s.opts.Masters).ToAddresses() + + grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + + s.coordinatorRegistry = NewCoordinatorRegistry(gatewayAddress, masters, grpcDialOption) + s.handler.SetCoordinatorRegistry(s.coordinatorRegistry) + + // Start coordinator registry + if err := s.coordinatorRegistry.Start(); err != nil { + glog.Errorf("Failed to start coordinator registry: %v", err) + return err + } + + glog.V(1).Infof("Started coordinator registry for gateway %s", 
gatewayAddress) + } else { + glog.V(1).Infof("No masters configured, skipping coordinator registry setup (test mode)") + } + s.wg.Add(1) + go func() { + defer s.wg.Done() + for { + conn, err := s.ln.Accept() + if err != nil { + select { + case <-s.ctx.Done(): + return + default: + return + } + } + // Simple accept log to trace client connections (useful for JoinGroup debugging) + if conn != nil { + glog.V(1).Infof("accepted conn %s -> %s", conn.RemoteAddr(), conn.LocalAddr()) + } + s.wg.Add(1) + go func(c net.Conn) { + defer s.wg.Done() + if err := s.handler.HandleConn(s.ctx, c); err != nil { + glog.V(1).Infof("handle conn %v: %v", c.RemoteAddr(), err) + } + }(conn) + } + }() + return nil +} + +func (s *Server) Wait() error { + s.wg.Wait() + return nil +} + +func (s *Server) Close() error { + s.cancel() + + // Stop coordinator registry + if s.coordinatorRegistry != nil { + if err := s.coordinatorRegistry.Stop(); err != nil { + glog.Warningf("Error stopping coordinator registry: %v", err) + } + } + + if s.ln != nil { + _ = s.ln.Close() + } + + // Wait for goroutines to finish with a timeout to prevent hanging + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + // Normal shutdown + case <-time.After(5 * time.Second): + // Timeout - force shutdown + glog.Warningf("Server shutdown timed out after 5 seconds, forcing close") + } + + // Close the handler (important for SeaweedMQ mode) + if s.handler != nil { + if err := s.handler.Close(); err != nil { + glog.Warningf("Error closing handler: %v", err) + } + } + + return nil +} + +// Removed registerWithBrokerLeader - no longer needed + +// Addr returns the bound address of the server listener, or empty if not started. 
+func (s *Server) Addr() string { + if s.ln == nil { + return "" + } + // Normalize to an address reachable by clients + host, port := s.GetListenerAddr() + return net.JoinHostPort(host, strconv.Itoa(port)) +} + +// GetHandler returns the protocol handler (for testing) +func (s *Server) GetHandler() *protocol.Handler { + return s.handler +} + +// GetListenerAddr returns the actual listening address and port +func (s *Server) GetListenerAddr() (string, int) { + if s.ln == nil { + // Return empty values to indicate address not available yet + // The caller should handle this appropriately + return "", 0 + } + + addr := s.ln.Addr().String() + // Parse [::]:port or host:port format - use exact match for kafka-go compatibility + if strings.HasPrefix(addr, "[::]:") { + port := strings.TrimPrefix(addr, "[::]:") + if p, err := strconv.Atoi(port); err == nil { + // Resolve appropriate address when bound to IPv6 all interfaces + return resolveAdvertisedAddress(), p + } + } + + // Handle host:port format + if host, port, err := net.SplitHostPort(addr); err == nil { + if p, err := strconv.Atoi(port); err == nil { + // Resolve appropriate address when bound to all interfaces + if host == "::" || host == "" || host == "0.0.0.0" { + host = resolveAdvertisedAddress() + } + return host, p + } + } + + // This should not happen if the listener was set up correctly + glog.Warningf("Unable to parse listener address: %s", addr) + return "", 0 +} diff --git a/weed/mq/kafka/gateway/test_mock_handler.go b/weed/mq/kafka/gateway/test_mock_handler.go new file mode 100644 index 000000000..4bb0e28b1 --- /dev/null +++ b/weed/mq/kafka/gateway/test_mock_handler.go @@ -0,0 +1,224 @@ +package gateway + +import ( + "context" + "fmt" + "sync" + + "github.com/seaweedfs/seaweedfs/weed/filer_client" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/protocol" + filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + schema_pb 
// mockRecord implements the SMQRecord interface for testing.
// It is a plain in-memory value; ProduceRecord on the mock handler fills in
// key, value and offset and leaves timestamp at its zero value.
type mockRecord struct {
	key       []byte // record key as supplied by the producer
	value     []byte // record payload as supplied by the producer
	timestamp int64  // NOTE(review): never assigned in this file; unit unknown, confirm against SMQRecord
	offset    int64  // position assigned by the mock handler within the partition
}

// GetKey returns the stored key without copying it.
func (r *mockRecord) GetKey() []byte { return r.key }

// GetValue returns the stored value without copying it.
func (r *mockRecord) GetValue() []byte { return r.value }

// GetTimestamp returns the stored timestamp.
func (r *mockRecord) GetTimestamp() int64 { return r.timestamp }

// GetOffset returns the offset assigned to this record.
func (r *mockRecord) GetOffset() int64 { return r.offset }
m.topics[name]; exists { + return fmt.Errorf("topic already exists") + } + m.topics[name] = &integration.KafkaTopicInfo{ + Name: name, + Partitions: partitions, + } + return nil +} + +func (m *mockSeaweedMQHandler) DeleteTopic(topic string) error { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.topics, topic) + return nil +} + +func (m *mockSeaweedMQHandler) GetTopicInfo(topic string) (*integration.KafkaTopicInfo, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + info, exists := m.topics[topic] + return info, exists +} + +func (m *mockSeaweedMQHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) { + m.mu.Lock() + defer m.mu.Unlock() + + // Check if topic exists + if _, exists := m.topics[topicName]; !exists { + return 0, fmt.Errorf("topic does not exist: %s", topicName) + } + + // Initialize partition records if needed + if _, exists := m.records[topicName]; !exists { + m.records[topicName] = make(map[int32][]integration.SMQRecord) + m.offsets[topicName] = make(map[int32]int64) + } + + // Get next offset + offset := m.offsets[topicName][partitionID] + m.offsets[topicName][partitionID]++ + + // Store record + record := &mockRecord{ + key: key, + value: value, + offset: offset, + } + m.records[topicName][partitionID] = append(m.records[topicName][partitionID], record) + + return offset, nil +} + +func (m *mockSeaweedMQHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) { + return m.ProduceRecord(topicName, partitionID, key, recordValueBytes) +} + +func (m *mockSeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + // Check if topic exists + if _, exists := m.topics[topic]; !exists { + return nil, fmt.Errorf("topic does not exist: %s", topic) + } + + // Get partition records + partitionRecords, exists := 
m.records[topic][partition] + if !exists || len(partitionRecords) == 0 { + return []integration.SMQRecord{}, nil + } + + // Find records starting from fromOffset + result := make([]integration.SMQRecord, 0, maxRecords) + for _, record := range partitionRecords { + if record.GetOffset() >= fromOffset { + result = append(result, record) + if len(result) >= maxRecords { + break + } + } + } + + return result, nil +} + +func (m *mockSeaweedMQHandler) GetEarliestOffset(topic string, partition int32) (int64, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + // Check if topic exists + if _, exists := m.topics[topic]; !exists { + return 0, fmt.Errorf("topic does not exist: %s", topic) + } + + // Get partition records + partitionRecords, exists := m.records[topic][partition] + if !exists || len(partitionRecords) == 0 { + return 0, nil + } + + return partitionRecords[0].GetOffset(), nil +} + +func (m *mockSeaweedMQHandler) GetLatestOffset(topic string, partition int32) (int64, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + // Check if topic exists + if _, exists := m.topics[topic]; !exists { + return 0, fmt.Errorf("topic does not exist: %s", topic) + } + + // Return next offset (latest + 1) + if offsets, exists := m.offsets[topic]; exists { + return offsets[partition], nil + } + + return 0, nil +} + +func (m *mockSeaweedMQHandler) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + return fmt.Errorf("mock handler: not implemented") +} + +func (m *mockSeaweedMQHandler) CreatePerConnectionBrokerClient() (*integration.BrokerClient, error) { + // Return a minimal broker client that won't actually connect + return nil, fmt.Errorf("mock handler: per-connection broker client not available in unit test mode") +} + +func (m *mockSeaweedMQHandler) GetFilerClientAccessor() *filer_client.FilerClientAccessor { + return nil +} + +func (m *mockSeaweedMQHandler) GetBrokerAddresses() []string { + return []string{"localhost:9092"} // Return a dummy 
// NewMinimalTestHandler creates a minimal handler for unit testing
// that won't actually process Kafka protocol requests.
//
// The handler is built by protocol.NewTestHandlerWithMock around a fresh,
// empty mockSeaweedMQHandler, so every call returns a handler with its own
// in-memory topic state.
func NewMinimalTestHandler() *protocol.Handler {
	return protocol.NewTestHandlerWithMock(newMockSeaweedMQHandler())
}
