diff options
Diffstat (limited to 'weed/mq/kafka/consumer/static_membership_test.go')
| -rw-r--r-- | weed/mq/kafka/consumer/static_membership_test.go | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/weed/mq/kafka/consumer/static_membership_test.go b/weed/mq/kafka/consumer/static_membership_test.go new file mode 100644 index 000000000..df1ad1fbb --- /dev/null +++ b/weed/mq/kafka/consumer/static_membership_test.go @@ -0,0 +1,196 @@ +package consumer + +import ( + "testing" + "time" +) + +func TestGroupCoordinator_StaticMembership(t *testing.T) { + gc := NewGroupCoordinator() + defer gc.Close() + + group := gc.GetOrCreateGroup("test-group") + + // Test static member registration + instanceID := "static-instance-1" + member := &GroupMember{ + ID: "member-1", + ClientID: "client-1", + ClientHost: "localhost", + GroupInstanceID: &instanceID, + SessionTimeout: 30000, + State: MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now(), + } + + // Add member to group + group.Members[member.ID] = member + gc.RegisterStaticMember(group, member) + + // Test finding static member + foundMember := gc.FindStaticMember(group, instanceID) + if foundMember == nil { + t.Error("Expected to find static member, got nil") + } + if foundMember.ID != member.ID { + t.Errorf("Expected member ID %s, got %s", member.ID, foundMember.ID) + } + + // Test IsStaticMember + if !gc.IsStaticMember(member) { + t.Error("Expected member to be static") + } + + // Test dynamic member (no instance ID) + dynamicMember := &GroupMember{ + ID: "member-2", + ClientID: "client-2", + ClientHost: "localhost", + GroupInstanceID: nil, + SessionTimeout: 30000, + State: MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now(), + } + + if gc.IsStaticMember(dynamicMember) { + t.Error("Expected member to be dynamic") + } + + // Test unregistering static member + gc.UnregisterStaticMember(group, instanceID) + foundMember = gc.FindStaticMember(group, instanceID) + if foundMember != nil { + t.Error("Expected static member to be unregistered") + } +} + +func TestGroupCoordinator_StaticMemberReconnection(t *testing.T) { + gc := NewGroupCoordinator() + defer gc.Close() + + group := gc.GetOrCreateGroup("test-group") + instanceID := "static-instance-1" + + // First connection + member1 := &GroupMember{ + ID: "member-1", + ClientID: "client-1", + ClientHost: "localhost", + GroupInstanceID: &instanceID, + SessionTimeout: 30000, + State: MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now(), + } + + group.Members[member1.ID] = member1 + gc.RegisterStaticMember(group, member1) + + // Simulate disconnection and reconnection with same instance ID + delete(group.Members, member1.ID) + + // Reconnection with same instance ID should reuse the mapping + member2 := &GroupMember{ + ID: "member-2", // Different member ID + ClientID: "client-1", + ClientHost: "localhost", + GroupInstanceID: &instanceID, // Same instance ID + SessionTimeout: 30000, + State: MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now(), + } + + group.Members[member2.ID] = member2 + gc.RegisterStaticMember(group, member2) + + // Should find the new member with the same instance ID + foundMember := gc.FindStaticMember(group, instanceID) + if foundMember == nil { + t.Error("Expected to find static member after reconnection") + } + if foundMember.ID != member2.ID { + t.Errorf("Expected member ID %s, got %s", member2.ID, foundMember.ID) + } +} + +func TestGroupCoordinator_StaticMembershipEdgeCases(t *testing.T) { + gc := NewGroupCoordinator() + defer gc.Close() + + group := gc.GetOrCreateGroup("test-group") + + // Test empty instance ID + member := &GroupMember{ + ID: "member-1", + ClientID: "client-1", + ClientHost: "localhost", + GroupInstanceID: nil, + SessionTimeout: 30000, + State: MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now(), + } + + gc.RegisterStaticMember(group, member) // Should be no-op + foundMember := gc.FindStaticMember(group, "") + if foundMember != nil { + t.Error("Expected not to find member with empty instance ID") + } + + // Test empty string instance ID + emptyInstanceID := "" + member.GroupInstanceID = &emptyInstanceID + gc.RegisterStaticMember(group, member) // Should be no-op + foundMember = gc.FindStaticMember(group, emptyInstanceID) + if foundMember != nil { + t.Error("Expected not to find member with empty string instance ID") + } + + // Test unregistering non-existent instance ID + gc.UnregisterStaticMember(group, "non-existent") // Should be no-op +} + +func TestGroupCoordinator_StaticMembershipConcurrency(t *testing.T) { + gc := NewGroupCoordinator() + defer gc.Close() + + group := gc.GetOrCreateGroup("test-group") + instanceID := "static-instance-1" + + // Test concurrent access + done := make(chan bool, 2) + + // Goroutine 1: Register static member + go func() { + member := &GroupMember{ + ID: "member-1", + ClientID: "client-1", + ClientHost: "localhost", + GroupInstanceID: &instanceID, + SessionTimeout: 30000, + State: MemberStatePending, + LastHeartbeat: time.Now(), + JoinedAt: time.Now(), + } + group.Members[member.ID] = member + gc.RegisterStaticMember(group, member) + done <- true + }() + + // Goroutine 2: Find static member + go func() { + time.Sleep(10 * time.Millisecond) // Small delay to ensure registration happens first + foundMember := gc.FindStaticMember(group, instanceID) + if foundMember == nil { + t.Error("Expected to find static member in concurrent access") + } + done <- true + }() + + // Wait for both goroutines to complete + <-done + <-done +} |
