aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/e2e/offset_management_test.go
blob: 3986478435fce9b2bca35157972e7b5b5380625a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package e2e

import (
	"os"
	"testing"

	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
)

// TestOffsetManagement tests end-to-end offset management scenarios
// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock
func TestOffsetManagement(t *testing.T) {
	gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
	defer gateway.CleanupAndClose()

	addr := gateway.StartAndWait()

	// If schema registry is configured, ensure gateway is in schema mode and log
	if v := os.Getenv("SCHEMA_REGISTRY_URL"); v != "" {
		t.Logf("Schema Registry detected at %s - running offset tests in schematized mode", v)
	}

	// Log which backend we're using
	if gateway.IsSMQMode() {
		t.Logf("Running offset management tests with SMQ backend - offsets will be persisted")
	} else {
		t.Logf("Running offset management tests with mock backend - offsets are in-memory only")
	}

	topic := testutil.GenerateUniqueTopicName("offset-management")
	groupID := testutil.GenerateUniqueGroupID("offset-test-group")

	gateway.AddTestTopic(topic)

	t.Run("BasicOffsetCommitFetch", func(t *testing.T) {
		testBasicOffsetCommitFetch(t, addr, topic, groupID)
	})

	t.Run("ConsumerGroupResumption", func(t *testing.T) {
		testConsumerGroupResumption(t, addr, topic, groupID+"2")
	})
}

func testBasicOffsetCommitFetch(t *testing.T, addr, topic, groupID string) {
	client := testutil.NewKafkaGoClient(t, addr)
	msgGen := testutil.NewMessageGenerator()

	// Produce test messages
	if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" {
		if id, err := testutil.EnsureValueSchema(t, url, topic); err == nil {
			t.Logf("Ensured value schema id=%d for subject %s-value", id, topic)
		} else {
			t.Logf("Schema registration failed (non-fatal for test): %v", err)
		}
	}
	messages := msgGen.GenerateKafkaGoMessages(5)
	err := client.ProduceMessages(topic, messages)
	testutil.AssertNoError(t, err, "Failed to produce offset test messages")

	// Phase 1: Consume first 3 messages and commit offsets
	t.Logf("=== Phase 1: Consuming first 3 messages ===")
	consumed1, err := client.ConsumeWithGroup(topic, groupID, 3)
	testutil.AssertNoError(t, err, "Failed to consume first batch")
	testutil.AssertEqual(t, 3, len(consumed1), "Should consume exactly 3 messages")

	// Phase 2: Create new consumer with same group ID - should resume from committed offset
	t.Logf("=== Phase 2: Resuming from committed offset ===")
	consumed2, err := client.ConsumeWithGroup(topic, groupID, 2)
	testutil.AssertNoError(t, err, "Failed to consume remaining messages")
	testutil.AssertEqual(t, 2, len(consumed2), "Should consume remaining 2 messages")

	// Verify that we got all messages without duplicates
	totalConsumed := len(consumed1) + len(consumed2)
	testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages exactly once")

	t.Logf("SUCCESS: Offset management test completed - consumed %d + %d messages", len(consumed1), len(consumed2))
}

func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) {
	client := testutil.NewKafkaGoClient(t, addr)
	msgGen := testutil.NewMessageGenerator()

	// Produce messages
	messages := msgGen.GenerateKafkaGoMessages(4)
	err := client.ProduceMessages(topic, messages)
	testutil.AssertNoError(t, err, "Failed to produce messages for resumption test")

	// Consume some messages
	consumed1, err := client.ConsumeWithGroup(topic, groupID, 2)
	testutil.AssertNoError(t, err, "Failed to consume first batch")

	// Simulate consumer restart by consuming remaining messages with same group ID
	consumed2, err := client.ConsumeWithGroup(topic, groupID, 2)
	testutil.AssertNoError(t, err, "Failed to consume after restart")

	// Verify total consumption
	totalConsumed := len(consumed1) + len(consumed2)
	testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages after restart")

	t.Logf("SUCCESS: Consumer group resumption test completed")
}