aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/record_batch_parser_test.go
blob: d445b94213203e0a927ccabd06172fa45b22f938 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
package protocol

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestRecordBatchParser_ParseRecordBatch tests basic record batch parsing
// TestRecordBatchParser_ParseRecordBatch verifies that a freshly created
// record batch round-trips through the parser with all header fields intact.
func TestRecordBatchParser_ParseRecordBatch(t *testing.T) {
	p := NewRecordBatchParser()

	// Build a minimal uncompressed batch at base offset 100.
	payload := []byte("test record data")
	raw, err := CreateRecordBatch(100, payload, compression.None)
	require.NoError(t, err)

	// Parsing a well-formed batch must succeed.
	got, err := p.ParseRecordBatch(raw)
	require.NoError(t, err)

	// Header fields should mirror what CreateRecordBatch wrote.
	assert.Equal(t, int64(100), got.BaseOffset)
	assert.Equal(t, int8(2), got.Magic)
	assert.Equal(t, int32(1), got.RecordCount)
	assert.Equal(t, compression.None, got.GetCompressionCodec())
	assert.False(t, got.IsCompressed())
}

// TestRecordBatchParser_ParseRecordBatch_TooSmall tests parsing with insufficient data
// TestRecordBatchParser_ParseRecordBatch_TooSmall verifies that input shorter
// than the minimum record batch header is rejected with a descriptive error.
func TestRecordBatchParser_ParseRecordBatch_TooSmall(t *testing.T) {
	p := NewRecordBatchParser()

	// 30 bytes is well under the 61-byte minimum header size.
	_, err := p.ParseRecordBatch(make([]byte, 30))
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "record batch too small")
}

// TestRecordBatchParser_ParseRecordBatch_InvalidMagic tests parsing with invalid magic byte
// TestRecordBatchParser_ParseRecordBatch_InvalidMagic verifies that a batch
// whose magic byte is not the supported value is rejected.
func TestRecordBatchParser_ParseRecordBatch_InvalidMagic(t *testing.T) {
	p := NewRecordBatchParser()

	// Start from a valid batch...
	raw, err := CreateRecordBatch(100, []byte("test record data"), compression.None)
	require.NoError(t, err)

	// ...then overwrite the magic byte (header offset 16) with an
	// unsupported value.
	raw[16] = 1

	_, err = p.ParseRecordBatch(raw)
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "unsupported record batch magic byte")
}

// TestRecordBatchParser_Compression tests compression support
// TestRecordBatchParser_Compression round-trips a batch through every
// supported codec and checks codec detection plus lossless decompression.
func TestRecordBatchParser_Compression(t *testing.T) {
	p := NewRecordBatchParser()
	// Repetitive payload so every codec has something to compress.
	payload := []byte("This is a test record that should compress well when repeated. " +
		"This is a test record that should compress well when repeated. " +
		"This is a test record that should compress well when repeated.")

	for _, codec := range []compression.CompressionCodec{
		compression.None,
		compression.Gzip,
		compression.Snappy,
		compression.Lz4,
		compression.Zstd,
	} {
		t.Run(codec.String(), func(t *testing.T) {
			// Build a batch with this codec at base offset 200.
			raw, err := CreateRecordBatch(200, payload, codec)
			require.NoError(t, err)

			got, err := p.ParseRecordBatch(raw)
			require.NoError(t, err)

			// The batch attributes must reflect the codec used.
			assert.Equal(t, codec, got.GetCompressionCodec())
			assert.Equal(t, codec != compression.None, got.IsCompressed())

			// Decompression must recover the original payload exactly.
			plain, err := got.DecompressRecords()
			require.NoError(t, err)
			assert.Equal(t, payload, plain)
		})
	}
}

// TestRecordBatchParser_CRCValidation tests CRC32 validation
// TestRecordBatchParser_CRCValidation exercises CRC32 checking: a pristine
// batch passes, a corrupted CRC fails when validation is enabled, and the
// same corruption is tolerated when validation is disabled.
func TestRecordBatchParser_CRCValidation(t *testing.T) {
	parser := NewRecordBatchParser()
	recordData := []byte("test record for CRC validation")

	// Create a valid batch to be corrupted by the subtests below.
	batch, err := CreateRecordBatch(300, recordData, compression.None)
	require.NoError(t, err)

	// withBadCRC returns a copy of batch with its CRC field (starting at
	// header offset 17) clobbered, leaving the original batch untouched.
	// Extracted here because two subtests need the identical corruption.
	withBadCRC := func() []byte {
		corrupted := make([]byte, len(batch))
		copy(corrupted, batch)
		corrupted[17] = 0xFF // Corrupt CRC
		return corrupted
	}

	t.Run("Valid CRC", func(t *testing.T) {
		// Parse with CRC validation should succeed on the pristine batch.
		parsed, err := parser.ParseRecordBatchWithValidation(batch, true)
		require.NoError(t, err)
		assert.Equal(t, int64(300), parsed.BaseOffset)
	})

	t.Run("Invalid CRC", func(t *testing.T) {
		// Parse with CRC validation should reject the corrupted copy.
		_, err := parser.ParseRecordBatchWithValidation(withBadCRC(), true)
		assert.Error(t, err)
		assert.Contains(t, err.Error(), "CRC validation failed")
	})

	t.Run("Skip CRC validation", func(t *testing.T) {
		// Without validation the corrupted CRC must go unnoticed.
		parsed, err := parser.ParseRecordBatchWithValidation(withBadCRC(), false)
		require.NoError(t, err)
		assert.Equal(t, int64(300), parsed.BaseOffset)
	})
}

// TestRecordBatchParser_ExtractRecords tests record extraction
// TestRecordBatchParser_ExtractRecords verifies record extraction from a
// compressed batch: the simplified implementation yields a single record
// carrying the base offset and the original payload.
func TestRecordBatchParser_ExtractRecords(t *testing.T) {
	p := NewRecordBatchParser()
	payload := []byte("test record data for extraction")

	// Use gzip so extraction also exercises the decompression path.
	raw, err := CreateRecordBatch(400, payload, compression.Gzip)
	require.NoError(t, err)

	parsed, err := p.ParseRecordBatch(raw)
	require.NoError(t, err)

	recs, err := parsed.ExtractRecords()
	require.NoError(t, err)

	// Simplified implementation returns exactly one record.
	assert.Len(t, recs, 1)
	assert.Equal(t, int64(400), recs[0].Offset)
	assert.Equal(t, payload, recs[0].Value)
}

// TestCompressRecordBatch tests the compression helper function
// TestCompressRecordBatch verifies the CompressRecordBatch helper: the None
// codec passes data through with zero attributes, while gzip transforms the
// data, sets the gzip attribute bit, and remains reversible.
func TestCompressRecordBatch(t *testing.T) {
	payload := []byte("test data for compression")

	t.Run("No compression", func(t *testing.T) {
		out, attrs, err := CompressRecordBatch(compression.None, payload)
		require.NoError(t, err)
		// Pass-through: identical bytes, attribute bits clear.
		assert.Equal(t, payload, out)
		assert.Equal(t, int16(0), attrs)
	})

	t.Run("Gzip compression", func(t *testing.T) {
		out, attrs, err := CompressRecordBatch(compression.Gzip, payload)
		require.NoError(t, err)
		// Output must differ from the input and carry the gzip codec bit.
		assert.NotEqual(t, payload, out)
		assert.Equal(t, int16(1), attrs)

		// Round-trip through the compression package to prove losslessness.
		plain, err := compression.Decompress(compression.Gzip, out)
		require.NoError(t, err)
		assert.Equal(t, payload, plain)
	})
}

// TestCreateRecordBatch tests record batch creation
// TestCreateRecordBatch verifies batch construction for both uncompressed and
// snappy-compressed payloads, parsing each result back to confirm the header
// fields and (for snappy) lossless decompression.
func TestCreateRecordBatch(t *testing.T) {
	payload := []byte("test record data")
	base := int64(500)

	t.Run("Uncompressed batch", func(t *testing.T) {
		raw, err := CreateRecordBatch(base, payload, compression.None)
		require.NoError(t, err)
		assert.True(t, len(raw) >= 61) // Minimum header size

		// Round-trip through the parser to confirm the header.
		got, err := NewRecordBatchParser().ParseRecordBatch(raw)
		require.NoError(t, err)
		assert.Equal(t, base, got.BaseOffset)
		assert.Equal(t, compression.None, got.GetCompressionCodec())
	})

	t.Run("Compressed batch", func(t *testing.T) {
		raw, err := CreateRecordBatch(base, payload, compression.Snappy)
		require.NoError(t, err)
		assert.True(t, len(raw) >= 61) // Minimum header size

		// Round-trip through the parser to confirm codec detection.
		got, err := NewRecordBatchParser().ParseRecordBatch(raw)
		require.NoError(t, err)
		assert.Equal(t, base, got.BaseOffset)
		assert.Equal(t, compression.Snappy, got.GetCompressionCodec())
		assert.True(t, got.IsCompressed())

		// Decompression must recover the original payload.
		plain, err := got.DecompressRecords()
		require.NoError(t, err)
		assert.Equal(t, payload, plain)
	})
}

// TestRecordBatchParser_InvalidRecordCount tests handling of invalid record counts
// TestRecordBatchParser_InvalidRecordCount verifies that a batch whose record
// count field has been corrupted to an absurd value is rejected.
func TestRecordBatchParser_InvalidRecordCount(t *testing.T) {
	p := NewRecordBatchParser()

	// Start from a valid single-record batch.
	raw, err := CreateRecordBatch(100, []byte("test record data"), compression.None)
	require.NoError(t, err)

	// Overwrite the 4-byte record count field (header offsets 57-60) with
	// all-ones, yielding an implausibly large count.
	for i := 57; i <= 60; i++ {
		raw[i] = 0xFF
	}

	_, err = p.ParseRecordBatch(raw)
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "invalid record count")
}

// BenchmarkRecordBatchParser tests parsing performance
// BenchmarkRecordBatchParser measures header parsing and record decompression
// throughput across every supported codec, using a 1KB payload.
func BenchmarkRecordBatchParser(b *testing.B) {
	parser := NewRecordBatchParser()
	recordData := make([]byte, 1024) // 1KB record
	for i := range recordData {
		recordData[i] = byte(i % 256)
	}

	codecs := []compression.CompressionCodec{
		compression.None,
		compression.Gzip,
		compression.Snappy,
		compression.Lz4,
		compression.Zstd,
	}

	for _, codec := range codecs {
		// Build the batch once outside the timed loops.
		batch, err := CreateRecordBatch(0, recordData, codec)
		if err != nil {
			b.Fatal(err)
		}

		b.Run("Parse_"+codec.String(), func(b *testing.B) {
			// Report allocs/op so parser allocation regressions are visible.
			b.ReportAllocs()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				_, err := parser.ParseRecordBatch(batch)
				if err != nil {
					b.Fatal(err)
				}
			}
		})

		b.Run("Decompress_"+codec.String(), func(b *testing.B) {
			// Parse once up front; only decompression is timed below.
			parsed, err := parser.ParseRecordBatch(batch)
			if err != nil {
				b.Fatal(err)
			}
			b.ReportAllocs()
			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				_, err := parsed.DecompressRecords()
				if err != nil {
					b.Fatal(err)
				}
			}
		})
	}
}