diff options
Diffstat (limited to 'test/kafka/internal/testutil/gateway.go')
| -rw-r--r-- | test/kafka/internal/testutil/gateway.go | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/test/kafka/internal/testutil/gateway.go b/test/kafka/internal/testutil/gateway.go new file mode 100644 index 000000000..8021abcb6 --- /dev/null +++ b/test/kafka/internal/testutil/gateway.go @@ -0,0 +1,220 @@ +package testutil + +import ( + "context" + "fmt" + "net" + "os" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema" +) + +// GatewayTestServer wraps the gateway server with common test utilities +type GatewayTestServer struct { + *gateway.Server + t *testing.T +} + +// GatewayOptions contains configuration for test gateway +type GatewayOptions struct { + Listen string + Masters string + UseProduction bool + // Add more options as needed +} + +// NewGatewayTestServer creates a new test gateway server with common setup +func NewGatewayTestServer(t *testing.T, opts GatewayOptions) *GatewayTestServer { + if opts.Listen == "" { + opts.Listen = "127.0.0.1:0" // Use random port by default + } + + // Allow switching to production gateway if requested (requires masters) + var srv *gateway.Server + if opts.UseProduction { + if opts.Masters == "" { + // Fallback to env variable for convenience in CI + if v := os.Getenv("SEAWEEDFS_MASTERS"); v != "" { + opts.Masters = v + } else { + opts.Masters = "localhost:9333" + } + } + srv = gateway.NewServer(gateway.Options{ + Listen: opts.Listen, + Masters: opts.Masters, + }) + } else { + // For unit testing without real SeaweedMQ masters + srv = gateway.NewTestServerForUnitTests(gateway.Options{ + Listen: opts.Listen, + }) + } + + return &GatewayTestServer{ + Server: srv, + t: t, + } +} + +// StartAndWait starts the gateway and waits for it to be ready +func (g *GatewayTestServer) StartAndWait() string { + g.t.Helper() + + // Start server in goroutine + go func() { + // Enable schema mode automatically when SCHEMA_REGISTRY_URL is set + if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" { + h := g.GetHandler() + if h != nil { + _ = h.EnableSchemaManagement(schema.ManagerConfig{RegistryURL: url}) + } + } + if err := g.Start(); err != nil { + g.t.Errorf("Failed to start gateway: %v", err) + } + }() + + // Wait for server to be ready + time.Sleep(100 * time.Millisecond) + + host, port := g.GetListenerAddr() + addr := fmt.Sprintf("%s:%d", host, port) + g.t.Logf("Gateway running on %s", addr) + + return addr +} + +// AddTestTopic adds a topic for testing with default configuration +func (g *GatewayTestServer) AddTestTopic(name string) { + g.t.Helper() + g.GetHandler().AddTopicForTesting(name, 1) + g.t.Logf("Added test topic: %s", name) +} + +// AddTestTopics adds multiple topics for testing +func (g *GatewayTestServer) AddTestTopics(names ...string) { + g.t.Helper() + for _, name := range names { + g.AddTestTopic(name) + } +} + +// CleanupAndClose properly closes the gateway server +func (g *GatewayTestServer) CleanupAndClose() { + g.t.Helper() + if err := g.Close(); err != nil { + g.t.Errorf("Failed to close gateway: %v", err) + } +} + +// SMQAvailabilityMode indicates whether SeaweedMQ is available for testing +type SMQAvailabilityMode int + +const ( + SMQUnavailable SMQAvailabilityMode = iota // Use mock handler only + SMQAvailable // SMQ is available, can use production mode + SMQRequired // SMQ is required, skip test if unavailable +) + +// CheckSMQAvailability checks if SeaweedFS masters are available for testing +func CheckSMQAvailability() (bool, string) { + masters := os.Getenv("SEAWEEDFS_MASTERS") + if masters == "" { + return false, "" + } + + // Test if at least one master is reachable + if masters != "" { + // Try to connect to the first master to verify availability + conn, err := net.DialTimeout("tcp", masters, 2*time.Second) + if err != nil { + return false, masters // Masters specified but unreachable + } + conn.Close() + return true, masters + } + + return false, "" +} + +// NewGatewayTestServerWithSMQ creates a gateway server that automatically uses SMQ if available +func NewGatewayTestServerWithSMQ(t *testing.T, mode SMQAvailabilityMode) *GatewayTestServer { + smqAvailable, masters := CheckSMQAvailability() + + switch mode { + case SMQRequired: + if !smqAvailable { + if masters != "" { + t.Skipf("Skipping test: SEAWEEDFS_MASTERS=%s specified but unreachable", masters) + } else { + t.Skip("Skipping test: SEAWEEDFS_MASTERS required but not set") + } + } + t.Logf("Using SMQ-backed gateway with masters: %s", masters) + return newGatewayTestServerWithTimeout(t, GatewayOptions{ + UseProduction: true, + Masters: masters, + }, 120*time.Second) + + case SMQAvailable: + if smqAvailable { + t.Logf("SMQ available, using production gateway with masters: %s", masters) + return newGatewayTestServerWithTimeout(t, GatewayOptions{ + UseProduction: true, + Masters: masters, + }, 120*time.Second) + } else { + t.Logf("SMQ not available, using mock gateway") + return NewGatewayTestServer(t, GatewayOptions{}) + } + + default: // SMQUnavailable + t.Logf("Using mock gateway (SMQ integration disabled)") + return NewGatewayTestServer(t, GatewayOptions{}) + } +} + +// newGatewayTestServerWithTimeout creates a gateway server with a timeout to prevent hanging +func newGatewayTestServerWithTimeout(t *testing.T, opts GatewayOptions, timeout time.Duration) *GatewayTestServer { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + done := make(chan *GatewayTestServer, 1) + errChan := make(chan error, 1) + + go func() { + defer func() { + if r := recover(); r != nil { + errChan <- fmt.Errorf("panic creating gateway: %v", r) + } + }() + + // Create the gateway in a goroutine so we can timeout if it hangs + t.Logf("Creating gateway with masters: %s (with %v timeout)", opts.Masters, timeout) + gateway := NewGatewayTestServer(t, opts) + t.Logf("Gateway created successfully") + done <- gateway + }() + + select { + case gateway := <-done: + return gateway + case err := <-errChan: + t.Fatalf("Error creating gateway: %v", err) + case <-ctx.Done(): + t.Fatalf("Timeout creating gateway after %v - likely SMQ broker discovery failed. Check if MQ brokers are running and accessible.", timeout) + } + + return nil // This should never be reached +} + +// IsSMQMode returns true if the gateway is using real SMQ backend +// This is determined by checking if we have the SEAWEEDFS_MASTERS environment variable +func (g *GatewayTestServer) IsSMQMode() bool { + available, _ := CheckSMQAvailability() + return available +} |
