aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java
blob: 3c33ae0eae62e0201b5938782cabb1983a4cb5e3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package tools;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class SchemaRegistryTest {
    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";
    
    public static void main(String[] args) {
        System.out.println("================================================================================");
        System.out.println("Schema Registry Test - Verifying In-Memory Read Optimization");
        System.out.println("================================================================================\n");
        
        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, 100);
        boolean allTestsPassed = true;
        
        try {
            // Test 1: Register first schema
            System.out.println("Test 1: Registering first schema (user-value)...");
            Schema userValueSchema = SchemaBuilder
                .record("User").fields()
                .requiredString("name")
                .requiredInt("age")
                .endRecord();
            
            long startTime = System.currentTimeMillis();
            int schema1Id = schemaRegistry.register("user-value", userValueSchema);
            long elapsedTime = System.currentTimeMillis() - startTime;
            System.out.println("✓ SUCCESS: Schema registered with ID: " + schema1Id + " (took " + elapsedTime + "ms)");
            
            // Test 2: Register second schema immediately (tests read-after-write)
            System.out.println("\nTest 2: Registering second schema immediately (user-key)...");
            Schema userKeySchema = SchemaBuilder
                .record("UserKey").fields()
                .requiredString("userId")
                .endRecord();
            
            startTime = System.currentTimeMillis();
            int schema2Id = schemaRegistry.register("user-key", userKeySchema);
            elapsedTime = System.currentTimeMillis() - startTime;
            System.out.println("✓ SUCCESS: Schema registered with ID: " + schema2Id + " (took " + elapsedTime + "ms)");
            
            // Test 3: Rapid fire registrations (tests concurrent writes)
            System.out.println("\nTest 3: Rapid fire registrations (10 schemas in parallel)...");
            startTime = System.currentTimeMillis();
            Thread[] threads = new Thread[10];
            final boolean[] results = new boolean[10];
            
            for (int i = 0; i < 10; i++) {
                final int index = i;
                threads[i] = new Thread(() -> {
                    try {
                        Schema schema = SchemaBuilder
                            .record("Test" + index).fields()
                            .requiredString("field" + index)
                            .endRecord();
                        schemaRegistry.register("test-" + index + "-value", schema);
                        results[index] = true;
                    } catch (Exception e) {
                        System.err.println("✗ ERROR in thread " + index + ": " + e.getMessage());
                        results[index] = false;
                    }
                });
                threads[i].start();
            }
            
            for (Thread thread : threads) {
                thread.join();
            }
            
            elapsedTime = System.currentTimeMillis() - startTime;
            int successCount = 0;
            for (boolean result : results) {
                if (result) successCount++;
            }
            
            if (successCount == 10) {
                System.out.println("✓ SUCCESS: All 10 schemas registered (took " + elapsedTime + "ms total, ~" + (elapsedTime / 10) + "ms per schema)");
            } else {
                System.out.println("✗ PARTIAL FAILURE: Only " + successCount + "/10 schemas registered");
                allTestsPassed = false;
            }
            
            // Test 4: Verify we can retrieve all schemas
            System.out.println("\nTest 4: Verifying all schemas are retrievable...");
            startTime = System.currentTimeMillis();
            Schema retrieved1 = schemaRegistry.getById(schema1Id);
            Schema retrieved2 = schemaRegistry.getById(schema2Id);
            elapsedTime = System.currentTimeMillis() - startTime;
            
            if (retrieved1.equals(userValueSchema) && retrieved2.equals(userKeySchema)) {
                System.out.println("✓ SUCCESS: All schemas retrieved correctly (took " + elapsedTime + "ms)");
            } else {
                System.out.println("✗ FAILURE: Schema mismatch");
                allTestsPassed = false;
            }
            
            // Summary
            System.out.println("\n===============================================================================");
            if (allTestsPassed) {
                System.out.println("✓ ALL TESTS PASSED!");
                System.out.println("===============================================================================");
                System.out.println("\nOptimization verified:");
                System.out.println("- ForceFlush is NO LONGER NEEDED");
                System.out.println("- Subscribers read from in-memory buffer using IsOffsetInMemory()");
                System.out.println("- Per-subscriber notification channels provide instant wake-up");
                System.out.println("- True concurrent writes without serialization");
                System.exit(0);
            } else {
                System.out.println("✗ SOME TESTS FAILED");
                System.out.println("===============================================================================");
                System.exit(1);
            }
            
        } catch (Exception e) {
            System.err.println("\n✗ FATAL ERROR: " + e.getMessage());
            e.printStackTrace();
            System.exit(1);
        }
    }
}