aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/schema/envelope_varint_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/schema/envelope_varint_test.go')
-rw-r--r--weed/mq/kafka/schema/envelope_varint_test.go198
1 files changed, 198 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/envelope_varint_test.go b/weed/mq/kafka/schema/envelope_varint_test.go
new file mode 100644
index 000000000..92004c3d6
--- /dev/null
+++ b/weed/mq/kafka/schema/envelope_varint_test.go
@@ -0,0 +1,198 @@
+package schema
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestEncodeDecodeVarint(t *testing.T) {
+ testCases := []struct {
+ name string
+ value uint64
+ }{
+ {"zero", 0},
+ {"small", 1},
+ {"medium", 127},
+ {"large", 128},
+ {"very_large", 16384},
+ {"max_uint32", 4294967295},
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Encode the value
+ encoded := encodeVarint(tc.value)
+ require.NotEmpty(t, encoded)
+
+ // Decode it back
+ decoded, bytesRead := readVarint(encoded)
+ require.Equal(t, len(encoded), bytesRead, "Should consume all encoded bytes")
+ assert.Equal(t, tc.value, decoded, "Decoded value should match original")
+ })
+ }
+}
+
+func TestCreateConfluentEnvelopeWithProtobufIndexes(t *testing.T) {
+ testCases := []struct {
+ name string
+ format Format
+ schemaID uint32
+ indexes []int
+ payload []byte
+ }{
+ {
+ name: "avro_no_indexes",
+ format: FormatAvro,
+ schemaID: 123,
+ indexes: nil,
+ payload: []byte("avro payload"),
+ },
+ {
+ name: "protobuf_no_indexes",
+ format: FormatProtobuf,
+ schemaID: 456,
+ indexes: nil,
+ payload: []byte("protobuf payload"),
+ },
+ {
+ name: "protobuf_single_index",
+ format: FormatProtobuf,
+ schemaID: 789,
+ indexes: []int{1},
+ payload: []byte("protobuf with index"),
+ },
+ {
+ name: "protobuf_multiple_indexes",
+ format: FormatProtobuf,
+ schemaID: 101112,
+ indexes: []int{0, 1, 2, 3},
+ payload: []byte("protobuf with multiple indexes"),
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Create the envelope
+ envelope := CreateConfluentEnvelope(tc.format, tc.schemaID, tc.indexes, tc.payload)
+
+ // Verify basic structure
+ require.True(t, len(envelope) >= 5, "Envelope should be at least 5 bytes")
+ assert.Equal(t, byte(0x00), envelope[0], "Magic byte should be 0x00")
+
+ // Extract and verify schema ID
+ extractedSchemaID, ok := ExtractSchemaID(envelope)
+ require.True(t, ok, "Should be able to extract schema ID")
+ assert.Equal(t, tc.schemaID, extractedSchemaID, "Schema ID should match")
+
+ // Parse the envelope based on format
+ if tc.format == FormatProtobuf && len(tc.indexes) > 0 {
+ // Use Protobuf-specific parser with known index count
+ parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(tc.indexes))
+ require.True(t, ok, "Should be able to parse Protobuf envelope")
+ assert.Equal(t, tc.format, parsed.Format)
+ assert.Equal(t, tc.schemaID, parsed.SchemaID)
+ assert.Equal(t, tc.indexes, parsed.Indexes, "Indexes should match")
+ assert.Equal(t, tc.payload, parsed.Payload, "Payload should match")
+ } else {
+ // Use generic parser
+ parsed, ok := ParseConfluentEnvelope(envelope)
+ require.True(t, ok, "Should be able to parse envelope")
+ assert.Equal(t, tc.schemaID, parsed.SchemaID)
+
+ if tc.format == FormatProtobuf && len(tc.indexes) == 0 {
+ // For Protobuf without indexes, payload should match
+ assert.Equal(t, tc.payload, parsed.Payload, "Payload should match")
+ } else if tc.format == FormatAvro {
+ // For Avro, payload should match (no indexes)
+ assert.Equal(t, tc.payload, parsed.Payload, "Payload should match")
+ }
+ }
+ })
+ }
+}
+
+func TestProtobufEnvelopeRoundTrip(t *testing.T) {
+ // Use more realistic index values (typically small numbers for message types)
+ originalIndexes := []int{0, 1, 2, 3}
+ originalPayload := []byte("test protobuf message data")
+ schemaID := uint32(12345)
+
+ // Create envelope
+ envelope := CreateConfluentEnvelope(FormatProtobuf, schemaID, originalIndexes, originalPayload)
+
+ // Parse it back with known index count
+ parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(originalIndexes))
+ require.True(t, ok, "Should be able to parse created envelope")
+
+ // Verify all fields
+ assert.Equal(t, FormatProtobuf, parsed.Format)
+ assert.Equal(t, schemaID, parsed.SchemaID)
+ assert.Equal(t, originalIndexes, parsed.Indexes)
+ assert.Equal(t, originalPayload, parsed.Payload)
+ assert.Equal(t, envelope, parsed.OriginalBytes)
+}
+
+func TestVarintEdgeCases(t *testing.T) {
+ t.Run("empty_data", func(t *testing.T) {
+ value, bytesRead := readVarint([]byte{})
+ assert.Equal(t, uint64(0), value)
+ assert.Equal(t, 0, bytesRead)
+ })
+
+ t.Run("incomplete_varint", func(t *testing.T) {
+ // Create an incomplete varint (continuation bit set but no more bytes)
+ incompleteVarint := []byte{0x80} // Continuation bit set, but no more bytes
+ value, bytesRead := readVarint(incompleteVarint)
+ assert.Equal(t, uint64(0), value)
+ assert.Equal(t, 0, bytesRead)
+ })
+
+ t.Run("max_varint_length", func(t *testing.T) {
+ // Create a varint that's too long (more than 10 bytes)
+ tooLongVarint := make([]byte, 11)
+ for i := 0; i < 10; i++ {
+ tooLongVarint[i] = 0x80 // All continuation bits
+ }
+ tooLongVarint[10] = 0x01 // Final byte
+
+ value, bytesRead := readVarint(tooLongVarint)
+ assert.Equal(t, uint64(0), value)
+ assert.Equal(t, 0, bytesRead)
+ })
+}
+
+func TestProtobufEnvelopeValidation(t *testing.T) {
+ t.Run("valid_envelope", func(t *testing.T) {
+ indexes := []int{1, 2}
+ envelope := CreateConfluentEnvelope(FormatProtobuf, 123, indexes, []byte("payload"))
+ parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(indexes))
+ require.True(t, ok)
+
+ err := parsed.Validate()
+ assert.NoError(t, err)
+ })
+
+ t.Run("zero_schema_id", func(t *testing.T) {
+ indexes := []int{1}
+ envelope := CreateConfluentEnvelope(FormatProtobuf, 0, indexes, []byte("payload"))
+ parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(indexes))
+ require.True(t, ok)
+
+ err := parsed.Validate()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "invalid schema ID: 0")
+ })
+
+ t.Run("empty_payload", func(t *testing.T) {
+ indexes := []int{1}
+ envelope := CreateConfluentEnvelope(FormatProtobuf, 123, indexes, []byte{})
+ parsed, ok := ParseConfluentProtobufEnvelopeWithIndexCount(envelope, len(indexes))
+ require.True(t, ok)
+
+ err := parsed.Validate()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "empty payload")
+ })
+}