aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java
diff options
context:
space:
mode:
Diffstat (limited to 'test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java')
-rw-r--r--test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java290
1 files changed, 290 insertions, 0 deletions
diff --git a/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java b/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java
new file mode 100644
index 000000000..f511b4cf6
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/tools/AdminClientDebugger.java
@@ -0,0 +1,290 @@
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.common.Node;
+
+import java.io.*;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+public class AdminClientDebugger {
+
+ public static void main(String[] args) throws Exception {
+ String broker = args.length > 0 ? args[0] : "localhost:9093";
+
+ System.out.println("=".repeat(80));
+ System.out.println("KAFKA ADMINCLIENT DEBUGGER");
+ System.out.println("=".repeat(80));
+ System.out.println("Target broker: " + broker);
+
+ // Test 1: Raw socket - capture exact bytes
+ System.out.println("\n" + "=".repeat(80));
+ System.out.println("TEST 1: Raw Socket - Capture ApiVersions Exchange");
+ System.out.println("=".repeat(80));
+ testRawSocket(broker);
+
+ // Test 2: AdminClient with detailed logging
+ System.out.println("\n" + "=".repeat(80));
+ System.out.println("TEST 2: AdminClient with Logging");
+ System.out.println("=".repeat(80));
+ testAdminClient(broker);
+ }
+
+ private static void testRawSocket(String broker) {
+ String[] parts = broker.split(":");
+ String host = parts[0];
+ int port = Integer.parseInt(parts[1]);
+
+ try (Socket socket = new Socket(host, port)) {
+ socket.setSoTimeout(10000);
+
+ InputStream in = socket.getInputStream();
+ OutputStream out = socket.getOutputStream();
+
+ System.out.println("Connected to " + broker);
+
+ // Build ApiVersions request (v4)
+ // Format:
+ // [Size][ApiKey=18][ApiVersion=4][CorrelationId=0][ClientId][TaggedFields]
+ ByteArrayOutputStream requestBody = new ByteArrayOutputStream();
+
+ // ApiKey (2 bytes) = 18
+ requestBody.write(0);
+ requestBody.write(18);
+
+ // ApiVersion (2 bytes) = 4
+ requestBody.write(0);
+ requestBody.write(4);
+
+ // CorrelationId (4 bytes) = 0
+ requestBody.write(new byte[] { 0, 0, 0, 0 });
+
+ // ClientId (compact string) = "debug-client"
+ String clientId = "debug-client";
+ writeCompactString(requestBody, clientId);
+
+ // Tagged fields (empty)
+ requestBody.write(0x00);
+
+ byte[] request = requestBody.toByteArray();
+
+ // Write size
+ ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+ sizeBuffer.putInt(request.length);
+ out.write(sizeBuffer.array());
+
+ // Write request
+ out.write(request);
+ out.flush();
+
+ System.out.println("\nSENT ApiVersions v4 Request:");
+ System.out.println(" Size: " + request.length + " bytes");
+ hexDump(" Request", request, Math.min(64, request.length));
+
+ // Read response size
+ byte[] sizeBytes = new byte[4];
+ int read = in.read(sizeBytes);
+ if (read != 4) {
+ System.out.println("Failed to read response size (got " + read + " bytes)");
+ return;
+ }
+
+ int responseSize = ByteBuffer.wrap(sizeBytes).getInt();
+ System.out.println("\nRECEIVED Response:");
+ System.out.println(" Size: " + responseSize + " bytes");
+
+ // Read response body
+ byte[] responseBytes = new byte[responseSize];
+ int totalRead = 0;
+ while (totalRead < responseSize) {
+ int n = in.read(responseBytes, totalRead, responseSize - totalRead);
+ if (n == -1) {
+ System.out.println("Unexpected EOF after " + totalRead + " bytes");
+ return;
+ }
+ totalRead += n;
+ }
+
+ System.out.println(" Read complete response: " + totalRead + " bytes");
+
+ // Decode response
+ System.out.println("\nRESPONSE STRUCTURE:");
+ decodeApiVersionsResponse(responseBytes);
+
+ // Try to read more (should timeout or get EOF)
+ System.out.println("\n⏱️ Waiting for any additional data (10s timeout)...");
+ socket.setSoTimeout(10000);
+ try {
+ int nextByte = in.read();
+ if (nextByte == -1) {
+ System.out.println(" Server closed connection (EOF)");
+ } else {
+ System.out.println(" Unexpected data: " + nextByte);
+ }
+ } catch (SocketTimeoutException e) {
+ System.out.println(" Timeout - no additional data");
+ }
+
+ } catch (Exception e) {
+ System.out.println("Error: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ private static void testAdminClient(String broker) {
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+ props.put(AdminClientConfig.CLIENT_ID_CONFIG, "admin-client-debugger");
+ props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
+ props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10000);
+
+ System.out.println("Creating AdminClient with config:");
+ props.forEach((k, v) -> System.out.println(" " + k + " = " + v));
+
+ try (AdminClient adminClient = AdminClient.create(props)) {
+ System.out.println("AdminClient created");
+
+ // Give the thread time to start
+ Thread.sleep(1000);
+
+ System.out.println("\nCalling describeCluster()...");
+ DescribeClusterResult result = adminClient.describeCluster();
+
+ System.out.println(" Waiting for nodes...");
+ Collection<Node> nodes = result.nodes().get();
+
+ System.out.println("Cluster description retrieved:");
+ System.out.println(" Nodes: " + nodes.size());
+ for (Node node : nodes) {
+ System.out.println(" - Node " + node.id() + ": " + node.host() + ":" + node.port());
+ }
+
+ System.out.println("\n Cluster ID: " + result.clusterId().get());
+
+ Node controller = result.controller().get();
+ if (controller != null) {
+ System.out.println(" Controller: Node " + controller.id());
+ }
+
+ } catch (ExecutionException e) {
+ System.out.println("Execution error: " + e.getCause().getMessage());
+ e.getCause().printStackTrace();
+ } catch (Exception e) {
+ System.out.println("Error: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ private static void decodeApiVersionsResponse(byte[] data) {
+ int offset = 0;
+
+ try {
+ // Correlation ID (4 bytes)
+ int correlationId = ByteBuffer.wrap(data, offset, 4).getInt();
+ System.out.println(" [Offset " + offset + "] Correlation ID: " + correlationId);
+ offset += 4;
+
+ // Header tagged fields (varint - should be 0x00 for flexible v3+)
+ int taggedFieldsLength = readUnsignedVarint(data, offset);
+ System.out.println(" [Offset " + offset + "] Header Tagged Fields Length: " + taggedFieldsLength);
+ offset += varintSize(data[offset]);
+
+ // Error code (2 bytes)
+ short errorCode = ByteBuffer.wrap(data, offset, 2).getShort();
+ System.out.println(" [Offset " + offset + "] Error Code: " + errorCode);
+ offset += 2;
+
+ // API Keys array (compact array - varint length)
+ int apiKeysLength = readUnsignedVarint(data, offset) - 1; // Compact array: length+1
+ System.out.println(" [Offset " + offset + "] API Keys Count: " + apiKeysLength);
+ offset += varintSize(data[offset]);
+
+ // Show first few API keys
+ System.out.println(" First 5 API Keys:");
+ for (int i = 0; i < Math.min(5, apiKeysLength); i++) {
+ short apiKey = ByteBuffer.wrap(data, offset, 2).getShort();
+ offset += 2;
+ short minVersion = ByteBuffer.wrap(data, offset, 2).getShort();
+ offset += 2;
+ short maxVersion = ByteBuffer.wrap(data, offset, 2).getShort();
+ offset += 2;
+ // Per-element tagged fields
+ int perElementTagged = readUnsignedVarint(data, offset);
+ offset += varintSize(data[offset]);
+
+ System.out.println(" " + (i + 1) + ". API " + apiKey + ": v" + minVersion + "-v" + maxVersion);
+ }
+
+ System.out.println(" ... (showing first 5 of " + apiKeysLength + " APIs)");
+ System.out.println(" Response structure is valid!");
+
+ // Hex dump of first 64 bytes
+ hexDump("\n First 64 bytes", data, Math.min(64, data.length));
+
+ } catch (Exception e) {
+ System.out.println(" Failed to decode at offset " + offset + ": " + e.getMessage());
+ hexDump(" Raw bytes", data, Math.min(128, data.length));
+ }
+ }
+
+ private static int readUnsignedVarint(byte[] data, int offset) {
+ int value = 0;
+ int shift = 0;
+ while (true) {
+ byte b = data[offset++];
+ value |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0)
+ break;
+ shift += 7;
+ }
+ return value;
+ }
+
+ private static int varintSize(byte firstByte) {
+ int size = 1;
+ byte b = firstByte;
+ while ((b & 0x80) != 0) {
+ size++;
+ b = (byte) (b << 1);
+ }
+ return size;
+ }
+
+ private static void writeCompactString(ByteArrayOutputStream out, String str) {
+ byte[] bytes = str.getBytes();
+ writeUnsignedVarint(out, bytes.length + 1); // Compact string: length+1
+ out.write(bytes, 0, bytes.length);
+ }
+
+ private static void writeUnsignedVarint(ByteArrayOutputStream out, int value) {
+ while ((value & ~0x7F) != 0) {
+ out.write((byte) ((value & 0x7F) | 0x80));
+ value >>>= 7;
+ }
+ out.write((byte) value);
+ }
+
+ private static void hexDump(String label, byte[] data, int length) {
+ System.out.println(label + " (hex dump):");
+ for (int i = 0; i < length; i += 16) {
+ System.out.printf(" %04x ", i);
+ for (int j = 0; j < 16; j++) {
+ if (i + j < length) {
+ System.out.printf("%02x ", data[i + j] & 0xFF);
+ } else {
+ System.out.print(" ");
+ }
+ if (j == 7)
+ System.out.print(" ");
+ }
+ System.out.print(" |");
+ for (int j = 0; j < 16 && i + j < length; j++) {
+ byte b = data[i + j];
+ System.out.print((b >= 32 && b < 127) ? (char) b : '.');
+ }
+ System.out.println("|");
+ }
+ }
+}