path: root/test/mq/integration
author    chrislu <chris.lu@gmail.com>  2025-06-23 10:55:02 -0700
committer chrislu <chris.lu@gmail.com>  2025-06-23 10:55:02 -0700
commit    0347212b6417074cd6727c3981ee5f54f5373206 (patch)
tree      826a36445ba497b7afbe767c2989e61bbe37df0b /test/mq/integration
parent    7324cb71717f87cd0cc957d983d2ad2e0ca82695 (diff)
download  seaweedfs-0347212b6417074cd6727c3981ee5f54f5373206.tar.xz
          seaweedfs-0347212b6417074cd6727c3981ee5f54f5373206.zip
init version
Diffstat (limited to 'test/mq/integration')
-rw-r--r--  test/mq/integration/basic_pubsub_test.go | 338
-rw-r--r--  test/mq/integration/framework.go          | 363
2 files changed, 701 insertions(+), 0 deletions(-)
diff --git a/test/mq/integration/basic_pubsub_test.go b/test/mq/integration/basic_pubsub_test.go
new file mode 100644
index 000000000..ad434e50a
--- /dev/null
+++ b/test/mq/integration/basic_pubsub_test.go
@@ -0,0 +1,338 @@
+package integration
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
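+// TestBasicPublishSubscribe publishes schema-encoded records to a single-partition topic and verifies that one subscriber receives every message.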
+func TestBasicPublishSubscribe(t *testing.T) {
+ suite := NewIntegrationTestSuite(t)
+ require.NoError(t, suite.Setup())
+
+ // Test configuration
+ namespace := "test"
+ topicName := "basic-pubsub"
+ testSchema := CreateTestSchema()
+ messageCount := 10
+
+ // Create publisher
+ pubConfig := &PublisherTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ PartitionCount: 1,
+ PublisherName: "test-publisher",
+ RecordType: testSchema,
+ }
+
+ publisher, err := suite.CreatePublisher(pubConfig)
+ require.NoError(t, err, "Failed to create publisher")
+
+ // Create subscriber
+ subConfig := &SubscriberTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ ConsumerGroup: "test-group",
+ ConsumerInstanceId: "consumer-1",
+ MaxPartitionCount: 1,
+ SlidingWindowSize: 10,
+ OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
+ }
+
+ subscriber, err := suite.CreateSubscriber(subConfig)
+ require.NoError(t, err, "Failed to create subscriber")
+
+ // Set up message collector
+ collector := NewMessageCollector(messageCount)
+ subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
+ collector.AddMessage(TestMessage{
+ ID: fmt.Sprintf("msg-%d", len(collector.GetMessages())),
+ Content: m.Data.Value,
+ Timestamp: time.Unix(0, m.Data.TsNs),
+ Key: m.Data.Key,
+ })
+ })
+
+ // Start subscriber
+ go func() {
+ err := subscriber.Subscribe()
+ if err != nil {
+ t.Logf("Subscriber error: %v", err)
+ }
+ }()
+
+ // Wait for subscriber to be ready
+ time.Sleep(2 * time.Second)
+
+ // Publish test messages
+ for i := 0; i < messageCount; i++ {
+ record := schema.RecordBegin().
+ SetString("id", fmt.Sprintf("msg-%d", i)).
+ SetInt64("timestamp", time.Now().UnixNano()).
+ SetString("content", fmt.Sprintf("Test message %d", i)).
+ SetInt32("sequence", int32(i)).
+ RecordEnd()
+
+ key := []byte(fmt.Sprintf("key-%d", i))
+ err := publisher.PublishRecord(key, record)
+ require.NoError(t, err, "Failed to publish message %d", i)
+ }
+
+ // Wait for messages to be received
+ messages := collector.WaitForMessages(30 * time.Second)
+
+ // Verify all messages were received
+ assert.Len(t, messages, messageCount, "Expected %d messages, got %d", messageCount, len(messages))
+
+ // Verify message content
+ for i, msg := range messages {
+ assert.NotEmpty(t, msg.Content, "Message %d should have content", i)
+ assert.NotEmpty(t, msg.Key, "Message %d should have key", i)
+ }
+}
+
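+// TestMultipleConsumers distributes messages across three partitions and checks that a three-instance consumer group collectively receives them all.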
+func TestMultipleConsumers(t *testing.T) {
+ suite := NewIntegrationTestSuite(t)
+ require.NoError(t, suite.Setup())
+
+ namespace := "test"
+ topicName := "multi-consumer"
+ testSchema := CreateTestSchema()
+ messageCount := 20
+ consumerCount := 3
+
+ // Create publisher
+ pubConfig := &PublisherTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ PartitionCount: 3, // Multiple partitions for load distribution
+ PublisherName: "multi-publisher",
+ RecordType: testSchema,
+ }
+
+ publisher, err := suite.CreatePublisher(pubConfig)
+ require.NoError(t, err)
+
+ // Create multiple consumers
+ collectors := make([]*MessageCollector, consumerCount)
+ for i := 0; i < consumerCount; i++ {
+ collectors[i] = NewMessageCollector(messageCount / consumerCount) // Expect roughly equal distribution
+
+ subConfig := &SubscriberTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ ConsumerGroup: "multi-consumer-group", // Same group for load balancing
+ ConsumerInstanceId: fmt.Sprintf("consumer-%d", i),
+ MaxPartitionCount: 1,
+ SlidingWindowSize: 10,
+ OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
+ }
+
+ subscriber, err := suite.CreateSubscriber(subConfig)
+ require.NoError(t, err)
+
+ // Set up message collection for this consumer
+ collectorIndex := i
+ subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
+ collectors[collectorIndex].AddMessage(TestMessage{
+ ID: fmt.Sprintf("consumer-%d-msg-%d", collectorIndex, len(collectors[collectorIndex].GetMessages())),
+ Content: m.Data.Value,
+ Timestamp: time.Unix(0, m.Data.TsNs),
+ Key: m.Data.Key,
+ })
+ })
+
+ // Start subscriber
+ go func() {
+ _ = subscriber.Subscribe() // error intentionally ignored; the test asserts on received messages
+ }()
+ }
+
+ // Wait for subscribers to be ready
+ time.Sleep(3 * time.Second)
+
+ // Publish messages with different keys to distribute across partitions
+ for i := 0; i < messageCount; i++ {
+ record := schema.RecordBegin().
+ SetString("id", fmt.Sprintf("multi-msg-%d", i)).
+ SetInt64("timestamp", time.Now().UnixNano()).
+ SetString("content", fmt.Sprintf("Multi consumer test message %d", i)).
+ SetInt32("sequence", int32(i)).
+ RecordEnd()
+
+ key := []byte(fmt.Sprintf("partition-key-%d", i%3)) // Distribute across 3 partitions
+ err := publisher.PublishRecord(key, record)
+ require.NoError(t, err)
+ }
+
+ // Wait for all messages to be consumed
+ time.Sleep(10 * time.Second)
+
+ // Verify message distribution
+ totalReceived := 0
+ for i, collector := range collectors {
+ messages := collector.GetMessages()
+ t.Logf("Consumer %d received %d messages", i, len(messages))
+ totalReceived += len(messages)
+ }
+
+ // All messages should be consumed across all consumers
+ assert.Equal(t, messageCount, totalReceived, "Total messages received should equal messages sent")
+}
+
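+// TestMessageOrdering publishes to a single partition with a fixed key and asserts that messages arrive in chronological order.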
+func TestMessageOrdering(t *testing.T) {
+ suite := NewIntegrationTestSuite(t)
+ require.NoError(t, suite.Setup())
+
+ namespace := "test"
+ topicName := "ordering-test"
+ testSchema := CreateTestSchema()
+ messageCount := 15
+
+ // Create publisher
+ pubConfig := &PublisherTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ PartitionCount: 1, // Single partition to guarantee ordering
+ PublisherName: "ordering-publisher",
+ RecordType: testSchema,
+ }
+
+ publisher, err := suite.CreatePublisher(pubConfig)
+ require.NoError(t, err)
+
+ // Create subscriber
+ subConfig := &SubscriberTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ ConsumerGroup: "ordering-group",
+ ConsumerInstanceId: "ordering-consumer",
+ MaxPartitionCount: 1,
+ SlidingWindowSize: 5,
+ OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
+ }
+
+ subscriber, err := suite.CreateSubscriber(subConfig)
+ require.NoError(t, err)
+
+ // Set up message collector
+ collector := NewMessageCollector(messageCount)
+ subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
+ collector.AddMessage(TestMessage{
+ ID: "ordered-msg",
+ Content: m.Data.Value,
+ Timestamp: time.Unix(0, m.Data.TsNs),
+ Key: m.Data.Key,
+ })
+ })
+
+ // Start subscriber
+ go func() {
+ _ = subscriber.Subscribe() // error intentionally ignored; the test asserts on received messages
+ }()
+
+ // Wait for consumer to be ready
+ time.Sleep(2 * time.Second)
+
+ // Publish messages with same key to ensure they go to same partition
+ publishTimes := make([]time.Time, messageCount)
+ for i := 0; i < messageCount; i++ {
+ publishTimes[i] = time.Now()
+
+ record := schema.RecordBegin().
+ SetString("id", fmt.Sprintf("ordered-%d", i)).
+ SetInt64("timestamp", publishTimes[i].UnixNano()).
+ SetString("content", fmt.Sprintf("Ordered message %d", i)).
+ SetInt32("sequence", int32(i)).
+ RecordEnd()
+
+ key := []byte("same-partition-key") // Same key ensures same partition
+ err := publisher.PublishRecord(key, record)
+ require.NoError(t, err)
+
+ // Small delay to ensure different timestamps
+ time.Sleep(10 * time.Millisecond)
+ }
+
+ // Wait for all messages
+ messages := collector.WaitForMessages(30 * time.Second)
+ require.Len(t, messages, messageCount)
+
+ // Verify ordering within the partition
+ suite.AssertMessageOrdering(t, messages)
+}
+
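+// TestSchemaValidation publishes records against a flat schema and a nested schema and expects both to be accepted.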
+func TestSchemaValidation(t *testing.T) {
+ suite := NewIntegrationTestSuite(t)
+ require.NoError(t, suite.Setup())
+
+ namespace := "test"
+ topicName := "schema-validation"
+
+ // Test with simple schema
+ simpleSchema := CreateTestSchema()
+
+ pubConfig := &PublisherTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ PartitionCount: 1,
+ PublisherName: "schema-publisher",
+ RecordType: simpleSchema,
+ }
+
+ publisher, err := suite.CreatePublisher(pubConfig)
+ require.NoError(t, err)
+
+ // Test valid record
+ validRecord := schema.RecordBegin().
+ SetString("id", "valid-msg").
+ SetInt64("timestamp", time.Now().UnixNano()).
+ SetString("content", "Valid message").
+ SetInt32("sequence", 1).
+ RecordEnd()
+
+ err = publisher.PublishRecord([]byte("test-key"), validRecord)
+ assert.NoError(t, err, "Valid record should be published successfully")
+
+ // Test with complex nested schema
+ complexSchema := CreateComplexTestSchema()
+
+ complexPubConfig := &PublisherTestConfig{
+ Namespace: namespace,
+ TopicName: topicName + "-complex",
+ PartitionCount: 1,
+ PublisherName: "complex-publisher",
+ RecordType: complexSchema,
+ }
+
+ complexPublisher, err := suite.CreatePublisher(complexPubConfig)
+ require.NoError(t, err)
+
+ // Test complex nested record
+ complexRecord := schema.RecordBegin().
+ SetString("user_id", "user123").
+ SetString("name", "John Doe").
+ SetInt32("age", 30).
+ SetStringList("emails", "john@example.com", "john.doe@company.com").
+ SetRecord("address",
+ schema.RecordBegin().
+ SetString("street", "123 Main St").
+ SetString("city", "New York").
+ SetString("zipcode", "10001").
+ RecordEnd()).
+ SetInt64("created_at", time.Now().UnixNano()).
+ RecordEnd()
+
+ err = complexPublisher.PublishRecord([]byte("complex-key"), complexRecord)
+ assert.NoError(t, err, "Complex nested record should be published successfully")
+}
diff --git a/test/mq/integration/framework.go b/test/mq/integration/framework.go
new file mode 100644
index 000000000..421df5d9c
--- /dev/null
+++ b/test/mq/integration/framework.go
@@ -0,0 +1,363 @@
+package integration
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/agent"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+// TestEnvironment holds the configuration for the test environment
+type TestEnvironment struct {
+ Masters []string
+ Brokers []string
+ Filers []string
+ TestTimeout time.Duration
+ CleanupFuncs []func()
+ mutex sync.Mutex
+}
+
+// IntegrationTestSuite provides the base test framework
+type IntegrationTestSuite struct {
+ env *TestEnvironment
+ agents map[string]*agent.MessageQueueAgent
+ publishers map[string]*pub_client.TopicPublisher
+ subscribers map[string]*sub_client.TopicSubscriber
+ cleanupOnce sync.Once
+ t *testing.T
+}
+
+// NewIntegrationTestSuite creates a new test suite instance
+func NewIntegrationTestSuite(t *testing.T) *IntegrationTestSuite {
+ env := &TestEnvironment{
+ Masters: getEnvList("SEAWEED_MASTERS", []string{"localhost:19333"}),
+ Brokers: getEnvList("SEAWEED_BROKERS", []string{"localhost:17777"}),
+ Filers: getEnvList("SEAWEED_FILERS", []string{"localhost:18888"}),
+ TestTimeout: getEnvDuration("GO_TEST_TIMEOUT", 30*time.Minute),
+ }
+
+ return &IntegrationTestSuite{
+ env: env,
+ agents: make(map[string]*agent.MessageQueueAgent),
+ publishers: make(map[string]*pub_client.TopicPublisher),
+ subscribers: make(map[string]*sub_client.TopicSubscriber),
+ t: t,
+ }
+}
+
+// Setup initializes the test environment
+func (its *IntegrationTestSuite) Setup() error {
+ // Wait for cluster to be ready
+ if err := its.waitForClusterReady(); err != nil {
+ return fmt.Errorf("cluster not ready: %v", err)
+ }
+
+ // Register cleanup
+ its.t.Cleanup(its.Cleanup)
+
+ return nil
+}
+
+// Cleanup performs cleanup operations
+func (its *IntegrationTestSuite) Cleanup() {
+ its.cleanupOnce.Do(func() {
+ // Subscribers stop via context cancellation; there is nothing to close explicitly
+ for name := range its.subscribers {
+ its.t.Logf("Cleaned up subscriber: %s", name)
+ }
+
+ // Close all publishers
+ for name, publisher := range its.publishers {
+ if publisher != nil {
+ publisher.Shutdown()
+ its.t.Logf("Cleaned up publisher: %s", name)
+ }
+ }
+
+ // Execute additional cleanup functions
+ its.env.mutex.Lock()
+ for _, cleanup := range its.env.CleanupFuncs {
+ cleanup()
+ }
+ its.env.mutex.Unlock()
+ })
+}
+
+// CreatePublisher creates a new topic publisher
+func (its *IntegrationTestSuite) CreatePublisher(config *PublisherTestConfig) (*pub_client.TopicPublisher, error) {
+ publisherConfig := &pub_client.PublisherConfiguration{
+ Topic: topic.NewTopic(config.Namespace, config.TopicName),
+ PartitionCount: config.PartitionCount,
+ Brokers: its.env.Brokers,
+ PublisherName: config.PublisherName,
+ RecordType: config.RecordType,
+ }
+
+ publisher, err := pub_client.NewTopicPublisher(publisherConfig)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create publisher: %v", err)
+ }
+
+ its.publishers[config.PublisherName] = publisher
+ return publisher, nil
+}
+
+// CreateSubscriber creates a new topic subscriber
+func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig) (*sub_client.TopicSubscriber, error) {
+ subscriberConfig := &sub_client.SubscriberConfiguration{
+ ConsumerGroup: config.ConsumerGroup,
+ ConsumerGroupInstanceId: config.ConsumerInstanceId,
+ GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
+ MaxPartitionCount: config.MaxPartitionCount,
+ SlidingWindowSize: config.SlidingWindowSize,
+ }
+
+ contentConfig := &sub_client.ContentConfiguration{
+ Topic: topic.NewTopic(config.Namespace, config.TopicName),
+ Filter: config.Filter,
+ PartitionOffsets: config.PartitionOffsets,
+ OffsetType: config.OffsetType,
+ OffsetTsNs: config.OffsetTsNs,
+ }
+
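+ // Buffered channel on which per-key offset acknowledgements are reported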
+ offsetChan := make(chan sub_client.KeyedOffset, 1024)
+ subscriber := sub_client.NewTopicSubscriber(
+ context.Background(),
+ its.env.Brokers,
+ subscriberConfig,
+ contentConfig,
+ offsetChan,
+ )
+
+ its.subscribers[config.ConsumerInstanceId] = subscriber
+ return subscriber, nil
+}
+
+// CreateAgent creates a new message queue agent
+func (its *IntegrationTestSuite) CreateAgent(name string) (*agent.MessageQueueAgent, error) {
+ var brokerAddresses []pb.ServerAddress
+ for _, broker := range its.env.Brokers {
+ brokerAddresses = append(brokerAddresses, pb.ServerAddress(broker))
+ }
+
+ agentOptions := &agent.MessageQueueAgentOptions{
+ SeedBrokers: brokerAddresses,
+ }
+
+ mqAgent := agent.NewMessageQueueAgent(
+ agentOptions,
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ )
+
+ its.agents[name] = mqAgent
+ return mqAgent, nil
+}
+
+// PublisherTestConfig holds configuration for creating test publishers
+type PublisherTestConfig struct {
+ Namespace string
+ TopicName string
+ PartitionCount int32
+ PublisherName string
+ RecordType *schema_pb.RecordType
+}
+
+// SubscriberTestConfig holds configuration for creating test subscribers
+type SubscriberTestConfig struct {
+ Namespace string
+ TopicName string
+ ConsumerGroup string
+ ConsumerInstanceId string
+ MaxPartitionCount int32
+ SlidingWindowSize int32
+ Filter string
+ PartitionOffsets []*schema_pb.PartitionOffset
+ OffsetType schema_pb.OffsetType
+ OffsetTsNs int64
+}
+
+// TestMessage represents a test message with metadata
+type TestMessage struct {
+ ID string
+ Content []byte
+ Timestamp time.Time
+ Key []byte
+}
+
+// MessageCollector collects received messages for verification
+type MessageCollector struct {
+ messages []TestMessage
+ mutex sync.RWMutex
+ waitCh chan struct{}
+ expected int
+}
+
+// NewMessageCollector creates a new message collector
+func NewMessageCollector(expectedCount int) *MessageCollector {
+ return &MessageCollector{
+ messages: make([]TestMessage, 0),
+ waitCh: make(chan struct{}),
+ expected: expectedCount,
+ }
+}
+
+// AddMessage adds a received message to the collector
+func (mc *MessageCollector) AddMessage(msg TestMessage) {
+ mc.mutex.Lock()
+ defer mc.mutex.Unlock()
+
+ mc.messages = append(mc.messages, msg)
+ // Close waitCh exactly once, when the expected count is first reached;
+ // with >=, a later message would close an already-closed channel and panic.
+ if len(mc.messages) == mc.expected {
+ close(mc.waitCh)
+ }
+}
+
+// WaitForMessages waits for the expected number of messages or timeout
+func (mc *MessageCollector) WaitForMessages(timeout time.Duration) []TestMessage {
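+ // Block until the expected message count is reached (waitCh closed) or the
+ // timeout fires, then return a snapshot of whatever has been collected.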
+ select {
+ case <-mc.waitCh:
+ case <-time.After(timeout):
+ }
+
+ mc.mutex.RLock()
+ defer mc.mutex.RUnlock()
+
+ result := make([]TestMessage, len(mc.messages))
+ copy(result, mc.messages)
+ return result
+}
+
+// GetMessages returns all collected messages
+func (mc *MessageCollector) GetMessages() []TestMessage {
+ mc.mutex.RLock()
+ defer mc.mutex.RUnlock()
+
+ result := make([]TestMessage, len(mc.messages))
+ copy(result, mc.messages)
+ return result
+}
+
+// CreateTestSchema creates a simple test schema
+func CreateTestSchema() *schema_pb.RecordType {
+ return schema.RecordTypeBegin().
+ WithField("id", schema.TypeString).
+ WithField("timestamp", schema.TypeInt64).
+ WithField("content", schema.TypeString).
+ WithField("sequence", schema.TypeInt32).
+ RecordTypeEnd()
+}
+
+// CreateComplexTestSchema creates a complex test schema with nested structures
+func CreateComplexTestSchema() *schema_pb.RecordType {
+ addressType := schema.RecordTypeBegin().
+ WithField("street", schema.TypeString).
+ WithField("city", schema.TypeString).
+ WithField("zipcode", schema.TypeString).
+ RecordTypeEnd()
+
+ return schema.RecordTypeBegin().
+ WithField("user_id", schema.TypeString).
+ WithField("name", schema.TypeString).
+ WithField("age", schema.TypeInt32).
+ WithField("emails", schema.ListOf(schema.TypeString)).
+ WithRecordField("address", addressType).
+ WithField("created_at", schema.TypeInt64).
+ RecordTypeEnd()
+}
+
+// Helper functions
+
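+// getEnvList reads a comma-separated list from the environment, falling back to defaultValue when unset.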
+func getEnvList(key string, defaultValue []string) []string {
+ value := os.Getenv(key)
+ if value == "" {
+ return defaultValue
+ }
+ return strings.Split(value, ",")
+}
+
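+// getEnvDuration parses a time.Duration from the environment, falling back to defaultValue when unset or invalid.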
+func getEnvDuration(key string, defaultValue time.Duration) time.Duration {
+ value := os.Getenv(key)
+ if value == "" {
+ return defaultValue
+ }
+
+ duration, err := time.ParseDuration(value)
+ if err != nil {
+ return defaultValue
+ }
+ return duration
+}
+
+func (its *IntegrationTestSuite) waitForClusterReady() error {
+ maxRetries := 30
+ retryInterval := 2 * time.Second
+
+ for i := 0; i < maxRetries; i++ {
+ if its.isClusterReady() {
+ return nil
+ }
+ its.t.Logf("Waiting for cluster to be ready... attempt %d/%d", i+1, maxRetries)
+ time.Sleep(retryInterval)
+ }
+
+ return fmt.Errorf("cluster not ready after %d attempts", maxRetries)
+}
+
+func (its *IntegrationTestSuite) isClusterReady() bool {
+ // Check if at least one broker is accessible
+ for _, broker := range its.env.Brokers {
+ if its.isBrokerReady(broker) {
+ return true
+ }
+ }
+ return false
+}
+
+func (its *IntegrationTestSuite) isBrokerReady(broker string) bool {
+ // grpc.NewClient does not dial eagerly, so this mainly validates the broker
+ // address; see the TODO below for a real health check.
+ conn, err := grpc.NewClient(broker, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ return false
+ }
+ defer conn.Close()
+
+ // TODO: Add actual health check call here
+ return true
+}
+
+// AssertMessagesReceived verifies that expected messages were received
+func (its *IntegrationTestSuite) AssertMessagesReceived(t *testing.T, collector *MessageCollector, expectedCount int, timeout time.Duration) {
+ messages := collector.WaitForMessages(timeout)
+ require.Len(t, messages, expectedCount, "Expected %d messages, got %d", expectedCount, len(messages))
+}
+
+// AssertMessageOrdering verifies that messages are received in the expected order
+func (its *IntegrationTestSuite) AssertMessageOrdering(t *testing.T, messages []TestMessage) {
+ for i := 1; i < len(messages); i++ {
+ require.True(t, messages[i].Timestamp.After(messages[i-1].Timestamp) || messages[i].Timestamp.Equal(messages[i-1].Timestamp),
+ "Messages not in chronological order: message %d timestamp %v should be >= message %d timestamp %v",
+ i, messages[i].Timestamp, i-1, messages[i-1].Timestamp)
+ }
+}