1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
|
package testutil
import (
"context"
"fmt"
"net"
"os"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
)
// GatewayTestServer wraps the gateway server with common test utilities
type GatewayTestServer struct {
*gateway.Server
t *testing.T
}
// GatewayOptions contains configuration for test gateway
type GatewayOptions struct {
Listen string
Masters string
UseProduction bool
// Add more options as needed
}
// NewGatewayTestServer creates a new test gateway server with common setup
func NewGatewayTestServer(t *testing.T, opts GatewayOptions) *GatewayTestServer {
if opts.Listen == "" {
opts.Listen = "127.0.0.1:0" // Use random port by default
}
// Allow switching to production gateway if requested (requires masters)
var srv *gateway.Server
if opts.UseProduction {
if opts.Masters == "" {
// Fallback to env variable for convenience in CI
if v := os.Getenv("SEAWEEDFS_MASTERS"); v != "" {
opts.Masters = v
} else {
opts.Masters = "localhost:9333"
}
}
srv = gateway.NewServer(gateway.Options{
Listen: opts.Listen,
Masters: opts.Masters,
})
} else {
// For unit testing without real SeaweedMQ masters
srv = gateway.NewTestServerForUnitTests(gateway.Options{
Listen: opts.Listen,
})
}
return &GatewayTestServer{
Server: srv,
t: t,
}
}
// StartAndWait starts the gateway and waits for it to be ready
func (g *GatewayTestServer) StartAndWait() string {
g.t.Helper()
// Start server in goroutine
go func() {
// Enable schema mode automatically when SCHEMA_REGISTRY_URL is set
if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" {
h := g.GetHandler()
if h != nil {
_ = h.EnableSchemaManagement(schema.ManagerConfig{RegistryURL: url})
}
}
if err := g.Start(); err != nil {
g.t.Errorf("Failed to start gateway: %v", err)
}
}()
// Wait for server to be ready
time.Sleep(100 * time.Millisecond)
host, port := g.GetListenerAddr()
addr := fmt.Sprintf("%s:%d", host, port)
g.t.Logf("Gateway running on %s", addr)
return addr
}
// AddTestTopic adds a topic for testing with default configuration
func (g *GatewayTestServer) AddTestTopic(name string) {
g.t.Helper()
g.GetHandler().AddTopicForTesting(name, 1)
g.t.Logf("Added test topic: %s", name)
}
// AddTestTopics adds multiple topics for testing
func (g *GatewayTestServer) AddTestTopics(names ...string) {
g.t.Helper()
for _, name := range names {
g.AddTestTopic(name)
}
}
// CleanupAndClose properly closes the gateway server
func (g *GatewayTestServer) CleanupAndClose() {
g.t.Helper()
if err := g.Close(); err != nil {
g.t.Errorf("Failed to close gateway: %v", err)
}
}
// SMQAvailabilityMode indicates whether SeaweedMQ is available for testing
type SMQAvailabilityMode int
const (
SMQUnavailable SMQAvailabilityMode = iota // Use mock handler only
SMQAvailable // SMQ is available, can use production mode
SMQRequired // SMQ is required, skip test if unavailable
)
// CheckSMQAvailability checks if SeaweedFS masters are available for testing
func CheckSMQAvailability() (bool, string) {
masters := os.Getenv("SEAWEEDFS_MASTERS")
if masters == "" {
return false, ""
}
// Test if at least one master is reachable
if masters != "" {
// Try to connect to the first master to verify availability
conn, err := net.DialTimeout("tcp", masters, 2*time.Second)
if err != nil {
return false, masters // Masters specified but unreachable
}
conn.Close()
return true, masters
}
return false, ""
}
// NewGatewayTestServerWithSMQ creates a gateway server that automatically uses SMQ if available
func NewGatewayTestServerWithSMQ(t *testing.T, mode SMQAvailabilityMode) *GatewayTestServer {
smqAvailable, masters := CheckSMQAvailability()
switch mode {
case SMQRequired:
if !smqAvailable {
if masters != "" {
t.Skipf("Skipping test: SEAWEEDFS_MASTERS=%s specified but unreachable", masters)
} else {
t.Skip("Skipping test: SEAWEEDFS_MASTERS required but not set")
}
}
t.Logf("Using SMQ-backed gateway with masters: %s", masters)
return newGatewayTestServerWithTimeout(t, GatewayOptions{
UseProduction: true,
Masters: masters,
}, 120*time.Second)
case SMQAvailable:
if smqAvailable {
t.Logf("SMQ available, using production gateway with masters: %s", masters)
return newGatewayTestServerWithTimeout(t, GatewayOptions{
UseProduction: true,
Masters: masters,
}, 120*time.Second)
} else {
t.Logf("SMQ not available, using mock gateway")
return NewGatewayTestServer(t, GatewayOptions{})
}
default: // SMQUnavailable
t.Logf("Using mock gateway (SMQ integration disabled)")
return NewGatewayTestServer(t, GatewayOptions{})
}
}
// newGatewayTestServerWithTimeout creates a gateway server with a timeout to prevent hanging
func newGatewayTestServerWithTimeout(t *testing.T, opts GatewayOptions, timeout time.Duration) *GatewayTestServer {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
done := make(chan *GatewayTestServer, 1)
errChan := make(chan error, 1)
go func() {
defer func() {
if r := recover(); r != nil {
errChan <- fmt.Errorf("panic creating gateway: %v", r)
}
}()
// Create the gateway in a goroutine so we can timeout if it hangs
t.Logf("Creating gateway with masters: %s (with %v timeout)", opts.Masters, timeout)
gateway := NewGatewayTestServer(t, opts)
t.Logf("Gateway created successfully")
done <- gateway
}()
select {
case gateway := <-done:
return gateway
case err := <-errChan:
t.Fatalf("Error creating gateway: %v", err)
case <-ctx.Done():
t.Fatalf("Timeout creating gateway after %v - likely SMQ broker discovery failed. Check if MQ brokers are running and accessible.", timeout)
}
return nil // This should never be reached
}
// IsSMQMode returns true if the gateway is using real SMQ backend
// This is determined by checking if we have the SEAWEEDFS_MASTERS environment variable
func (g *GatewayTestServer) IsSMQMode() bool {
available, _ := CheckSMQAvailability()
return available
}
|