diff options
Diffstat (limited to 'weed/mq/kafka/schema/envelope_varint_test.go')
| -rw-r--r-- | weed/mq/kafka/schema/envelope_varint_test.go | 198 |
1 files changed, 198 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/envelope_varint_test.go b/weed/mq/kafka/schema/envelope_varint_test.go new file mode 100644 index 000000000..92004c3d6 --- /dev/null +++ b/weed/mq/kafka/schema/envelope_varint_test.go @@ -0,0 +1,198 @@ +package schema + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEncodeDecodeVarint(t *testing.T) { + testCases := []struct { + name string + value uint64 + }{ + {"zero", 0}, + {"small", 1}, + {"medium", 127}, + {"large", 128}, + {"very_large", 16384}, + {"max_uint32", 4294967295}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Encode the value + encoded := encodeVarint(tc.value) + require.NotEmpty(t, encoded) + + // Decode it back + decoded, bytesRead := readVarint(encoded) + require.Equal(t, len(encoded), bytesRead, "Should consume all encoded bytes") + assert.Equal(t, tc.value, decoded, "Decoded value should match original") + }) + } +} + +func TestCreateConfluentEnvelopeWithProtobufIndexes(t *testing.T) { + testCases := []struct { + name string + format Format + schemaID uint32 + indexes []int + payload []byte + }{ + { + name: "avro_no_indexes", + format: FormatAvro, + schemaID: 123, + indexes: nil, + payload: []byte("avro payload"), + }, + { + name: "protobuf_no_indexes", + format: FormatProtobuf, + schemaID: 456, + indexes: nil, + payload: []byte("protobuf payload"), + }, + { + name: "protobuf_single_index", + format: FormatProtobuf, + schemaID: 789, + indexes: []int{1}, + payload: []byte("protobuf with index"), + }, + { + name: "protobuf_multiple_indexes", + format: FormatProtobuf, + schemaID: 101112, + indexes: []int{0, 1, 2, 3}, + payload: []byte("protobuf with multiple indexes"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create the envelope + envelope := CreateConfluentEnvelope(tc.format, tc.schemaID, tc.indexes, tc.payload) + + // Verify basic structure + require.True(t, len(envelope) >= 5, "Envelope should be at least 5 bytes") + assert.Equal(t, byte(0x00), envelope[0], "Magic byte should be 0x00") + + // Extract and verify schema ID + extractedSchemaID, ok := ExtractSchemaID(envelope) + require.True(t, ok, "Should be able to extract schema ID") + assert.Equal(t, tc.schemaID, extractedSchemaID, "Schema ID should match") + + // Parse the envelope based on format + if tc.format == FormatProtobuf && len(tc.indexes) > 0 { + // Use Protobuf-specific parser with known index count + parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(tc.indexes)) + require.True(t, ok, "Should be able to parse Protobuf envelope") + assert.Equal(t, tc.format, parsed.Format) + assert.Equal(t, tc.schemaID, parsed.SchemaID) + assert.Equal(t, tc.indexes, parsed.Indexes, "Indexes should match") + assert.Equal(t, tc.payload, parsed.Payload, "Payload should match") + } else { + // Use generic parser + parsed, ok := ParseConfluentEnvelope(envelope) + require.True(t, ok, "Should be able to parse envelope") + assert.Equal(t, tc.schemaID, parsed.SchemaID) + + if tc.format == FormatProtobuf && len(tc.indexes) == 0 { + // For Protobuf without indexes, payload should match + assert.Equal(t, tc.payload, parsed.Payload, "Payload should match") + } else if tc.format == FormatAvro { + // For Avro, payload should match (no indexes) + assert.Equal(t, tc.payload, parsed.Payload, "Payload should match") + } + } + }) + } +} + +func TestProtobufEnvelopeRoundTrip(t *testing.T) { + // Use more realistic index values (typically small numbers for message types) + originalIndexes := []int{0, 1, 2, 3} + originalPayload := []byte("test protobuf message data") + schemaID := uint32(12345) + + // Create envelope + envelope := CreateConfluentEnvelope(FormatProtobuf, schemaID, originalIndexes, originalPayload) + + // Parse it back with known index count + parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(originalIndexes)) + require.True(t, ok, "Should be able to parse created envelope") + + // Verify all fields + assert.Equal(t, FormatProtobuf, parsed.Format) + assert.Equal(t, schemaID, parsed.SchemaID) + assert.Equal(t, originalIndexes, parsed.Indexes) + assert.Equal(t, originalPayload, parsed.Payload) + assert.Equal(t, envelope, parsed.OriginalBytes) +} + +func TestVarintEdgeCases(t *testing.T) { + t.Run("empty_data", func(t *testing.T) { + value, bytesRead := readVarint([]byte{}) + assert.Equal(t, uint64(0), value) + assert.Equal(t, 0, bytesRead) + }) + + t.Run("incomplete_varint", func(t *testing.T) { + // Create an incomplete varint (continuation bit set but no more bytes) + incompleteVarint := []byte{0x80} // Continuation bit set, but no more bytes + value, bytesRead := readVarint(incompleteVarint) + assert.Equal(t, uint64(0), value) + assert.Equal(t, 0, bytesRead) + }) + + t.Run("max_varint_length", func(t *testing.T) { + // Create a varint that's too long (more than 10 bytes) + tooLongVarint := make([]byte, 11) + for i := 0; i < 10; i++ { + tooLongVarint[i] = 0x80 // All continuation bits + } + tooLongVarint[10] = 0x01 // Final byte + + value, bytesRead := readVarint(tooLongVarint) + assert.Equal(t, uint64(0), value) + assert.Equal(t, 0, bytesRead) + }) +} + +func TestProtobufEnvelopeValidation(t *testing.T) { + t.Run("valid_envelope", func(t *testing.T) { + indexes := []int{1, 2} + envelope := CreateConfluentEnvelope(FormatProtobuf, 123, indexes, []byte("payload")) + parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(indexes)) + require.True(t, ok) + + err := parsed.Validate() + assert.NoError(t, err) + }) + + t.Run("zero_schema_id", func(t *testing.T) { + indexes := []int{1} + envelope := CreateConfluentEnvelope(FormatProtobuf, 0, indexes, []byte("payload")) + parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(indexes)) + require.True(t, ok) + + err := parsed.Validate() + assert.Error(t, err) + assert.Contains(t, err.Error(), "invalid schema ID: 0") + }) + + t.Run("empty_payload", func(t *testing.T) { + indexes := []int{1} + envelope := CreateConfluentEnvelope(FormatProtobuf, 123, indexes, []byte{}) + parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(indexes)) + require.True(t, ok) + + err := parsed.Validate() + assert.Error(t, err) + assert.Contains(t, err.Error(), "empty payload") + }) +} |
