diff options
| author | chrislu <chris.lu@gmail.com> | 2025-06-23 10:55:02 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-06-23 10:55:02 -0700 |
| commit | 0347212b6417074cd6727c3981ee5f54f5373206 (patch) | |
| tree | 826a36445ba497b7afbe767c2989e61bbe37df0b /test/mq/integration | |
| parent | 7324cb71717f87cd0cc957d983d2ad2e0ca82695 (diff) | |
| download | seaweedfs-0347212b6417074cd6727c3981ee5f54f5373206.tar.xz seaweedfs-0347212b6417074cd6727c3981ee5f54f5373206.zip | |
init version
Diffstat (limited to 'test/mq/integration')
| -rw-r--r-- | test/mq/integration/basic_pubsub_test.go | 334 | ||||
| -rw-r--r-- | test/mq/integration/framework.go | 355 |
2 files changed, 689 insertions, 0 deletions
diff --git a/test/mq/integration/basic_pubsub_test.go b/test/mq/integration/basic_pubsub_test.go new file mode 100644 index 000000000..ad434e50a --- /dev/null +++ b/test/mq/integration/basic_pubsub_test.go @@ -0,0 +1,334 @@ +package integration + +import ( + "fmt" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBasicPublishSubscribe(t *testing.T) { + suite := NewIntegrationTestSuite(t) + require.NoError(t, suite.Setup()) + + // Test configuration + namespace := "test" + topicName := "basic-pubsub" + testSchema := CreateTestSchema() + messageCount := 10 + + // Create publisher + pubConfig := &PublisherTestConfig{ + Namespace: namespace, + TopicName: topicName, + PartitionCount: 1, + PublisherName: "test-publisher", + RecordType: testSchema, + } + + publisher, err := suite.CreatePublisher(pubConfig) + require.NoError(t, err, "Failed to create publisher") + + // Create subscriber + subConfig := &SubscriberTestConfig{ + Namespace: namespace, + TopicName: topicName, + ConsumerGroup: "test-group", + ConsumerInstanceId: "consumer-1", + MaxPartitionCount: 1, + SlidingWindowSize: 10, + OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST, + } + + subscriber, err := suite.CreateSubscriber(subConfig) + require.NoError(t, err, "Failed to create subscriber") + + // Set up message collector + collector := NewMessageCollector(messageCount) + subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { + collector.AddMessage(TestMessage{ + ID: fmt.Sprintf("msg-%d", len(collector.GetMessages())), + Content: m.Data.Value, + Timestamp: time.Unix(0, m.Data.TsNs), + Key: m.Data.Key, + }) + }) + + // Start subscriber + go func() { + err := subscriber.Subscribe() + if err != nil { + t.Logf("Subscriber error: %v", err) + } + }() + + // Wait for subscriber to be ready + time.Sleep(2 * time.Second) + + // Publish test messages + for i := 0; i < messageCount; i++ { + record := schema.RecordBegin(). + SetString("id", fmt.Sprintf("msg-%d", i)). + SetInt64("timestamp", time.Now().UnixNano()). + SetString("content", fmt.Sprintf("Test message %d", i)). + SetInt32("sequence", int32(i)). + RecordEnd() + + key := []byte(fmt.Sprintf("key-%d", i)) + err := publisher.PublishRecord(key, record) + require.NoError(t, err, "Failed to publish message %d", i) + } + + // Wait for messages to be received + messages := collector.WaitForMessages(30 * time.Second) + + // Verify all messages were received + assert.Len(t, messages, messageCount, "Expected %d messages, got %d", messageCount, len(messages)) + + // Verify message content + for i, msg := range messages { + assert.NotEmpty(t, msg.Content, "Message %d should have content", i) + assert.NotEmpty(t, msg.Key, "Message %d should have key", i) + } +} + +func TestMultipleConsumers(t *testing.T) { + suite := NewIntegrationTestSuite(t) + require.NoError(t, suite.Setup()) + + namespace := "test" + topicName := "multi-consumer" + testSchema := CreateTestSchema() + messageCount := 20 + consumerCount := 3 + + // Create publisher + pubConfig := &PublisherTestConfig{ + Namespace: namespace, + TopicName: topicName, + PartitionCount: 3, // Multiple partitions for load distribution + PublisherName: "multi-publisher", + RecordType: testSchema, + } + + publisher, err := suite.CreatePublisher(pubConfig) + require.NoError(t, err) + + // Create multiple consumers + collectors := make([]*MessageCollector, consumerCount) + for i := 0; i < consumerCount; i++ { + collectors[i] = NewMessageCollector(messageCount / consumerCount) // Expect roughly equal distribution + + subConfig := &SubscriberTestConfig{ + Namespace: namespace, + TopicName: topicName, + ConsumerGroup: "multi-consumer-group", // Same group for load balancing + ConsumerInstanceId: fmt.Sprintf("consumer-%d", i), + MaxPartitionCount: 1, + SlidingWindowSize: 10, + OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST, + } + + subscriber, err := suite.CreateSubscriber(subConfig) + require.NoError(t, err) + + // Set up message collection for this consumer + collectorIndex := i + subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { + collectors[collectorIndex].AddMessage(TestMessage{ + ID: fmt.Sprintf("consumer-%d-msg-%d", collectorIndex, len(collectors[collectorIndex].GetMessages())), + Content: m.Data.Value, + Timestamp: time.Unix(0, m.Data.TsNs), + Key: m.Data.Key, + }) + }) + + // Start subscriber + go func() { + subscriber.Subscribe() + }() + } + + // Wait for subscribers to be ready + time.Sleep(3 * time.Second) + + // Publish messages with different keys to distribute across partitions + for i := 0; i < messageCount; i++ { + record := schema.RecordBegin(). + SetString("id", fmt.Sprintf("multi-msg-%d", i)). + SetInt64("timestamp", time.Now().UnixNano()). + SetString("content", fmt.Sprintf("Multi consumer test message %d", i)). + SetInt32("sequence", int32(i)). + RecordEnd() + + key := []byte(fmt.Sprintf("partition-key-%d", i%3)) // Distribute across 3 partitions + err := publisher.PublishRecord(key, record) + require.NoError(t, err) + } + + // Wait for all messages to be consumed + time.Sleep(10 * time.Second) + + // Verify message distribution + totalReceived := 0 + for i, collector := range collectors { + messages := collector.GetMessages() + t.Logf("Consumer %d received %d messages", i, len(messages)) + totalReceived += len(messages) + } + + // All messages should be consumed across all consumers + assert.Equal(t, messageCount, totalReceived, "Total messages received should equal messages sent") +} + +func TestMessageOrdering(t *testing.T) { + suite := NewIntegrationTestSuite(t) + require.NoError(t, suite.Setup()) + + namespace := "test" + topicName := "ordering-test" + testSchema := CreateTestSchema() + messageCount := 15 + + // Create publisher + pubConfig := &PublisherTestConfig{ + Namespace: namespace, + TopicName: topicName, + PartitionCount: 1, // Single partition to guarantee ordering + PublisherName: "ordering-publisher", + RecordType: testSchema, + } + + publisher, err := suite.CreatePublisher(pubConfig) + require.NoError(t, err) + + // Create subscriber + subConfig := &SubscriberTestConfig{ + Namespace: namespace, + TopicName: topicName, + ConsumerGroup: "ordering-group", + ConsumerInstanceId: "ordering-consumer", + MaxPartitionCount: 1, + SlidingWindowSize: 5, + OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST, + } + + subscriber, err := suite.CreateSubscriber(subConfig) + require.NoError(t, err) + + // Set up message collector + collector := NewMessageCollector(messageCount) + subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { + collector.AddMessage(TestMessage{ + ID: fmt.Sprintf("ordered-msg"), + Content: m.Data.Value, + Timestamp: time.Unix(0, m.Data.TsNs), + Key: m.Data.Key, + }) + }) + + // Start subscriber + go func() { + subscriber.Subscribe() + }() + + // Wait for consumer to be ready + time.Sleep(2 * time.Second) + + // Publish messages with same key to ensure they go to same partition + publishTimes := make([]time.Time, messageCount) + for i := 0; i < messageCount; i++ { + publishTimes[i] = time.Now() + + record := schema.RecordBegin(). + SetString("id", fmt.Sprintf("ordered-%d", i)). + SetInt64("timestamp", publishTimes[i].UnixNano()). + SetString("content", fmt.Sprintf("Ordered message %d", i)). + SetInt32("sequence", int32(i)). + RecordEnd() + + key := []byte("same-partition-key") // Same key ensures same partition + err := publisher.PublishRecord(key, record) + require.NoError(t, err) + + // Small delay to ensure different timestamps + time.Sleep(10 * time.Millisecond) + } + + // Wait for all messages + messages := collector.WaitForMessages(30 * time.Second) + require.Len(t, messages, messageCount) + + // Verify ordering within the partition + suite.AssertMessageOrdering(t, messages) +} + +func TestSchemaValidation(t *testing.T) { + suite := NewIntegrationTestSuite(t) + require.NoError(t, suite.Setup()) + + namespace := "test" + topicName := "schema-validation" + + // Test with simple schema + simpleSchema := CreateTestSchema() + + pubConfig := &PublisherTestConfig{ + Namespace: namespace, + TopicName: topicName, + PartitionCount: 1, + PublisherName: "schema-publisher", + RecordType: simpleSchema, + } + + publisher, err := suite.CreatePublisher(pubConfig) + require.NoError(t, err) + + // Test valid record + validRecord := schema.RecordBegin(). + SetString("id", "valid-msg"). + SetInt64("timestamp", time.Now().UnixNano()). + SetString("content", "Valid message"). + SetInt32("sequence", 1). + RecordEnd() + + err = publisher.PublishRecord([]byte("test-key"), validRecord) + assert.NoError(t, err, "Valid record should be published successfully") + + // Test with complex nested schema + complexSchema := CreateComplexTestSchema() + + complexPubConfig := &PublisherTestConfig{ + Namespace: namespace, + TopicName: topicName + "-complex", + PartitionCount: 1, + PublisherName: "complex-publisher", + RecordType: complexSchema, + } + + complexPublisher, err := suite.CreatePublisher(complexPubConfig) + require.NoError(t, err) + + // Test complex nested record + complexRecord := schema.RecordBegin(). + SetString("user_id", "user123"). + SetString("name", "John Doe"). + SetInt32("age", 30). + SetStringList("emails", "john@example.com", "john.doe@company.com"). + SetRecord("address", + schema.RecordBegin(). + SetString("street", "123 Main St"). + SetString("city", "New York"). + SetString("zipcode", "10001"). + RecordEnd()). + SetInt64("created_at", time.Now().UnixNano()). + RecordEnd() + + err = complexPublisher.PublishRecord([]byte("complex-key"), complexRecord) + assert.NoError(t, err, "Complex nested record should be published successfully") +} diff --git a/test/mq/integration/framework.go b/test/mq/integration/framework.go new file mode 100644 index 000000000..421df5d9c --- /dev/null +++ b/test/mq/integration/framework.go @@ -0,0 +1,355 @@ +package integration + +import ( + "context" + "fmt" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/agent" + "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" + "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// TestEnvironment holds the configuration for the test environment +type TestEnvironment struct { + Masters []string + Brokers []string + Filers []string + TestTimeout time.Duration + CleanupFuncs []func() + mutex sync.Mutex +} + +// IntegrationTestSuite provides the base test framework +type IntegrationTestSuite struct { + env *TestEnvironment + agents map[string]*agent.MessageQueueAgent + publishers map[string]*pub_client.TopicPublisher + subscribers map[string]*sub_client.TopicSubscriber + cleanupOnce sync.Once + t *testing.T +} + +// NewIntegrationTestSuite creates a new test suite instance +func NewIntegrationTestSuite(t *testing.T) *IntegrationTestSuite { + env := &TestEnvironment{ + Masters: getEnvList("SEAWEED_MASTERS", []string{"localhost:19333"}), + Brokers: getEnvList("SEAWEED_BROKERS", []string{"localhost:17777"}), + Filers: getEnvList("SEAWEED_FILERS", []string{"localhost:18888"}), + TestTimeout: getEnvDuration("GO_TEST_TIMEOUT", 30*time.Minute), + } + + return &IntegrationTestSuite{ + env: env, + agents: make(map[string]*agent.MessageQueueAgent), + publishers: make(map[string]*pub_client.TopicPublisher), + subscribers: make(map[string]*sub_client.TopicSubscriber), + t: t, + } +} + +// Setup initializes the test environment +func (its *IntegrationTestSuite) Setup() error { + // Wait for cluster to be ready + if err := its.waitForClusterReady(); err != nil { + return fmt.Errorf("cluster not ready: %v", err) + } + + // Register cleanup + its.t.Cleanup(its.Cleanup) + + return nil +} + +// Cleanup performs cleanup operations +func (its *IntegrationTestSuite) Cleanup() { + its.cleanupOnce.Do(func() { + // Close all subscribers (they use context cancellation) + for name, _ := range its.subscribers { + its.t.Logf("Cleaned up subscriber: %s", name) + } + + // Close all publishers + for name, publisher := range its.publishers { + if publisher != nil { + publisher.Shutdown() + its.t.Logf("Cleaned up publisher: %s", name) + } + } + + // Execute additional cleanup functions + its.env.mutex.Lock() + for _, cleanup := range its.env.CleanupFuncs { + cleanup() + } + its.env.mutex.Unlock() + }) +} + +// CreatePublisher creates a new topic publisher +func (its *IntegrationTestSuite) CreatePublisher(config *PublisherTestConfig) (*pub_client.TopicPublisher, error) { + publisherConfig := &pub_client.PublisherConfiguration{ + Topic: topic.NewTopic(config.Namespace, config.TopicName), + PartitionCount: config.PartitionCount, + Brokers: its.env.Brokers, + PublisherName: config.PublisherName, + RecordType: config.RecordType, + } + + publisher, err := pub_client.NewTopicPublisher(publisherConfig) + if err != nil { + return nil, fmt.Errorf("failed to create publisher: %v", err) + } + + its.publishers[config.PublisherName] = publisher + return publisher, nil +} + +// CreateSubscriber creates a new topic subscriber +func (its *IntegrationTestSuite) CreateSubscriber(config *SubscriberTestConfig) (*sub_client.TopicSubscriber, error) { + subscriberConfig := &sub_client.SubscriberConfiguration{ + ConsumerGroup: config.ConsumerGroup, + ConsumerGroupInstanceId: config.ConsumerInstanceId, + GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + MaxPartitionCount: config.MaxPartitionCount, + SlidingWindowSize: config.SlidingWindowSize, + } + + contentConfig := &sub_client.ContentConfiguration{ + Topic: topic.NewTopic(config.Namespace, config.TopicName), + Filter: config.Filter, + PartitionOffsets: config.PartitionOffsets, + OffsetType: config.OffsetType, + OffsetTsNs: config.OffsetTsNs, + } + + offsetChan := make(chan sub_client.KeyedOffset, 1024) + subscriber := sub_client.NewTopicSubscriber( + context.Background(), + its.env.Brokers, + subscriberConfig, + contentConfig, + offsetChan, + ) + + its.subscribers[config.ConsumerInstanceId] = subscriber + return subscriber, nil +} + +// CreateAgent creates a new message queue agent +func (its *IntegrationTestSuite) CreateAgent(name string) (*agent.MessageQueueAgent, error) { + var brokerAddresses []pb.ServerAddress + for _, broker := range its.env.Brokers { + brokerAddresses = append(brokerAddresses, pb.ServerAddress(broker)) + } + + agentOptions := &agent.MessageQueueAgentOptions{ + SeedBrokers: brokerAddresses, + } + + mqAgent := agent.NewMessageQueueAgent( + agentOptions, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + + its.agents[name] = mqAgent + return mqAgent, nil +} + +// PublisherTestConfig holds configuration for creating test publishers +type PublisherTestConfig struct { + Namespace string + TopicName string + PartitionCount int32 + PublisherName string + RecordType *schema_pb.RecordType +} + +// SubscriberTestConfig holds configuration for creating test subscribers +type SubscriberTestConfig struct { + Namespace string + TopicName string + ConsumerGroup string + ConsumerInstanceId string + MaxPartitionCount int32 + SlidingWindowSize int32 + Filter string + PartitionOffsets []*schema_pb.PartitionOffset + OffsetType schema_pb.OffsetType + OffsetTsNs int64 +} + +// TestMessage represents a test message with metadata +type TestMessage struct { + ID string + Content []byte + Timestamp time.Time + Key []byte +} + +// MessageCollector collects received messages for verification +type MessageCollector struct { + messages []TestMessage + mutex sync.RWMutex + waitCh chan struct{} + expected int +} + +// NewMessageCollector creates a new message collector +func NewMessageCollector(expectedCount int) *MessageCollector { + return &MessageCollector{ + messages: make([]TestMessage, 0), + waitCh: make(chan struct{}), + expected: expectedCount, + } +} + +// AddMessage adds a received message to the collector +func (mc *MessageCollector) AddMessage(msg TestMessage) { + mc.mutex.Lock() + defer mc.mutex.Unlock() + + mc.messages = append(mc.messages, msg) + if len(mc.messages) >= mc.expected { + close(mc.waitCh) + } +} + +// WaitForMessages waits for the expected number of messages or timeout +func (mc *MessageCollector) WaitForMessages(timeout time.Duration) []TestMessage { + select { + case <-mc.waitCh: + case <-time.After(timeout): + } + + mc.mutex.RLock() + defer mc.mutex.RUnlock() + + result := make([]TestMessage, len(mc.messages)) + copy(result, mc.messages) + return result +} + +// GetMessages returns all collected messages +func (mc *MessageCollector) GetMessages() []TestMessage { + mc.mutex.RLock() + defer mc.mutex.RUnlock() + + result := make([]TestMessage, len(mc.messages)) + copy(result, mc.messages) + return result +} + +// CreateTestSchema creates a simple test schema +func CreateTestSchema() *schema_pb.RecordType { + return schema.RecordTypeBegin(). + WithField("id", schema.TypeString). + WithField("timestamp", schema.TypeInt64). + WithField("content", schema.TypeString). + WithField("sequence", schema.TypeInt32). + RecordTypeEnd() +} + +// CreateComplexTestSchema creates a complex test schema with nested structures +func CreateComplexTestSchema() *schema_pb.RecordType { + addressType := schema.RecordTypeBegin(). + WithField("street", schema.TypeString). + WithField("city", schema.TypeString). + WithField("zipcode", schema.TypeString). + RecordTypeEnd() + + return schema.RecordTypeBegin(). + WithField("user_id", schema.TypeString). + WithField("name", schema.TypeString). + WithField("age", schema.TypeInt32). + WithField("emails", schema.ListOf(schema.TypeString)). + WithRecordField("address", addressType). + WithField("created_at", schema.TypeInt64). + RecordTypeEnd() +} + +// Helper functions + +func getEnvList(key string, defaultValue []string) []string { + value := os.Getenv(key) + if value == "" { + return defaultValue + } + return strings.Split(value, ",") +} + +func getEnvDuration(key string, defaultValue time.Duration) time.Duration { + value := os.Getenv(key) + if value == "" { + return defaultValue + } + + duration, err := time.ParseDuration(value) + if err != nil { + return defaultValue + } + return duration +} + +func (its *IntegrationTestSuite) waitForClusterReady() error { + maxRetries := 30 + retryInterval := 2 * time.Second + + for i := 0; i < maxRetries; i++ { + if its.isClusterReady() { + return nil + } + its.t.Logf("Waiting for cluster to be ready... attempt %d/%d", i+1, maxRetries) + time.Sleep(retryInterval) + } + + return fmt.Errorf("cluster not ready after %d attempts", maxRetries) +} + +func (its *IntegrationTestSuite) isClusterReady() bool { + // Check if at least one broker is accessible + for _, broker := range its.env.Brokers { + if its.isBrokerReady(broker) { + return true + } + } + return false +} + +func (its *IntegrationTestSuite) isBrokerReady(broker string) bool { + // Simple connection test + conn, err := grpc.NewClient(broker, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return false + } + defer conn.Close() + + // TODO: Add actual health check call here + return true +} + +// AssertMessagesReceived verifies that expected messages were received +func (its *IntegrationTestSuite) AssertMessagesReceived(t *testing.T, collector *MessageCollector, expectedCount int, timeout time.Duration) { + messages := collector.WaitForMessages(timeout) + require.Len(t, messages, expectedCount, "Expected %d messages, got %d", expectedCount, len(messages)) +} + +// AssertMessageOrdering verifies that messages are received in the expected order +func (its *IntegrationTestSuite) AssertMessageOrdering(t *testing.T, messages []TestMessage) { + for i := 1; i < len(messages); i++ { + require.True(t, messages[i].Timestamp.After(messages[i-1].Timestamp) || messages[i].Timestamp.Equal(messages[i-1].Timestamp), + "Messages not in chronological order: message %d timestamp %v should be >= message %d timestamp %v", + i, messages[i].Timestamp, i-1, messages[i-1].Timestamp) + } +} |
