aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/internal/testutil/schema_helper.go
blob: 868cc286b787b6fd1e29043bbc633d4571858484 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package testutil

import (
	"testing"

	kschema "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
)

// EnsureValueSchema registers a minimal Avro value schema for the given topic if not present.
// Returns the latest schema ID if successful.
func EnsureValueSchema(t *testing.T, registryURL, topic string) (uint32, error) {
	t.Helper()
	subject := topic + "-value"
	rc := kschema.NewRegistryClient(kschema.RegistryConfig{URL: registryURL})

	// Minimal Avro record schema with string field "value"
	schemaJSON := `{"type":"record","name":"TestRecord","fields":[{"name":"value","type":"string"}]}`

	// Try to get existing
	if latest, err := rc.GetLatestSchema(subject); err == nil {
		return latest.LatestID, nil
	}

	// Register and fetch latest
	if _, err := rc.RegisterSchema(subject, schemaJSON); err != nil {
		return 0, err
	}
	latest, err := rc.GetLatestSchema(subject)
	if err != nil {
		return 0, err
	}
	return latest.LatestID, nil
}