aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/integration/schema_registry_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/kafka/integration/schema_registry_test.go')
-rw-r--r--test/kafka/integration/schema_registry_test.go210
1 files changed, 210 insertions, 0 deletions
diff --git a/test/kafka/integration/schema_registry_test.go b/test/kafka/integration/schema_registry_test.go
new file mode 100644
index 000000000..9f6d32849
--- /dev/null
+++ b/test/kafka/integration/schema_registry_test.go
@@ -0,0 +1,210 @@
+package integration
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+// TestSchemaRegistryEventualConsistency reproduces the issue where schemas
+// are registered successfully but are not immediately queryable due to
+// Schema Registry's consumer lag
+func TestSchemaRegistryEventualConsistency(t *testing.T) {
+ // This test requires real SMQ backend
+ gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQRequired)
+ defer gateway.CleanupAndClose()
+
+ addr := gateway.StartAndWait()
+ t.Logf("Gateway running on %s", addr)
+
+ // Schema Registry URL from environment or default
+ schemaRegistryURL := "http://localhost:8081"
+
+ // Wait for Schema Registry to be ready
+ if !waitForSchemaRegistry(t, schemaRegistryURL, 30*time.Second) {
+ t.Fatal("Schema Registry not ready")
+ }
+
+ // Define test schemas
+ valueSchema := `{"type":"record","name":"TestMessage","fields":[{"name":"id","type":"string"}]}`
+ keySchema := `{"type":"string"}`
+
+ // Register multiple schemas rapidly (simulates the load test scenario)
+ subjects := []string{
+ "test-topic-0-value",
+ "test-topic-0-key",
+ "test-topic-1-value",
+ "test-topic-1-key",
+ "test-topic-2-value",
+ "test-topic-2-key",
+ "test-topic-3-value",
+ "test-topic-3-key",
+ }
+
+ t.Log("Registering schemas rapidly...")
+ registeredIDs := make(map[string]int)
+ for _, subject := range subjects {
+ schema := valueSchema
+ if strings.HasSuffix(subject, "-key") {
+ schema = keySchema
+ }
+
+ id, err := registerSchema(schemaRegistryURL, subject, schema)
+ if err != nil {
+ t.Fatalf("Failed to register schema for %s: %v", subject, err)
+ }
+ registeredIDs[subject] = id
+ t.Logf("Registered %s with ID %d", subject, id)
+ }
+
+ t.Log("All schemas registered successfully!")
+
+ // Now immediately try to verify them (this reproduces the bug)
+ t.Log("Immediately verifying schemas (without delay)...")
+ immediateFailures := 0
+ for _, subject := range subjects {
+ exists, id, version, err := verifySchema(schemaRegistryURL, subject)
+ if err != nil || !exists {
+ immediateFailures++
+ t.Logf("Immediate verification failed for %s: exists=%v id=%d err=%v", subject, exists, id, err)
+ } else {
+ t.Logf("Immediate verification passed for %s: ID=%d Version=%d", subject, id, version)
+ }
+ }
+
+ if immediateFailures > 0 {
+ t.Logf("BUG REPRODUCED: %d/%d schemas not immediately queryable after registration",
+ immediateFailures, len(subjects))
+ t.Logf(" This is due to Schema Registry's KafkaStoreReaderThread lag")
+ }
+
+ // Now verify with retry logic (this should succeed)
+ t.Log("Verifying schemas with retry logic...")
+ for _, subject := range subjects {
+ expectedID := registeredIDs[subject]
+ if !verifySchemaWithRetry(t, schemaRegistryURL, subject, expectedID, 5*time.Second) {
+ t.Errorf("Failed to verify %s even with retry", subject)
+ }
+ }
+
+ t.Log("✓ All schemas verified successfully with retry logic!")
+}
+
+// registerSchema registers a schema and returns its ID
+func registerSchema(registryURL, subject, schema string) (int, error) {
+ // Escape the schema JSON
+ escapedSchema, err := json.Marshal(schema)
+ if err != nil {
+ return 0, err
+ }
+
+ payload := fmt.Sprintf(`{"schema":%s,"schemaType":"AVRO"}`, escapedSchema)
+
+ resp, err := http.Post(
+ fmt.Sprintf("%s/subjects/%s/versions", registryURL, subject),
+ "application/vnd.schemaregistry.v1+json",
+ strings.NewReader(payload),
+ )
+ if err != nil {
+ return 0, err
+ }
+ defer resp.Body.Close()
+
+ body, _ := io.ReadAll(resp.Body)
+
+ if resp.StatusCode != http.StatusOK {
+ return 0, fmt.Errorf("registration failed: %s - %s", resp.Status, string(body))
+ }
+
+ var result struct {
+ ID int `json:"id"`
+ }
+ if err := json.Unmarshal(body, &result); err != nil {
+ return 0, err
+ }
+
+ return result.ID, nil
+}
+
+// verifySchema checks if a schema exists
+func verifySchema(registryURL, subject string) (exists bool, id int, version int, err error) {
+ resp, err := http.Get(fmt.Sprintf("%s/subjects/%s/versions/latest", registryURL, subject))
+ if err != nil {
+ return false, 0, 0, err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusNotFound {
+ return false, 0, 0, nil
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(resp.Body)
+ return false, 0, 0, fmt.Errorf("verification failed: %s - %s", resp.Status, string(body))
+ }
+
+ var result struct {
+ ID int `json:"id"`
+ Version int `json:"version"`
+ Schema string `json:"schema"`
+ }
+ body, _ := io.ReadAll(resp.Body)
+ if err := json.Unmarshal(body, &result); err != nil {
+ return false, 0, 0, err
+ }
+
+ return true, result.ID, result.Version, nil
+}
+
+// verifySchemaWithRetry verifies a schema with retry logic
+func verifySchemaWithRetry(t *testing.T, registryURL, subject string, expectedID int, timeout time.Duration) bool {
+ deadline := time.Now().Add(timeout)
+ attempt := 0
+
+ for time.Now().Before(deadline) {
+ attempt++
+ exists, id, version, err := verifySchema(registryURL, subject)
+
+ if err == nil && exists && id == expectedID {
+ if attempt > 1 {
+ t.Logf("✓ %s verified after %d attempts (ID=%d, Version=%d)", subject, attempt, id, version)
+ }
+ return true
+ }
+
+ // Wait before retry (exponential backoff)
+ waitTime := time.Duration(attempt*100) * time.Millisecond
+ if waitTime > 1*time.Second {
+ waitTime = 1 * time.Second
+ }
+ time.Sleep(waitTime)
+ }
+
+ t.Logf("%s verification timed out after %d attempts", subject, attempt)
+ return false
+}
+
+// waitForSchemaRegistry waits for Schema Registry to be ready
+func waitForSchemaRegistry(t *testing.T, url string, timeout time.Duration) bool {
+ deadline := time.Now().Add(timeout)
+
+ for time.Now().Before(deadline) {
+ resp, err := http.Get(url + "/subjects")
+ if err == nil && resp.StatusCode == http.StatusOK {
+ resp.Body.Close()
+ return true
+ }
+ if resp != nil {
+ resp.Body.Close()
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+
+ return false
+}