diff options
| author | chrislu <chris.lu@gmail.com> | 2025-06-24 09:35:08 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-06-24 09:35:08 -0700 |
| commit | ad6a4ef9b2d147481b92df02a3923ca99b73bf20 (patch) | |
| tree | 01e4f39c9fb394f3141a9e31a9d7940003d66b01 /test/mq/integration | |
| parent | 46602a229cbbe4dd75e286bddf14f9729fbacd40 (diff) | |
| download | seaweedfs-ad6a4ef9b2d147481b92df02a3923ca99b73bf20.tar.xz seaweedfs-ad6a4ef9b2d147481b92df02a3923ca99b73bf20.zip | |
make test-basic
Diffstat (limited to 'test/mq/integration')
| -rw-r--r-- | test/mq/integration/basic_pubsub_test.go | 15 | ||||
| -rw-r--r-- | test/mq/integration/framework.go | 36 |
2 files changed, 40 insertions, 11 deletions
diff --git a/test/mq/integration/basic_pubsub_test.go b/test/mq/integration/basic_pubsub_test.go index ad434e50a..10bf69f7f 100644 --- a/test/mq/integration/basic_pubsub_test.go +++ b/test/mq/integration/basic_pubsub_test.go @@ -16,9 +16,8 @@ func TestBasicPublishSubscribe(t *testing.T) { suite := NewIntegrationTestSuite(t) require.NoError(t, suite.Setup()) - // Test configuration namespace := "test" - topicName := "basic-pubsub" + topicName := fmt.Sprintf("basic-pubsub-%d", time.Now().UnixNano()) // Unique topic name per run testSchema := CreateTestSchema() messageCount := 10 @@ -27,12 +26,12 @@ func TestBasicPublishSubscribe(t *testing.T) { Namespace: namespace, TopicName: topicName, PartitionCount: 1, - PublisherName: "test-publisher", + PublisherName: "basic-publisher", RecordType: testSchema, } publisher, err := suite.CreatePublisher(pubConfig) - require.NoError(t, err, "Failed to create publisher") + require.NoError(t, err) // Create subscriber subConfig := &SubscriberTestConfig{ @@ -51,6 +50,7 @@ func TestBasicPublishSubscribe(t *testing.T) { // Set up message collector collector := NewMessageCollector(messageCount) subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { + t.Logf("[Subscriber] Received message with key: %s, ts: %d", string(m.Data.Key), m.Data.TsNs) collector.AddMessage(TestMessage{ ID: fmt.Sprintf("msg-%d", len(collector.GetMessages())), Content: m.Data.Value, @@ -68,6 +68,7 @@ func TestBasicPublishSubscribe(t *testing.T) { }() // Wait for subscriber to be ready + t.Logf("[Test] Waiting for subscriber to be ready...") time.Sleep(2 * time.Second) // Publish test messages @@ -80,12 +81,14 @@ func TestBasicPublishSubscribe(t *testing.T) { RecordEnd() key := []byte(fmt.Sprintf("key-%d", i)) + t.Logf("[Publisher] Publishing message %d with key: %s", i, string(key)) err := publisher.PublishRecord(key, record) require.NoError(t, err, "Failed to publish message %d", i) } - // Wait for messages to be received + t.Logf("[Test] Waiting for messages to be received...") messages := collector.WaitForMessages(30 * time.Second) + t.Logf("[Test] WaitForMessages returned. Received %d messages.", len(messages)) // Verify all messages were received assert.Len(t, messages, messageCount, "Expected %d messages, got %d", messageCount, len(messages)) @@ -95,6 +98,8 @@ func TestBasicPublishSubscribe(t *testing.T) { assert.NotEmpty(t, msg.Content, "Message %d should have content", i) assert.NotEmpty(t, msg.Key, "Message %d should have key", i) } + + t.Logf("[Test] TestBasicPublishSubscribe completed.") } func TestMultipleConsumers(t *testing.T) { diff --git a/test/mq/integration/framework.go b/test/mq/integration/framework.go index 421df5d9c..47d26e165 100644 --- a/test/mq/integration/framework.go +++ b/test/mq/integration/framework.go @@ -37,6 +37,7 @@ type IntegrationTestSuite struct { agents map[string]*agent.MessageQueueAgent publishers map[string]*pub_client.TopicPublisher subscribers map[string]*sub_client.TopicSubscriber + subCancels map[string]context.CancelFunc cleanupOnce sync.Once t *testing.T } @@ -55,6 +56,7 @@ func NewIntegrationTestSuite(t *testing.T) *IntegrationTestSuite { agents: make(map[string]*agent.MessageQueueAgent), publishers: make(map[string]*pub_client.TopicPublisher), subscribers: make(map[string]*sub_client.TopicSubscriber), + subCancels: make(map[string]context.CancelFunc), t: t, } } @@ -75,16 +77,34 @@ func (its *IntegrationTestSuite) Setup() error { // Cleanup performs cleanup operations func (its *IntegrationTestSuite) Cleanup() { its.cleanupOnce.Do(func() { - // Close all subscribers (they use context cancellation) - for name, _ := range its.subscribers { + // Close all subscribers first (they use context cancellation) + for name := range its.subscribers { + if cancel, ok := its.subCancels[name]; ok && cancel != nil { + cancel() + its.t.Logf("Cancelled subscriber context: %s", name) + } its.t.Logf("Cleaned up subscriber: %s", name) } + // Wait a moment for gRPC connections to close gracefully + time.Sleep(1 * time.Second) + // Close all publishers for name, publisher := range its.publishers { if publisher != nil { - publisher.Shutdown() - its.t.Logf("Cleaned up publisher: %s", name) + // Add timeout to prevent deadlock during shutdown + done := make(chan bool, 1) + go func(p *pub_client.TopicPublisher, n string) { + p.Shutdown() + done <- true + }(publisher, name) + + select { + case <-done: + its.t.Logf("Cleaned up publisher: %s", name) + case <-time.After(5 * time.Second): + its.t.Logf("Publisher shutdown timed out: %s", name) + } } } @@ -135,8 +155,9 @@ func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig) } offsetChan := make(chan sub_client.KeyedOffset, 1024) + ctx, cancel := context.WithCancel(context.Background()) subscriber := sub_client.NewTopicSubscriber( - context.Background(), + ctx, its.env.Brokers, subscriberConfig, contentConfig, @@ -144,6 +165,7 @@ func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig) ) its.subscribers[config.ConsumerInstanceId] = subscriber + its.subCancels[config.ConsumerInstanceId] = cancel return subscriber, nil } @@ -204,6 +226,7 @@ type MessageCollector struct { mutex sync.RWMutex waitCh chan struct{} expected int + closed bool // protect against closing waitCh multiple times } // NewMessageCollector creates a new message collector @@ -221,8 +244,9 @@ func (mc *MessageCollector) AddMessage(msg TestMessage) { defer mc.mutex.Unlock() mc.messages = append(mc.messages, msg) - if len(mc.messages) >= mc.expected { + if len(mc.messages) >= mc.expected && !mc.closed { close(mc.waitCh) + mc.closed = true } } |
