aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/consumer/static_membership_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/consumer/static_membership_test.go')
-rw-r--r--weed/mq/kafka/consumer/static_membership_test.go196
1 files changed, 196 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer/static_membership_test.go b/weed/mq/kafka/consumer/static_membership_test.go
new file mode 100644
index 000000000..df1ad1fbb
--- /dev/null
+++ b/weed/mq/kafka/consumer/static_membership_test.go
@@ -0,0 +1,196 @@
+package consumer
+
+import (
+ "testing"
+ "time"
+)
+
+func TestGroupCoordinator_StaticMembership(t *testing.T) {
+ gc := NewGroupCoordinator()
+ defer gc.Close()
+
+ group := gc.GetOrCreateGroup("test-group")
+
+ // Test static member registration
+ instanceID := "static-instance-1"
+ member := &GroupMember{
+ ID: "member-1",
+ ClientID: "client-1",
+ ClientHost: "localhost",
+ GroupInstanceID: &instanceID,
+ SessionTimeout: 30000,
+ State: MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now(),
+ }
+
+ // Add member to group
+ group.Members[member.ID] = member
+ gc.RegisterStaticMember(group, member)
+
+ // Test finding static member
+ foundMember := gc.FindStaticMember(group, instanceID)
+ if foundMember == nil {
+ t.Error("Expected to find static member, got nil")
+ }
+ if foundMember.ID != member.ID {
+ t.Errorf("Expected member ID %s, got %s", member.ID, foundMember.ID)
+ }
+
+ // Test IsStaticMember
+ if !gc.IsStaticMember(member) {
+ t.Error("Expected member to be static")
+ }
+
+ // Test dynamic member (no instance ID)
+ dynamicMember := &GroupMember{
+ ID: "member-2",
+ ClientID: "client-2",
+ ClientHost: "localhost",
+ GroupInstanceID: nil,
+ SessionTimeout: 30000,
+ State: MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now(),
+ }
+
+ if gc.IsStaticMember(dynamicMember) {
+ t.Error("Expected member to be dynamic")
+ }
+
+ // Test unregistering static member
+ gc.UnregisterStaticMember(group, instanceID)
+ foundMember = gc.FindStaticMember(group, instanceID)
+ if foundMember != nil {
+ t.Error("Expected static member to be unregistered")
+ }
+}
+
+func TestGroupCoordinator_StaticMemberReconnection(t *testing.T) {
+ gc := NewGroupCoordinator()
+ defer gc.Close()
+
+ group := gc.GetOrCreateGroup("test-group")
+ instanceID := "static-instance-1"
+
+ // First connection
+ member1 := &GroupMember{
+ ID: "member-1",
+ ClientID: "client-1",
+ ClientHost: "localhost",
+ GroupInstanceID: &instanceID,
+ SessionTimeout: 30000,
+ State: MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now(),
+ }
+
+ group.Members[member1.ID] = member1
+ gc.RegisterStaticMember(group, member1)
+
+ // Simulate disconnection and reconnection with same instance ID
+ delete(group.Members, member1.ID)
+
+ // Reconnection with same instance ID should reuse the mapping
+ member2 := &GroupMember{
+ ID: "member-2", // Different member ID
+ ClientID: "client-1",
+ ClientHost: "localhost",
+ GroupInstanceID: &instanceID, // Same instance ID
+ SessionTimeout: 30000,
+ State: MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now(),
+ }
+
+ group.Members[member2.ID] = member2
+ gc.RegisterStaticMember(group, member2)
+
+ // Should find the new member with the same instance ID
+ foundMember := gc.FindStaticMember(group, instanceID)
+ if foundMember == nil {
+ t.Error("Expected to find static member after reconnection")
+ }
+ if foundMember.ID != member2.ID {
+ t.Errorf("Expected member ID %s, got %s", member2.ID, foundMember.ID)
+ }
+}
+
+func TestGroupCoordinator_StaticMembershipEdgeCases(t *testing.T) {
+ gc := NewGroupCoordinator()
+ defer gc.Close()
+
+ group := gc.GetOrCreateGroup("test-group")
+
+ // Test empty instance ID
+ member := &GroupMember{
+ ID: "member-1",
+ ClientID: "client-1",
+ ClientHost: "localhost",
+ GroupInstanceID: nil,
+ SessionTimeout: 30000,
+ State: MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now(),
+ }
+
+ gc.RegisterStaticMember(group, member) // Should be no-op
+ foundMember := gc.FindStaticMember(group, "")
+ if foundMember != nil {
+ t.Error("Expected not to find member with empty instance ID")
+ }
+
+ // Test empty string instance ID
+ emptyInstanceID := ""
+ member.GroupInstanceID = &emptyInstanceID
+ gc.RegisterStaticMember(group, member) // Should be no-op
+ foundMember = gc.FindStaticMember(group, emptyInstanceID)
+ if foundMember != nil {
+ t.Error("Expected not to find member with empty string instance ID")
+ }
+
+ // Test unregistering non-existent instance ID
+ gc.UnregisterStaticMember(group, "non-existent") // Should be no-op
+}
+
+func TestGroupCoordinator_StaticMembershipConcurrency(t *testing.T) {
+ gc := NewGroupCoordinator()
+ defer gc.Close()
+
+ group := gc.GetOrCreateGroup("test-group")
+ instanceID := "static-instance-1"
+
+ // Test concurrent access
+ done := make(chan bool, 2)
+
+ // Goroutine 1: Register static member
+ go func() {
+ member := &GroupMember{
+ ID: "member-1",
+ ClientID: "client-1",
+ ClientHost: "localhost",
+ GroupInstanceID: &instanceID,
+ SessionTimeout: 30000,
+ State: MemberStatePending,
+ LastHeartbeat: time.Now(),
+ JoinedAt: time.Now(),
+ }
+ group.Members[member.ID] = member
+ gc.RegisterStaticMember(group, member)
+ done <- true
+ }()
+
+ // Goroutine 2: Find static member
+ go func() {
+ time.Sleep(10 * time.Millisecond) // Small delay to ensure registration happens first
+ foundMember := gc.FindStaticMember(group, instanceID)
+ if foundMember == nil {
+ t.Error("Expected to find static member in concurrent access")
+ }
+ done <- true
+ }()
+
+ // Wait for both goroutines to complete
+ <-done
+ <-done
+}