author    chrislu <chris.lu@gmail.com>	2025-06-24 09:35:08 -0700
committer chrislu <chris.lu@gmail.com>	2025-06-24 09:35:08 -0700
commit    ad6a4ef9b2d147481b92df02a3923ca99b73bf20 (patch)
tree      01e4f39c9fb394f3141a9e31a9d7940003d66b01 /test/mq/integration
parent    46602a229cbbe4dd75e286bddf14f9729fbacd40 (diff)
make test-basic
Diffstat (limited to 'test/mq/integration')
-rw-r--r--	test/mq/integration/basic_pubsub_test.go	15
-rw-r--r--	test/mq/integration/framework.go	36
2 files changed, 40 insertions(+), 11 deletions(-)
diff --git a/test/mq/integration/basic_pubsub_test.go b/test/mq/integration/basic_pubsub_test.go
index ad434e50a..10bf69f7f 100644
--- a/test/mq/integration/basic_pubsub_test.go
+++ b/test/mq/integration/basic_pubsub_test.go
@@ -16,9 +16,8 @@ func TestBasicPublishSubscribe(t *testing.T) {
 	suite := NewIntegrationTestSuite(t)
 	require.NoError(t, suite.Setup())
 
-	// Test configuration
 	namespace := "test"
-	topicName := "basic-pubsub"
+	topicName := fmt.Sprintf("basic-pubsub-%d", time.Now().UnixNano()) // Unique topic name per run
 	testSchema := CreateTestSchema()
 	messageCount := 10
@@ -27,12 +26,12 @@ func TestBasicPublishSubscribe(t *testing.T) {
 		Namespace:      namespace,
 		TopicName:      topicName,
 		PartitionCount: 1,
-		PublisherName:  "test-publisher",
+		PublisherName:  "basic-publisher",
 		RecordType:     testSchema,
 	}
 
 	publisher, err := suite.CreatePublisher(pubConfig)
-	require.NoError(t, err, "Failed to create publisher")
+	require.NoError(t, err)
 
 	// Create subscriber
 	subConfig := &SubscriberTestConfig{
@@ -51,6 +50,7 @@ func TestBasicPublishSubscribe(t *testing.T) {
 	// Set up message collector
 	collector := NewMessageCollector(messageCount)
 	subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
+		t.Logf("[Subscriber] Received message with key: %s, ts: %d", string(m.Data.Key), m.Data.TsNs)
 		collector.AddMessage(TestMessage{
 			ID:      fmt.Sprintf("msg-%d", len(collector.GetMessages())),
 			Content: m.Data.Value,
@@ -68,6 +68,7 @@ func TestBasicPublishSubscribe(t *testing.T) {
 	}()
 
 	// Wait for subscriber to be ready
+	t.Logf("[Test] Waiting for subscriber to be ready...")
 	time.Sleep(2 * time.Second)
 
 	// Publish test messages
@@ -80,12 +81,14 @@ func TestBasicPublishSubscribe(t *testing.T) {
 			RecordEnd()
 
 		key := []byte(fmt.Sprintf("key-%d", i))
+		t.Logf("[Publisher] Publishing message %d with key: %s", i, string(key))
 		err := publisher.PublishRecord(key, record)
 		require.NoError(t, err, "Failed to publish message %d", i)
 	}
 
-	// Wait for messages to be received
+	t.Logf("[Test] Waiting for messages to be received...")
 	messages := collector.WaitForMessages(30 * time.Second)
+	t.Logf("[Test] WaitForMessages returned. Received %d messages.", len(messages))
 
 	// Verify all messages were received
 	assert.Len(t, messages, messageCount, "Expected %d messages, got %d", messageCount, len(messages))
@@ -95,6 +98,8 @@ func TestBasicPublishSubscribe(t *testing.T) {
 		assert.NotEmpty(t, msg.Content, "Message %d should have content", i)
 		assert.NotEmpty(t, msg.Key, "Message %d should have key", i)
 	}
+
+	t.Logf("[Test] TestBasicPublishSubscribe completed.")
 }
 
 func TestMultipleConsumers(t *testing.T) {
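Note: the fmt.Sprintf("basic-pubsub-%d", time.Now().UnixNano()) change above gives every run a fresh topic, so state left on the broker by an earlier run cannot bleed into this run's assertions. A minimal sketch of the same idea as a reusable helper (uniqueTopicName is illustrative, not part of this commit):

package integration

import (
	"fmt"
	"time"
)

// uniqueTopicName suffixes base with a nanosecond timestamp so repeated
// test runs against the same broker never reuse a topic name.
func uniqueTopicName(base string) string {
	return fmt.Sprintf("%s-%d", base, time.Now().UnixNano())
}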
diff --git a/test/mq/integration/framework.go b/test/mq/integration/framework.go
index 421df5d9c..47d26e165 100644
--- a/test/mq/integration/framework.go
+++ b/test/mq/integration/framework.go
@@ -37,6 +37,7 @@ type IntegrationTestSuite struct {
 	agents      map[string]*agent.MessageQueueAgent
 	publishers  map[string]*pub_client.TopicPublisher
 	subscribers map[string]*sub_client.TopicSubscriber
+	subCancels  map[string]context.CancelFunc
 	cleanupOnce sync.Once
 	t           *testing.T
 }
@@ -55,6 +56,7 @@ func NewIntegrationTestSuite(t *testing.T) *IntegrationTestSuite {
 		agents:      make(map[string]*agent.MessageQueueAgent),
 		publishers:  make(map[string]*pub_client.TopicPublisher),
 		subscribers: make(map[string]*sub_client.TopicSubscriber),
+		subCancels:  make(map[string]context.CancelFunc),
 		t:           t,
 	}
 }
@@ -75,16 +77,34 @@ func (its *IntegrationTestSuite) Setup() error {
 // Cleanup performs cleanup operations
 func (its *IntegrationTestSuite) Cleanup() {
 	its.cleanupOnce.Do(func() {
-		// Close all subscribers (they use context cancellation)
-		for name, _ := range its.subscribers {
+		// Close all subscribers first (they use context cancellation)
+		for name := range its.subscribers {
+			if cancel, ok := its.subCancels[name]; ok && cancel != nil {
+				cancel()
+				its.t.Logf("Cancelled subscriber context: %s", name)
+			}
 			its.t.Logf("Cleaned up subscriber: %s", name)
 		}
 
+		// Wait a moment for gRPC connections to close gracefully
+		time.Sleep(1 * time.Second)
+
 		// Close all publishers
 		for name, publisher := range its.publishers {
 			if publisher != nil {
-				publisher.Shutdown()
-				its.t.Logf("Cleaned up publisher: %s", name)
+				// Add timeout to prevent deadlock during shutdown
+				done := make(chan bool, 1)
+				go func(p *pub_client.TopicPublisher, n string) {
+					p.Shutdown()
+					done <- true
+				}(publisher, name)
+
+				select {
+				case <-done:
+					its.t.Logf("Cleaned up publisher: %s", name)
+				case <-time.After(5 * time.Second):
+					its.t.Logf("Publisher shutdown timed out: %s", name)
+				}
 			}
 		}
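Note: the select between done and time.After(5 * time.Second) above bounds how long a wedged publisher.Shutdown() can stall teardown. The same guard works for any blocking close call; a sketch under that assumption (shutdownWithTimeout is illustrative, not part of this commit):

package integration

import "time"

// shutdownWithTimeout runs fn in a goroutine and reports whether it
// returned within d. If fn never returns, its goroutine is leaked, which
// is acceptable in test cleanup since the process exits shortly after.
func shutdownWithTimeout(fn func(), d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		fn()
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}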
@@ -135,8 +155,9 @@ func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig)
 	}
 
 	offsetChan := make(chan sub_client.KeyedOffset, 1024)
+	ctx, cancel := context.WithCancel(context.Background())
 	subscriber := sub_client.NewTopicSubscriber(
-		context.Background(),
+		ctx,
 		its.env.Brokers,
 		subscriberConfig,
 		contentConfig,
@@ -144,6 +165,7 @@ func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig)
 	)
 
 	its.subscribers[config.ConsumerInstanceId] = subscriber
+	its.subCancels[config.ConsumerInstanceId] = cancel
 	return subscriber, nil
 }
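Note: pairing each subscriber with its context.CancelFunc in subCancels is what lets Cleanup stop receive loops deterministically instead of leaking them. A hedged sketch of that bookkeeping in isolation (cancelRegistry is illustrative, not part of this commit):

package integration

import "context"

// cancelRegistry keys each subscriber's CancelFunc by consumer instance id,
// mirroring the subCancels map added above.
type cancelRegistry map[string]context.CancelFunc

// cancelAll cancels every registered context and clears the registry.
func (r cancelRegistry) cancelAll() {
	for name, cancel := range r {
		if cancel != nil {
			cancel()
		}
		delete(r, name)
	}
}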
@@ -204,6 +226,7 @@ type MessageCollector struct {
 	mutex    sync.RWMutex
 	waitCh   chan struct{}
 	expected int
+	closed   bool // protect against closing waitCh multiple times
 }
 
 // NewMessageCollector creates a new message collector
@@ -221,8 +244,9 @@ func (mc *MessageCollector) AddMessage(msg TestMessage) {
 	defer mc.mutex.Unlock()
 
 	mc.messages = append(mc.messages, msg)
-	if len(mc.messages) >= mc.expected {
+	if len(mc.messages) >= mc.expected && !mc.closed {
 		close(mc.waitCh)
+		mc.closed = true
 	}
 }
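Note: the closed flag above prevents a panic from a second close(mc.waitCh) when messages keep arriving past the expected count; it is safe because AddMessage holds the collector mutex. sync.Once expresses the same close-exactly-once invariant without a flag; a sketch of that alternative (onceCollector is illustrative, not the commit's code):

package integration

import "sync"

// onceCollector closes waitCh exactly once no matter how many times the
// completion condition fires, even without an external mutex.
type onceCollector struct {
	waitCh    chan struct{}
	closeOnce sync.Once
}

func (c *onceCollector) signalComplete() {
	c.closeOnce.Do(func() { close(c.waitCh) })
}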