Diffstat (limited to 'test/kafka/kafka-client-loadtest/tools')
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java    290
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java     72
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java       82
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java        68
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java     124
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java     78
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/go.mod                        10
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/go.sum                        24
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go          69
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/log4j.properties              12
-rw-r--r--  test/kafka/kafka-client-loadtest/tools/pom.xml                       72
-rwxr-xr-x  test/kafka/kafka-client-loadtest/tools/simple-test                  bin 0 -> 8617650 bytes
12 files changed, 901 insertions, 0 deletions
diff --git a/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java b/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java
new file mode 100644
index 000000000..f511b4cf6
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java
@@ -0,0 +1,290 @@
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.common.Node;
+
+import java.io.*;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+public class AdminClientDebugger {
+
+ public static void main(String[] args) throws Exception {
+ String broker = args.length > 0 ? args[0] : "localhost:9093";
+
+ System.out.println("=".repeat(80));
+ System.out.println("KAFKA ADMINCLIENT DEBUGGER");
+ System.out.println("=".repeat(80));
+ System.out.println("Target broker: " + broker);
+
+ // Test 1: Raw socket - capture exact bytes
+ System.out.println("\n" + "=".repeat(80));
+ System.out.println("TEST 1: Raw Socket - Capture ApiVersions Exchange");
+ System.out.println("=".repeat(80));
+ testRawSocket(broker);
+
+ // Test 2: AdminClient with detailed logging
+ System.out.println("\n" + "=".repeat(80));
+ System.out.println("TEST 2: AdminClient with Logging");
+ System.out.println("=".repeat(80));
+ testAdminClient(broker);
+ }
+
+ private static void testRawSocket(String broker) {
+ String[] parts = broker.split(":");
+ String host = parts[0];
+ int port = Integer.parseInt(parts[1]);
+
+ try (Socket socket = new Socket(host, port)) {
+ socket.setSoTimeout(10000);
+
+ InputStream in = socket.getInputStream();
+ OutputStream out = socket.getOutputStream();
+
+ System.out.println("Connected to " + broker);
+
+ // Build ApiVersions request (v4)
+ // Format:
+ // [Size][ApiKey=18][ApiVersion=4][CorrelationId=0][ClientId][TaggedFields]
+ ByteArrayOutputStream requestBody = new ByteArrayOutputStream();
+
+ // ApiKey (2 bytes) = 18
+ requestBody.write(0);
+ requestBody.write(18);
+
+ // ApiVersion (2 bytes) = 4
+ requestBody.write(0);
+ requestBody.write(4);
+
+ // CorrelationId (4 bytes) = 0
+ requestBody.write(new byte[] { 0, 0, 0, 0 });
+
+ // ClientId (compact string) = "debug-client"
+ String clientId = "debug-client";
+ writeCompactString(requestBody, clientId);
+
+ // Tagged fields (empty)
+ requestBody.write(0x00);
+
+ byte[] request = requestBody.toByteArray();
+
+ // Write size
+ ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+ sizeBuffer.putInt(request.length);
+ out.write(sizeBuffer.array());
+
+ // Write request
+ out.write(request);
+ out.flush();
+
+ System.out.println("\nSENT ApiVersions v4 Request:");
+ System.out.println(" Size: " + request.length + " bytes");
+ hexDump(" Request", request, Math.min(64, request.length));
+
+ // Read response size
+ byte[] sizeBytes = new byte[4];
+ int read = in.read(sizeBytes);
+ if (read != 4) {
+ System.out.println("Failed to read response size (got " + read + " bytes)");
+ return;
+ }
+
+ int responseSize = ByteBuffer.wrap(sizeBytes).getInt();
+ System.out.println("\nRECEIVED Response:");
+ System.out.println(" Size: " + responseSize + " bytes");
+
+ // Read response body
+ byte[] responseBytes = new byte[responseSize];
+ int totalRead = 0;
+ while (totalRead < responseSize) {
+ int n = in.read(responseBytes, totalRead, responseSize - totalRead);
+ if (n == -1) {
+ System.out.println("Unexpected EOF after " + totalRead + " bytes");
+ return;
+ }
+ totalRead += n;
+ }
+
+ System.out.println(" Read complete response: " + totalRead + " bytes");
+
+ // Decode response
+ System.out.println("\nRESPONSE STRUCTURE:");
+ decodeApiVersionsResponse(responseBytes);
+
+ // Try to read more (should timeout or get EOF)
+ System.out.println("\n⏱️ Waiting for any additional data (10s timeout)...");
+ socket.setSoTimeout(10000);
+ try {
+ int nextByte = in.read();
+ if (nextByte == -1) {
+ System.out.println(" Server closed connection (EOF)");
+ } else {
+ System.out.println(" Unexpected data: " + nextByte);
+ }
+ } catch (SocketTimeoutException e) {
+ System.out.println(" Timeout - no additional data");
+ }
+
+ } catch (Exception e) {
+ System.out.println("Error: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ private static void testAdminClient(String broker) {
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+ props.put(AdminClientConfig.CLIENT_ID_CONFIG, "admin-client-debugger");
+ props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
+ props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10000);
+
+ System.out.println("Creating AdminClient with config:");
+ props.forEach((k, v) -> System.out.println(" " + k + " = " + v));
+
+ try (AdminClient adminClient = AdminClient.create(props)) {
+ System.out.println("AdminClient created");
+
+ // Give the thread time to start
+ Thread.sleep(1000);
+
+ System.out.println("\nCalling describeCluster()...");
+ DescribeClusterResult result = adminClient.describeCluster();
+
+ System.out.println(" Waiting for nodes...");
+ Collection<Node> nodes = result.nodes().get();
+
+ System.out.println("Cluster description retrieved:");
+ System.out.println(" Nodes: " + nodes.size());
+ for (Node node : nodes) {
+ System.out.println(" - Node " + node.id() + ": " + node.host() + ":" + node.port());
+ }
+
+ System.out.println("\n Cluster ID: " + result.clusterId().get());
+
+ Node controller = result.controller().get();
+ if (controller != null) {
+ System.out.println(" Controller: Node " + controller.id());
+ }
+
+ } catch (ExecutionException e) {
+ System.out.println("Execution error: " + e.getCause().getMessage());
+ e.getCause().printStackTrace();
+ } catch (Exception e) {
+ System.out.println("Error: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ private static void decodeApiVersionsResponse(byte[] data) {
+ int offset = 0;
+
+ try {
+ // Correlation ID (4 bytes)
+ int correlationId = ByteBuffer.wrap(data, offset, 4).getInt();
+ System.out.println(" [Offset " + offset + "] Correlation ID: " + correlationId);
+ offset += 4;
+
+ // Header tagged fields (varint - should be 0x00 for flexible v3+)
+ int taggedFieldsLength = readUnsignedVarint(data, offset);
+ System.out.println(" [Offset " + offset + "] Header Tagged Fields Length: " + taggedFieldsLength);
+ offset += varintSize(data, offset);
+
+ // Error code (2 bytes)
+ short errorCode = ByteBuffer.wrap(data, offset, 2).getShort();
+ System.out.println(" [Offset " + offset + "] Error Code: " + errorCode);
+ offset += 2;
+
+ // API Keys array (compact array - varint length)
+ int apiKeysLength = readUnsignedVarint(data, offset) - 1; // Compact array: length+1
+ System.out.println(" [Offset " + offset + "] API Keys Count: " + apiKeysLength);
+ offset += varintSize(data, offset);
+
+ // Show first few API keys
+ System.out.println(" First 5 API Keys:");
+ for (int i = 0; i < Math.min(5, apiKeysLength); i++) {
+ short apiKey = ByteBuffer.wrap(data, offset, 2).getShort();
+ offset += 2;
+ short minVersion = ByteBuffer.wrap(data, offset, 2).getShort();
+ offset += 2;
+ short maxVersion = ByteBuffer.wrap(data, offset, 2).getShort();
+ offset += 2;
+ // Per-element tagged fields
+ int perElementTagged = readUnsignedVarint(data, offset);
+ offset += varintSize(data, offset);
+
+ System.out.println(" " + (i + 1) + ". API " + apiKey + ": v" + minVersion + "-v" + maxVersion);
+ }
+
+ System.out.println(" ... (showing first 5 of " + apiKeysLength + " APIs)");
+ System.out.println(" Response structure is valid!");
+
+ // Hex dump of first 64 bytes
+ hexDump("\n First 64 bytes", data, Math.min(64, data.length));
+
+ } catch (Exception e) {
+ System.out.println(" Failed to decode at offset " + offset + ": " + e.getMessage());
+ hexDump(" Raw bytes", data, Math.min(128, data.length));
+ }
+ }
+
+ private static int readUnsignedVarint(byte[] data, int offset) {
+ int value = 0;
+ int shift = 0;
+ while (true) {
+ byte b = data[offset++];
+ value |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0)
+ break;
+ shift += 7;
+ }
+ return value;
+ }
+
+ private static int varintSize(byte[] data, int offset) {
+ // Count bytes until one without the continuation (high) bit is found
+ int size = 1;
+ while ((data[offset] & 0x80) != 0) {
+ offset++;
+ size++;
+ }
+ return size;
+ }
+
+ private static void writeCompactString(ByteArrayOutputStream out, String str) {
+ byte[] bytes = str.getBytes();
+ writeUnsignedVarint(out, bytes.length + 1); // Compact string: length+1
+ out.write(bytes, 0, bytes.length);
+ }
+
+ private static void writeUnsignedVarint(ByteArrayOutputStream out, int value) {
+ while ((value & ~0x7F) != 0) {
+ out.write((byte) ((value & 0x7F) | 0x80));
+ value >>>= 7;
+ }
+ out.write((byte) value);
+ }
+
+ private static void hexDump(String label, byte[] data, int length) {
+ System.out.println(label + " (hex dump):");
+ for (int i = 0; i < length; i += 16) {
+ System.out.printf(" %04x ", i);
+ for (int j = 0; j < 16; j++) {
+ if (i + j < length) {
+ System.out.printf("%02x ", data[i + j] & 0xFF);
+ } else {
+ System.out.print(" ");
+ }
+ if (j == 7)
+ System.out.print(" ");
+ }
+ System.out.print(" |");
+ for (int j = 0; j < 16 && i + j < length; j++) {
+ byte b = data[i + j];
+ System.out.print((b >= 32 && b < 127) ? (char) b : '.');
+ }
+ System.out.println("|");
+ }
+ }
+}
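
The compact strings and compact arrays in the exchange above all hinge on the unsigned-varint encoding implemented by the helpers in this file. A minimal, self-contained round-trip check of that encoding is sketched below; the class name VarintRoundTrip and the sample values are illustrative only and are not part of this change.

    import java.io.ByteArrayOutputStream;

    public class VarintRoundTrip {
        static void writeUnsignedVarint(ByteArrayOutputStream out, int value) {
            // Emit 7 data bits per byte; the high bit marks "more bytes follow"
            while ((value & ~0x7F) != 0) {
                out.write((value & 0x7F) | 0x80);
                value >>>= 7;
            }
            out.write(value);
        }

        static int readUnsignedVarint(byte[] data, int offset) {
            int value = 0;
            int shift = 0;
            while (true) {
                byte b = data[offset++];
                value |= (b & 0x7F) << shift;
                if ((b & 0x80) == 0) break;
                shift += 7;
            }
            return value;
        }

        public static void main(String[] args) {
            // Sample values chosen to cross the 1-, 2- and 3-byte boundaries
            for (int v : new int[] { 0, 1, 127, 128, 300, 16383, 16384, 2000000 }) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                writeUnsignedVarint(out, v);
                int back = readUnsignedVarint(out.toByteArray(), 0);
                System.out.printf("%d -> %d byte(s) -> %d%n", v, out.size(), back);
            }
        }
    }
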
diff --git a/test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java b/test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java
new file mode 100644
index 000000000..177a86233
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/JavaAdminClientTest.java
@@ -0,0 +1,72 @@
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class JavaAdminClientTest {
+ public static void main(String[] args) {
+ // Set uncaught exception handler to catch AdminClient thread errors
+ Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
+ System.err.println("UNCAUGHT EXCEPTION in thread " + t.getName() + ":");
+ e.printStackTrace();
+ });
+
+ String bootstrapServers = args.length > 0 ? args[0] : "localhost:9093";
+
+ System.out.println("Testing Kafka wire protocol with broker: " + bootstrapServers);
+
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
+ props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10000);
+ props.put(AdminClientConfig.CLIENT_ID_CONFIG, "java-admin-test");
+ props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000);
+ props.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 10000);
+ props.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, 30000);
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
+ props.put(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);
+ props.put(AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);
+
+ System.out.println("Creating AdminClient with config:");
+ props.forEach((k, v) -> System.out.println(" " + k + " = " + v));
+
+ try (AdminClient adminClient = AdminClient.create(props)) {
+ System.out.println("AdminClient created successfully");
+ Thread.sleep(2000); // Give it time to initialize
+
+ // Test 1: Describe Cluster (uses Metadata API internally)
+ System.out.println("\n=== Test 1: Describe Cluster ===");
+ try {
+ DescribeClusterResult clusterResult = adminClient.describeCluster();
+ String clusterId = clusterResult.clusterId().get(10, TimeUnit.SECONDS);
+ int nodeCount = clusterResult.nodes().get(10, TimeUnit.SECONDS).size();
+ System.out.println("Cluster ID: " + clusterId);
+ System.out.println("Nodes: " + nodeCount);
+ } catch (Exception e) {
+ System.err.println("Describe Cluster failed: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ // Test 2: List Topics
+ System.out.println("\n=== Test 2: List Topics ===");
+ try {
+ ListTopicsResult topicsResult = adminClient.listTopics();
+ int topicCount = topicsResult.names().get(10, TimeUnit.SECONDS).size();
+ System.out.println("Topics: " + topicCount);
+ } catch (Exception e) {
+ System.err.println("List Topics failed: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ System.out.println("\nAll tests completed!");
+
+ } catch (Exception e) {
+ System.err.println("AdminClient creation failed: " + e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+}
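
JavaAdminClientTest assumes the broker is reachable and only reads cluster metadata. When a test topic also needs to exist beforehand, it can be created through the same AdminClient API; a small sketch under that assumption (topic name, partition count and replication factor are placeholders):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    public class CreateTestTopic {
        public static void main(String[] args) throws Exception {
            String broker = args.length > 0 ? args[0] : "localhost:9093";

            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker);

            try (AdminClient admin = AdminClient.create(props)) {
                // Placeholder sizing: one partition, replication factor 1 for a single-broker test setup
                NewTopic topic = new NewTopic("test-topic", 1, (short) 1);
                admin.createTopics(Collections.singletonList(topic)).all().get(10, TimeUnit.SECONDS);
                System.out.println("Created topic: " + topic.name());
            }
        }
    }
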
diff --git a/test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java b/test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java
new file mode 100644
index 000000000..41c884544
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/JavaKafkaConsumer.java
@@ -0,0 +1,82 @@
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+public class JavaKafkaConsumer {
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ System.err.println("Usage: java JavaKafkaConsumer <broker> <topic>");
+ System.exit(1);
+ }
+
+ String broker = args[0];
+ String topic = args[1];
+
+ System.out.println("Connecting to Kafka broker: " + broker);
+ System.out.println("Topic: " + topic);
+
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "java-test-group");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
+ props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
+ props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000");
+
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Collections.singletonList(topic));
+
+ System.out.println("Starting to consume messages...");
+
+ int messageCount = 0;
+ int errorCount = 0;
+ long startTime = System.currentTimeMillis();
+
+ try {
+ while (true) {
+ try {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
+ for (ConsumerRecord<String, String> record : records) {
+ messageCount++;
+ System.out.printf("Message #%d: topic=%s partition=%d offset=%d key=%s value=%s%n",
+ messageCount, record.topic(), record.partition(), record.offset(),
+ record.key(), record.value());
+ }
+
+ // Stop after 100 messages or 60 seconds
+ if (messageCount >= 100 || (System.currentTimeMillis() - startTime) > 60000) {
+ long duration = System.currentTimeMillis() - startTime;
+ System.out.printf("%nSuccessfully consumed %d messages in %dms%n", messageCount, duration);
+ System.out.printf("Success rate: %.1f%% (%d/%d including errors)%n",
+ (double) messageCount / (messageCount + errorCount) * 100, messageCount,
+ messageCount + errorCount);
+ break;
+ }
+ } catch (Exception e) {
+ errorCount++;
+ System.err.printf("Error during poll #%d: %s%n", errorCount, e.getMessage());
+ e.printStackTrace();
+
+ // Stop after 10 errors (cumulative; errorCount is never reset here) or 60 seconds
+ if (errorCount > 10 || (System.currentTimeMillis() - startTime) > 60000) {
+ long duration = System.currentTimeMillis() - startTime;
+ System.err.printf("%nStopping after %d errors in %dms%n", errorCount, duration);
+ break;
+ }
+ }
+ }
+ } finally {
+ consumer.close();
+ }
+ }
+}
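
JavaKafkaConsumer relies on auto-commit. When debugging offset handling against the gateway it can help to compare with explicit commits; a hedged sketch of a manual-commit variant follows (group id, topic and poll budget are arbitrary and not part of the committed tool):

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class ManualCommitConsumer {
        public static void main(String[] args) {
            String broker = args.length > 0 ? args[0] : "localhost:9093";
            String topic = args.length > 1 ? args[1] : "test-topic";

            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-test-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit explicitly below

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(topic));
                for (int i = 0; i < 10; i++) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
                    if (!records.isEmpty()) {
                        consumer.commitSync(); // commit only after the batch has been processed
                    }
                }
            }
        }
    }
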
diff --git a/test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java b/test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java
new file mode 100644
index 000000000..e9898d5f0
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/JavaProducerTest.java
@@ -0,0 +1,68 @@
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+public class JavaProducerTest {
+ public static void main(String[] args) {
+ String bootstrapServers = args.length > 0 ? args[0] : "localhost:9093";
+ String topicName = args.length > 1 ? args[1] : "test-topic";
+
+ System.out.println("Testing Kafka Producer with broker: " + bootstrapServers);
+ System.out.println(" Topic: " + topicName);
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "java-producer-test");
+ props.put(ProducerConfig.ACKS_CONFIG, "1");
+ props.put(ProducerConfig.RETRIES_CONFIG, 0);
+ props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+
+ System.out.println("Creating Producer with config:");
+ props.forEach((k, v) -> System.out.println(" " + k + " = " + v));
+
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+ System.out.println("Producer created successfully");
+
+ // Try to send a test message
+ System.out.println("\n=== Test: Send Message ===");
+ try {
+ ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "key1", "value1");
+ System.out.println("Sending record to topic: " + topicName);
+ Future<RecordMetadata> future = producer.send(record);
+
+ RecordMetadata metadata = future.get(); // This will block and wait for response
+ System.out.println("Message sent successfully!");
+ System.out.println(" Topic: " + metadata.topic());
+ System.out.println(" Partition: " + metadata.partition());
+ System.out.println(" Offset: " + metadata.offset());
+ } catch (Exception e) {
+ System.err.println("Send failed: " + e.getMessage());
+ e.printStackTrace();
+
+ // Print cause chain
+ Throwable cause = e.getCause();
+ int depth = 1;
+ while (cause != null && depth < 5) {
+ System.err.println(
+ " Cause " + depth + ": " + cause.getClass().getName() + ": " + cause.getMessage());
+ cause = cause.getCause();
+ depth++;
+ }
+ }
+
+ System.out.println("\nTest completed!");
+
+ } catch (Exception e) {
+ System.err.println("Producer creation or operation failed: " + e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+}
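
JavaProducerTest blocks on future.get() so that a single send either succeeds or surfaces its cause chain. For completeness, the same send can also be issued asynchronously with a callback; a brief sketch under the same broker/topic assumptions:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class AsyncProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, args.length > 0 ? args[0] : "localhost:9093");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test-topic", "key1", "value1"), (metadata, exception) -> {
                    // Callback runs on the producer I/O thread once the broker responds
                    if (exception != null) {
                        System.err.println("Send failed: " + exception.getMessage());
                    } else {
                        System.out.printf("Sent to %s-%d at offset %d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
                producer.flush(); // wait for the in-flight record before closing
            }
        }
    }
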
diff --git a/test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java b/test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java
new file mode 100644
index 000000000..3c33ae0ea
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/SchemaRegistryTest.java
@@ -0,0 +1,124 @@
+package tools;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+
+public class SchemaRegistryTest {
+ private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";
+
+ public static void main(String[] args) {
+ System.out.println("================================================================================");
+ System.out.println("Schema Registry Test - Verifying In-Memory Read Optimization");
+ System.out.println("================================================================================\n");
+
+ SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, 100);
+ boolean allTestsPassed = true;
+
+ try {
+ // Test 1: Register first schema
+ System.out.println("Test 1: Registering first schema (user-value)...");
+ Schema userValueSchema = SchemaBuilder
+ .record("User").fields()
+ .requiredString("name")
+ .requiredInt("age")
+ .endRecord();
+
+ long startTime = System.currentTimeMillis();
+ int schema1Id = schemaRegistry.register("user-value", userValueSchema);
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ System.out.println("✓ SUCCESS: Schema registered with ID: " + schema1Id + " (took " + elapsedTime + "ms)");
+
+ // Test 2: Register second schema immediately (tests read-after-write)
+ System.out.println("\nTest 2: Registering second schema immediately (user-key)...");
+ Schema userKeySchema = SchemaBuilder
+ .record("UserKey").fields()
+ .requiredString("userId")
+ .endRecord();
+
+ startTime = System.currentTimeMillis();
+ int schema2Id = schemaRegistry.register("user-key", userKeySchema);
+ elapsedTime = System.currentTimeMillis() - startTime;
+ System.out.println("✓ SUCCESS: Schema registered with ID: " + schema2Id + " (took " + elapsedTime + "ms)");
+
+ // Test 3: Rapid fire registrations (tests concurrent writes)
+ System.out.println("\nTest 3: Rapid fire registrations (10 schemas in parallel)...");
+ startTime = System.currentTimeMillis();
+ Thread[] threads = new Thread[10];
+ final boolean[] results = new boolean[10];
+
+ for (int i = 0; i < 10; i++) {
+ final int index = i;
+ threads[i] = new Thread(() -> {
+ try {
+ Schema schema = SchemaBuilder
+ .record("Test" + index).fields()
+ .requiredString("field" + index)
+ .endRecord();
+ schemaRegistry.register("test-" + index + "-value", schema);
+ results[index] = true;
+ } catch (Exception e) {
+ System.err.println("✗ ERROR in thread " + index + ": " + e.getMessage());
+ results[index] = false;
+ }
+ });
+ threads[i].start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ elapsedTime = System.currentTimeMillis() - startTime;
+ int successCount = 0;
+ for (boolean result : results) {
+ if (result) successCount++;
+ }
+
+ if (successCount == 10) {
+ System.out.println("✓ SUCCESS: All 10 schemas registered (took " + elapsedTime + "ms total, ~" + (elapsedTime / 10) + "ms per schema)");
+ } else {
+ System.out.println("✗ PARTIAL FAILURE: Only " + successCount + "/10 schemas registered");
+ allTestsPassed = false;
+ }
+
+ // Test 4: Verify we can retrieve all schemas
+ System.out.println("\nTest 4: Verifying all schemas are retrievable...");
+ startTime = System.currentTimeMillis();
+ Schema retrieved1 = schemaRegistry.getById(schema1Id);
+ Schema retrieved2 = schemaRegistry.getById(schema2Id);
+ elapsedTime = System.currentTimeMillis() - startTime;
+
+ if (retrieved1.equals(userValueSchema) && retrieved2.equals(userKeySchema)) {
+ System.out.println("✓ SUCCESS: All schemas retrieved correctly (took " + elapsedTime + "ms)");
+ } else {
+ System.out.println("✗ FAILURE: Schema mismatch");
+ allTestsPassed = false;
+ }
+
+ // Summary
+ System.out.println("\n===============================================================================");
+ if (allTestsPassed) {
+ System.out.println("✓ ALL TESTS PASSED!");
+ System.out.println("===============================================================================");
+ System.out.println("\nOptimization verified:");
+ System.out.println("- ForceFlush is NO LONGER NEEDED");
+ System.out.println("- Subscribers read from in-memory buffer using IsOffsetInMemory()");
+ System.out.println("- Per-subscriber notification channels provide instant wake-up");
+ System.out.println("- True concurrent writes without serialization");
+ System.exit(0);
+ } else {
+ System.out.println("✗ SOME TESTS FAILED");
+ System.out.println("===============================================================================");
+ System.exit(1);
+ }
+
+ } catch (Exception e) {
+ System.err.println("\n✗ FATAL ERROR: " + e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+}
+
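
SchemaRegistryTest only exercises registration and retrieval. The usual next step in the load test is serializing records against those schemas; a hedged sketch of producing one Avro record with the Confluent serializer declared in the pom below (broker address, registry URL and topic name are placeholders, not taken from this change):

    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class AvroProduceSketch {
        public static void main(String[] args) {
            Schema schema = SchemaBuilder.record("User").fields()
                    .requiredString("name")
                    .requiredInt("age")
                    .endRecord();

            GenericRecord user = new GenericData.Record(schema);
            user.put("name", "alice");
            user.put("age", 30);

            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
            props.put("schema.registry.url", "http://localhost:8081");

            try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
                // The serializer registers the schema under "<topic>-value" on first use
                producer.send(new ProducerRecord<>("user-avro", "user-1", user));
                producer.flush();
            }
        }
    }
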
diff --git a/test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java b/test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java
new file mode 100644
index 000000000..f334c045a
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/TestSocketReadiness.java
@@ -0,0 +1,78 @@
+import java.net.*;
+import java.nio.*;
+import java.nio.channels.*;
+
+public class TestSocketReadiness {
+ public static void main(String[] args) throws Exception {
+ String host = args.length > 0 ? args[0] : "localhost";
+ int port = args.length > 1 ? Integer.parseInt(args[1]) : 9093;
+
+ System.out.println("Testing socket readiness with " + host + ":" + port);
+
+ // Test 1: Simple blocking connect
+ System.out.println("\n=== Test 1: Blocking Socket ===");
+ try (Socket socket = new Socket()) {
+ socket.connect(new InetSocketAddress(host, port), 5000);
+ System.out.println("Blocking socket connected");
+ System.out.println(" Available bytes: " + socket.getInputStream().available());
+ Thread.sleep(100);
+ System.out.println(" Available bytes after 100ms: " + socket.getInputStream().available());
+ } catch (Exception e) {
+ System.err.println("Blocking socket failed: " + e.getMessage());
+ }
+
+ // Test 2: Non-blocking NIO socket (like Kafka client uses)
+ System.out.println("\n=== Test 2: Non-blocking NIO Socket ===");
+ Selector selector = Selector.open();
+ SocketChannel channel = SocketChannel.open();
+ channel.configureBlocking(false);
+
+ try {
+ boolean connected = channel.connect(new InetSocketAddress(host, port));
+ System.out.println(" connect() returned: " + connected);
+
+ SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
+
+ int ready = selector.select(5000);
+ System.out.println(" selector.select() returned: " + ready);
+
+ if (ready > 0) {
+ for (SelectionKey k : selector.selectedKeys()) {
+ if (k.isConnectable()) {
+ System.out.println(" isConnectable: true");
+ boolean finished = channel.finishConnect();
+ System.out.println(" finishConnect() returned: " + finished);
+
+ if (finished) {
+ k.interestOps(SelectionKey.OP_READ);
+
+ // Now check if immediately readable (THIS is what might be wrong)
+ selector.selectedKeys().clear();
+ int readReady = selector.selectNow();
+ System.out.println(" Immediately after connect, selectNow() = " + readReady);
+
+ if (readReady > 0) {
+ System.out.println(" Socket is IMMEDIATELY readable (unexpected!)");
+ ByteBuffer buf = ByteBuffer.allocate(1);
+ int bytesRead = channel.read(buf);
+ System.out.println(" read() returned: " + bytesRead);
+ } else {
+ System.out.println(" Socket is NOT immediately readable (correct)");
+ }
+ }
+ }
+ }
+ }
+
+ System.out.println("NIO socket test completed");
+ } catch (Exception e) {
+ System.err.println("NIO socket failed: " + e.getMessage());
+ e.printStackTrace();
+ } finally {
+ channel.close();
+ selector.close();
+ }
+
+ System.out.println("\nAll tests completed");
+ }
+}
diff --git a/test/kafka/kafka-client-loadtest/tools/go.mod b/test/kafka/kafka-client-loadtest/tools/go.mod
new file mode 100644
index 000000000..c63d94230
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/go.mod
@@ -0,0 +1,10 @@
+module simple-test
+
+go 1.24.7
+
+require github.com/segmentio/kafka-go v0.4.49
+
+require (
+ github.com/klauspost/compress v1.15.9 // indirect
+ github.com/pierrec/lz4/v4 v4.1.15 // indirect
+)
diff --git a/test/kafka/kafka-client-loadtest/tools/go.sum b/test/kafka/kafka-client-loadtest/tools/go.sum
new file mode 100644
index 000000000..74b476c2d
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/go.sum
@@ -0,0 +1,24 @@
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
+github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
+github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/segmentio/kafka-go v0.4.49 h1:GJiNX1d/g+kG6ljyJEoi9++PUMdXGAxb7JGPiDCuNmk=
+github.com/segmentio/kafka-go v0.4.49/go.mod h1:Y1gn60kzLEEaW28YshXyk2+VCUKbJ3Qr6DrnT3i4+9E=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
+github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
+github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
+github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
+golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
+golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
+golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
+golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go b/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go
new file mode 100644
index 000000000..1da40c89f
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/kafka-go-consumer.go
@@ -0,0 +1,69 @@
+package main
+
+import (
+ "context"
+ "log"
+ "os"
+ "time"
+
+ "github.com/segmentio/kafka-go"
+)
+
+func main() {
+ if len(os.Args) < 3 {
+ log.Fatal("Usage: kafka-go-consumer <broker> <topic>")
+ }
+ broker := os.Args[1]
+ topic := os.Args[2]
+
+ log.Printf("Connecting to Kafka broker: %s", broker)
+ log.Printf("Topic: %s", topic)
+
+ // Create a new reader
+ r := kafka.NewReader(kafka.ReaderConfig{
+ Brokers: []string{broker},
+ Topic: topic,
+ GroupID: "kafka-go-test-group",
+ MinBytes: 1,
+ MaxBytes: 10e6, // 10MB
+ MaxWait: 1 * time.Second,
+ })
+ defer r.Close()
+
+ log.Printf("Starting to consume messages...")
+
+ ctx := context.Background()
+ messageCount := 0
+ errorCount := 0
+ startTime := time.Now()
+
+ for {
+ m, err := r.ReadMessage(ctx)
+ if err != nil {
+ errorCount++
+ log.Printf("Error reading message #%d: %v", messageCount+1, err)
+
+ // Stop after 10 consecutive errors or 60 seconds
+ if errorCount > 10 || time.Since(startTime) > 60*time.Second {
+ log.Printf("\nStopping after %d errors in %v", errorCount, time.Since(startTime))
+ break
+ }
+ continue
+ }
+
+ // Reset error count on successful read
+ errorCount = 0
+ messageCount++
+
+ log.Printf("Message #%d: topic=%s partition=%d offset=%d key=%s value=%s",
+ messageCount, m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
+
+ // Stop after 100 messages or 60 seconds
+ if messageCount >= 100 || time.Since(startTime) > 60*time.Second {
+ log.Printf("\nSuccessfully consumed %d messages in %v", messageCount, time.Since(startTime))
+ log.Printf("Success rate: %.1f%% (%d/%d including errors)",
+ float64(messageCount)/float64(messageCount+errorCount)*100, messageCount, messageCount+errorCount)
+ break
+ }
+ }
+}
diff --git a/test/kafka/kafka-client-loadtest/tools/log4j.properties b/test/kafka/kafka-client-loadtest/tools/log4j.properties
new file mode 100644
index 000000000..ed0cd0fe5
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/log4j.properties
@@ -0,0 +1,12 @@
+log4j.rootLogger=DEBUG, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c: %m%n
+
+# More verbose for Kafka client
+log4j.logger.org.apache.kafka=DEBUG
+log4j.logger.org.apache.kafka.clients=TRACE
+log4j.logger.org.apache.kafka.clients.NetworkClient=TRACE
+
+
diff --git a/test/kafka/kafka-client-loadtest/tools/pom.xml b/test/kafka/kafka-client-loadtest/tools/pom.xml
new file mode 100644
index 000000000..58a858e95
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.seaweedfs.test</groupId>
+ <artifactId>kafka-consumer-test</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <properties>
+ <maven.compiler.source>11</maven.compiler.source>
+ <maven.compiler.target>11</maven.compiler.target>
+ <kafka.version>3.9.1</kafka.version>
+ <confluent.version>7.6.0</confluent.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-schema-registry-client</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-avro-serializer</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.11.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>2.0.9</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.11.0</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>3.1.0</version>
+ <configuration>
+ <mainClass>tools.SchemaRegistryTest</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
+
+
diff --git a/test/kafka/kafka-client-loadtest/tools/simple-test b/test/kafka/kafka-client-loadtest/tools/simple-test
new file mode 100755
index 000000000..47eef7386
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/simple-test
Binary files differ