path: root/weed/mq/kafka/protocol/produce.go

package protocol

import (
	"context"
	"encoding/binary"
	"fmt"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/compression"
	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"google.golang.org/protobuf/proto"
)

func (h *Handler) handleProduce(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {

	// Version-specific handling
	switch apiVersion {
	case 0, 1:
		return h.handleProduceV0V1(ctx, correlationID, apiVersion, requestBody)
	case 2, 3, 4, 5, 6, 7:
		return h.handleProduceV2Plus(ctx, correlationID, apiVersion, requestBody)
	default:
		return nil, fmt.Errorf("produce version %d not implemented yet", apiVersion)
	}
}

func (h *Handler) handleProduceV0V1(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {
	// Parse Produce v0/v1 request
	// Request format: client_id + acks(2) + timeout(4) + topics_array

	if len(requestBody) < 8 { // client_id_size(2) + acks(2) + timeout(4)
		return nil, fmt.Errorf("Produce request too short")
	}

	// Skip client_id
	clientIDSize := binary.BigEndian.Uint16(requestBody[0:2])

	if len(requestBody) < 2+int(clientIDSize) {
		return nil, fmt.Errorf("Produce request client_id too short")
	}

	_ = string(requestBody[2 : 2+int(clientIDSize)]) // clientID
	offset := 2 + int(clientIDSize)

	if len(requestBody) < offset+10 { // acks(2) + timeout(4) + topics_count(4)
		return nil, fmt.Errorf("Produce request missing data")
	}

	// Parse acks and timeout
	_ = int16(binary.BigEndian.Uint16(requestBody[offset : offset+2])) // acks
	offset += 2

	_ = binary.BigEndian.Uint32(requestBody[offset : offset+4]) // timeout_ms
	offset += 4

	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4

	response := make([]byte, 0, 1024)

	// NOTE: Correlation ID is handled by writeResponseWithHeader
	// Do NOT include it in the response body

	// Topics count (same as request)
	topicsCountBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
	response = append(response, topicsCountBytes...)

	// Process each topic
	for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
		if len(requestBody) < offset+2 {
			break
		}

		// Parse topic name
		topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
		offset += 2

		if len(requestBody) < offset+int(topicNameSize)+4 {
			break
		}

		topicName := string(requestBody[offset : offset+int(topicNameSize)])
		offset += int(topicNameSize)

		// Parse partitions count
		partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
		offset += 4

		// Check if topic exists, auto-create if it doesn't (simulates auto.create.topics.enable=true)
		topicExists := h.seaweedMQHandler.TopicExists(topicName)

		if !topicExists {
			// Use schema-aware topic creation for auto-created topics with configurable default partitions
			defaultPartitions := h.GetDefaultPartitions()
			glog.V(1).Infof("[PRODUCE] Topic %s does not exist, auto-creating with %d partitions", topicName, defaultPartitions)
			if err := h.createTopicWithSchemaSupport(topicName, defaultPartitions); err != nil {
				glog.V(0).Infof("[PRODUCE] ERROR: Failed to auto-create topic %s: %v", topicName, err)
			} else {
				glog.V(1).Infof("[PRODUCE] Successfully auto-created topic %s", topicName)
				// Invalidate cache immediately after creation so consumers can find it
				h.seaweedMQHandler.InvalidateTopicExistsCache(topicName)
				topicExists = true
			}
		} else {
			glog.V(2).Infof("[PRODUCE] Topic %s already exists", topicName)
		}

		// Response: topic_name_size(2) + topic_name + partitions_array
		response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
		response = append(response, []byte(topicName)...)

		partitionsCountBytes := make([]byte, 4)
		binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount)
		response = append(response, partitionsCountBytes...)

		// Process each partition
		for j := uint32(0); j < partitionsCount && offset < len(requestBody); j++ {
			if len(requestBody) < offset+8 {
				break
			}

			// Parse partition: partition_id(4) + record_set_size(4) + record_set
			partitionID := binary.BigEndian.Uint32(requestBody[offset : offset+4])
			offset += 4

			recordSetSize := binary.BigEndian.Uint32(requestBody[offset : offset+4])
			offset += 4

			if len(requestBody) < offset+int(recordSetSize) {
				break
			}

			// CRITICAL FIX: Make a copy of recordSetData to prevent buffer sharing corruption
			// The slice requestBody[offset:offset+int(recordSetSize)] shares the underlying array
			// with the request buffer, which can be reused and cause data corruption
			recordSetData := make([]byte, recordSetSize)
			copy(recordSetData, requestBody[offset:offset+int(recordSetSize)])
			offset += int(recordSetSize)

			// Response: partition_id(4) + error_code(2) + base_offset(8) + log_append_time(8) + log_start_offset(8)
			partitionIDBytes := make([]byte, 4)
			binary.BigEndian.PutUint32(partitionIDBytes, partitionID)
			response = append(response, partitionIDBytes...)

			var errorCode uint16 = 0
			var baseOffset int64 = 0
			currentTime := time.Now().UnixNano()

			if !topicExists {
				errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
			} else {
				// Process the record set
				recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused
				if parseErr != nil {
					errorCode = 42 // INVALID_RECORD
				} else if recordCount > 0 {
					// Use SeaweedMQ integration
					// Use a distinct name so the request parse cursor "offset" is not shadowed
					prodOffset, err := h.produceToSeaweedMQ(ctx, topicName, int32(partitionID), recordSetData)
					if err != nil {
						// Check if this is a schema validation error and add delay to prevent overloading
						if h.isSchemaValidationError(err) {
							time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
						}
						errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
					} else {
						baseOffset = prodOffset
					}
				}
			}

			// Error code
			response = append(response, byte(errorCode>>8), byte(errorCode))

			// Base offset (8 bytes)
			baseOffsetBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
			response = append(response, baseOffsetBytes...)

			// Log append time (8 bytes) - timestamp when appended
			logAppendTimeBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(logAppendTimeBytes, uint64(currentTime))
			response = append(response, logAppendTimeBytes...)

			// Log start offset (8 bytes) - same as base for now
			logStartOffsetBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(logStartOffsetBytes, uint64(baseOffset))
			response = append(response, logStartOffsetBytes...)
		}
	}

	// Add throttle time at the end (4 bytes)
	response = append(response, 0, 0, 0, 0)

	// Even for acks=0, kafka-go expects a minimal response structure
	return response, nil
}
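
// Illustrative Produce v0 request layout (a hand-written sketch for orientation,
// not captured traffic; the client and topic names are hypothetical):
//
//	client_id_size(2)=3  client_id="cli"
//	acks(2)=1  timeout_ms(4)=1000
//	topics_count(4)=1
//	  topic_name_size(2)=4  topic_name="test"
//	  partitions_count(4)=1
//	    partition_id(4)=0
//	    record_set_size(4)=68  record_set=68 bytes of v2 record batch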

// parseRecordSet parses a Kafka record set using the enhanced record batch parser
// Now supports:
// - Proper record batch format parsing (v2)
// - Compression support (gzip, snappy, lz4, zstd)
// - CRC32 validation
// - Individual record extraction
func (h *Handler) parseRecordSet(recordSetData []byte) (recordCount int32, totalSize int32, err error) {

	// Heuristic: permit short inputs for tests
	if len(recordSetData) < 61 {
		// If very small, decide error vs fallback
		if len(recordSetData) < 8 {
			return 0, 0, fmt.Errorf("failed to parse record batch: record set too small: %d bytes", len(recordSetData))
		}
		// If we have at least 20 bytes, attempt to read a count at [16:20]
		if len(recordSetData) >= 20 {
			cnt := int32(binary.BigEndian.Uint32(recordSetData[16:20]))
			if cnt <= 0 || cnt > 1000000 {
				cnt = 1
			}
			return cnt, int32(len(recordSetData)), nil
		}
		// Otherwise default to 1 record
		return 1, int32(len(recordSetData)), nil
	}

	parser := NewRecordBatchParser()

	// Parse the record batch with CRC validation
	batch, err := parser.ParseRecordBatchWithValidation(recordSetData, true)
	if err != nil {
		// If CRC validation fails, try without validation for backward compatibility
		batch, err = parser.ParseRecordBatch(recordSetData)
		if err != nil {
			return 0, 0, fmt.Errorf("failed to parse record batch: %w", err)
		}
	}

	return batch.RecordCount, int32(len(recordSetData)), nil
}
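
// Usage sketch for parseRecordSet (the sizes shown are hypothetical):
//
//	count, size, err := h.parseRecordSet(recordSetData)
//	// a well-formed 68-byte v2 batch holding one record yields
//	// count == 1, size == 68, err == nil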

// produceToSeaweedMQ publishes a single record to SeaweedMQ (simplified for Phase 2)
// ctx controls the publish timeout - if client cancels, produce operation is cancelled
func (h *Handler) produceToSeaweedMQ(ctx context.Context, topic string, partition int32, recordSetData []byte) (int64, error) {
	// Extract all records from the record set and publish each one
	// extractAllRecords handles fallback internally for various cases
	records := h.extractAllRecords(recordSetData)

	if len(records) == 0 {
		return 0, fmt.Errorf("failed to parse Kafka record set: no records extracted")
	}

	// Publish all records and return the offset of the first record (base offset)
	var baseOffset int64
	for idx, kv := range records {
		offsetProduced, err := h.produceSchemaBasedRecord(ctx, topic, partition, kv.Key, kv.Value)
		if err != nil {
			return 0, err
		}
		if idx == 0 {
			baseOffset = offsetProduced
		}
	}

	return baseOffset, nil
}

// extractAllRecords parses a Kafka record batch and returns all records' key/value pairs
func (h *Handler) extractAllRecords(recordSetData []byte) []struct{ Key, Value []byte } {
	results := make([]struct{ Key, Value []byte }, 0, 8)

	if len(recordSetData) < 61 {
		// Too small to be a full batch; treat as single opaque record
		key, value := h.extractFirstRecord(recordSetData)
		// Always include records, even if both key and value are null
		// Schema Registry Noop records may have null values
		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
		return results
	}

	// Parse record batch header (Kafka v2)
	offset := 0
	_ = int64(binary.BigEndian.Uint64(recordSetData[offset:])) // baseOffset
	offset += 8                                                // base_offset
	_ = binary.BigEndian.Uint32(recordSetData[offset:])        // batchLength
	offset += 4                                                // batch_length
	_ = binary.BigEndian.Uint32(recordSetData[offset:])        // partitionLeaderEpoch
	offset += 4                                                // partition_leader_epoch

	if offset >= len(recordSetData) {
		return results
	}
	magic := recordSetData[offset] // magic
	offset += 1

	if magic != 2 {
		// Unsupported, fallback
		key, value := h.extractFirstRecord(recordSetData)
		// Always include records, even if both key and value are null
		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
		return results
	}

	// Skip CRC, read attributes to check compression
	offset += 4 // crc
	attributes := binary.BigEndian.Uint16(recordSetData[offset:])
	offset += 2 // attributes

	// Check compression codec from attributes (bits 0-2)
	compressionCodec := compression.CompressionCodec(attributes & 0x07)

	offset += 4 // last_offset_delta
	offset += 8 // first_timestamp
	offset += 8 // max_timestamp
	offset += 8 // producer_id
	offset += 2 // producer_epoch
	offset += 4 // base_sequence

	// records_count
	if offset+4 > len(recordSetData) {
		return results
	}
	recordsCount := int(binary.BigEndian.Uint32(recordSetData[offset:]))
	offset += 4

	// Extract and decompress the records section
	recordsData := recordSetData[offset:]
	if compressionCodec != compression.None {
		decompressed, err := compression.Decompress(compressionCodec, recordsData)
		if err != nil {
			// Fallback to extractFirstRecord
			key, value := h.extractFirstRecord(recordSetData)
			results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
			return results
		}
		recordsData = decompressed
	}
	// Reset offset to start of records data (whether compressed or not)
	offset = 0

	// Iterate records
	for i := 0; i < recordsCount && offset < len(recordsData); i++ {
		// record_length is a SIGNED zigzag-encoded varint (like all varints in Kafka record format)
		recLen, n := decodeVarint(recordsData[offset:])
		if n == 0 || recLen <= 0 {
			break
		}
		offset += n
		if offset+int(recLen) > len(recordsData) {
			break
		}
		rec := recordsData[offset : offset+int(recLen)]
		offset += int(recLen)

		// Parse record fields
		rpos := 0
		if rpos >= len(rec) {
			break
		}
		rpos += 1 // attributes

		// timestamp_delta (varint)
		var nBytes int
		_, nBytes = decodeVarint(rec[rpos:])
		if nBytes == 0 {
			continue
		}
		rpos += nBytes
		// offset_delta (varint)
		_, nBytes = decodeVarint(rec[rpos:])
		if nBytes == 0 {
			continue
		}
		rpos += nBytes

		// key
		keyLen, nBytes := decodeVarint(rec[rpos:])
		if nBytes == 0 {
			continue
		}
		rpos += nBytes
		var key []byte
		if keyLen >= 0 {
			if rpos+int(keyLen) > len(rec) {
				continue
			}
			key = rec[rpos : rpos+int(keyLen)]
			rpos += int(keyLen)
		}

		// value
		valLen, nBytes := decodeVarint(rec[rpos:])
		if nBytes == 0 {
			continue
		}
		rpos += nBytes
		var value []byte
		if valLen >= 0 {
			if rpos+int(valLen) > len(rec) {
				continue
			}
			value = rec[rpos : rpos+int(valLen)]
			rpos += int(valLen)
		}

		// headers_count (varint) - headers are not needed here, so the result is ignored
		_, _ = decodeVarint(rec[rpos:])

		// DO NOT normalize nils to empty slices - Kafka distinguishes null vs empty
		// Keep nil as nil, empty as empty

		results = append(results, struct{ Key, Value []byte }{Key: key, Value: value})
	}

	return results
}
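
// Byte offsets of the Kafka v2 record batch header as walked above
// (61 bytes in total before the records section begins):
//
//	[0:8]    base_offset             [23:27]  last_offset_delta
//	[8:12]   batch_length            [27:35]  first_timestamp
//	[12:16]  partition_leader_epoch  [35:43]  max_timestamp
//	[16]     magic (must be 2)       [43:51]  producer_id
//	[17:21]  crc                     [51:53]  producer_epoch
//	[21:23]  attributes              [53:57]  base_sequence
//	                                 [57:61]  records_count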

// extractFirstRecord extracts the first record from a Kafka record batch
func (h *Handler) extractFirstRecord(recordSetData []byte) ([]byte, []byte) {

	if len(recordSetData) < 61 {
		// Record set too small to contain a valid Kafka v2 batch
		return nil, nil
	}

	offset := 0

	// Parse record batch header (Kafka v2 format)
	// base_offset(8) + batch_length(4) + partition_leader_epoch(4) + magic(1) + crc(4) + attributes(2)
	// + last_offset_delta(4) + first_timestamp(8) + max_timestamp(8) + producer_id(8) + producer_epoch(2)
	// + base_sequence(4) + records_count(4) = 61 bytes header

	offset += 8                                                // skip base_offset
	_ = int32(binary.BigEndian.Uint32(recordSetData[offset:])) // batchLength unused
	offset += 4                                                // batch_length

	offset += 4 // skip partition_leader_epoch
	magic := recordSetData[offset]
	offset += 1 // magic byte

	if magic != 2 {
		// Unsupported magic byte - only Kafka v2 format is supported
		return nil, nil
	}

	offset += 4 // skip crc
	offset += 2 // skip attributes
	offset += 4 // skip last_offset_delta
	offset += 8 // skip first_timestamp
	offset += 8 // skip max_timestamp
	offset += 8 // skip producer_id
	offset += 2 // skip producer_epoch
	offset += 4 // skip base_sequence

	recordsCount := int32(binary.BigEndian.Uint32(recordSetData[offset:]))
	offset += 4 // records_count

	if recordsCount == 0 {
		// No records in batch
		return nil, nil
	}

	// Parse first record
	if offset >= len(recordSetData) {
		// Not enough data to parse record
		return nil, nil
	}

	// Read record length - a SIGNED zigzag-encoded varint, like every other
	// varint in the Kafka v2 record format
	recordLength, varintLen := decodeVarint(recordSetData[offset:])
	if varintLen == 0 || recordLength <= 0 {
		// Invalid record length varint
		return nil, nil
	}
	offset += varintLen

	if offset+int(recordLength) > len(recordSetData) {
		// Record length exceeds available data
		return nil, nil
	}

	recordData := recordSetData[offset : offset+int(recordLength)]
	recordOffset := 0

	// Parse record: attributes(1) + timestamp_delta(varint) + offset_delta(varint) + key + value + headers
	recordOffset += 1 // skip attributes

	// Skip timestamp_delta (varint)
	_, varintLen = decodeVarint(recordData[recordOffset:])
	if varintLen == 0 {
		// Invalid timestamp_delta varint
		return nil, nil
	}
	recordOffset += varintLen

	// Skip offset_delta (varint)
	_, varintLen = decodeVarint(recordData[recordOffset:])
	if varintLen == 0 {
		// Invalid offset_delta varint
		return nil, nil
	}
	recordOffset += varintLen

	// Read key length and key
	keyLength, varintLen := decodeVarint(recordData[recordOffset:])
	if varintLen == 0 {
		// Invalid key length varint
		return nil, nil
	}
	recordOffset += varintLen

	var key []byte
	if keyLength == -1 {
		key = nil // null key
	} else if keyLength == 0 {
		key = []byte{} // empty key
	} else {
		if recordOffset+int(keyLength) > len(recordData) {
			// Key length exceeds available data
			return nil, nil
		}
		key = recordData[recordOffset : recordOffset+int(keyLength)]
		recordOffset += int(keyLength)
	}

	// Read value length and value
	valueLength, varintLen := decodeVarint(recordData[recordOffset:])
	if varintLen == 0 {
		// Invalid value length varint
		return nil, nil
	}
	recordOffset += varintLen

	var value []byte
	if valueLength == -1 {
		value = nil // null value
	} else if valueLength == 0 {
		value = []byte{} // empty value
	} else {
		if recordOffset+int(valueLength) > len(recordData) {
			// Value length exceeds available data
			return nil, nil
		}
		value = recordData[recordOffset : recordOffset+int(valueLength)]
	}

	// Preserve null semantics - don't convert null to empty
	// Schema Registry Noop records specifically use null values
	return key, value
}
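
// Layout of a single record inside a v2 batch, as parsed above (all varints
// are zigzag-encoded signed varints; a length of -1 means null):
//
//	length(varint) attributes(1) timestamp_delta(varint) offset_delta(varint)
//	key_length(varint) key value_length(varint) value
//	headers_count(varint) headers...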

// decodeVarint decodes a variable-length integer from bytes using zigzag encoding
// Returns the decoded value and the number of bytes consumed
func decodeVarint(data []byte) (int64, int) {
	if len(data) == 0 {
		return 0, 0
	}

	var result int64
	var shift uint
	var bytesRead int

	for i, b := range data {
		if i > 9 { // varints can be at most 10 bytes
			return 0, 0 // invalid varint
		}

		bytesRead++
		result |= int64(b&0x7F) << shift

		if (b & 0x80) == 0 {
			// Most significant bit is 0, we're done
			// Apply zigzag decoding for signed integers
			return (result >> 1) ^ (-(result & 1)), bytesRead
		}

		shift += 7
	}

	return 0, 0 // incomplete varint
}
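
// Worked zigzag examples for decodeVarint, shown as (value, bytes consumed):
//
//	decodeVarint([]byte{0x00})       -> ( 0, 1)
//	decodeVarint([]byte{0x01})       -> (-1, 1)
//	decodeVarint([]byte{0x02})       -> ( 1, 1)
//	decodeVarint([]byte{0x03})       -> (-2, 1)
//	decodeVarint([]byte{0xAC, 0x02}) -> (150, 2) // raw 300 zigzag-decodes to 150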

// handleProduceV2Plus handles Produce API v2-v7 (Kafka 0.11+)
func (h *Handler) handleProduceV2Plus(ctx context.Context, correlationID uint32, apiVersion uint16, requestBody []byte) ([]byte, error) {

	// For now, use simplified parsing similar to v0/v1 but handle v2+ response format
	// In v2+, the main differences are:
	// - Request: transactional_id field (nullable string) at the beginning
	// - Response: throttle_time_ms field at the end (v1+)

	// Parse Produce v2+ request format (client_id already stripped in HandleConn)
	// v2: acks(INT16) + timeout_ms(INT32) + topics(ARRAY)
	// v3+: transactional_id(NULLABLE_STRING) + acks(INT16) + timeout_ms(INT32) + topics(ARRAY)

	offset := 0

	// transactional_id only exists in v3+
	if apiVersion >= 3 {
		if len(requestBody) < offset+2 {
			return nil, fmt.Errorf("Produce v%d request too short for transactional_id", apiVersion)
		}
		txIDLen := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
		offset += 2
		if txIDLen >= 0 {
			if len(requestBody) < offset+int(txIDLen) {
				return nil, fmt.Errorf("Produce v%d request transactional_id too short", apiVersion)
			}
			_ = string(requestBody[offset : offset+int(txIDLen)])
			offset += int(txIDLen)
		}
	}

	// Parse acks (INT16) and timeout_ms (INT32)
	if len(requestBody) < offset+6 {
		return nil, fmt.Errorf("Produce v%d request missing acks/timeout", apiVersion)
	}

	acks := int16(binary.BigEndian.Uint16(requestBody[offset : offset+2]))
	offset += 2
	_ = binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4

	// Remember if this is fire-and-forget mode (acks=0 - no response body expected)
	isFireAndForget := acks == 0

	if len(requestBody) < offset+4 {
		return nil, fmt.Errorf("Produce v%d request missing topics count", apiVersion)
	}
	topicsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
	offset += 4

	// If topicsCount is implausible, there might be a parsing issue
	if topicsCount > 1000 {
		return nil, fmt.Errorf("Produce v%d request has implausible topics count: %d", apiVersion, topicsCount)
	}

	// Build response
	response := make([]byte, 0, 256)

	// NOTE: Correlation ID is handled by writeResponseWithHeader
	// Do NOT include it in the response body

	// Topics array length (first field in response body)
	topicsCountBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(topicsCountBytes, topicsCount)
	response = append(response, topicsCountBytes...)

	// Process each topic with correct parsing and response format
	for i := uint32(0); i < topicsCount && offset < len(requestBody); i++ {
		// Parse topic name
		if len(requestBody) < offset+2 {
			break
		}

		topicNameSize := binary.BigEndian.Uint16(requestBody[offset : offset+2])
		offset += 2

		if len(requestBody) < offset+int(topicNameSize)+4 {
			break
		}

		topicName := string(requestBody[offset : offset+int(topicNameSize)])
		offset += int(topicNameSize)

		// Parse partitions count
		partitionsCount := binary.BigEndian.Uint32(requestBody[offset : offset+4])
		offset += 4

		// Response: topic name (STRING: 2 bytes length + data)
		response = append(response, byte(topicNameSize>>8), byte(topicNameSize))
		response = append(response, []byte(topicName)...)

		// Response: partitions count (4 bytes)
		partitionsCountBytes := make([]byte, 4)
		binary.BigEndian.PutUint32(partitionsCountBytes, partitionsCount)
		response = append(response, partitionsCountBytes...)

		// Process each partition with correct parsing
		for j := uint32(0); j < partitionsCount && offset < len(requestBody); j++ {
			// Parse partition request: partition_id(4) + record_set_size(4) + record_set_data
			if len(requestBody) < offset+8 {
				break
			}
			partitionID := binary.BigEndian.Uint32(requestBody[offset : offset+4])
			offset += 4
			recordSetSize := binary.BigEndian.Uint32(requestBody[offset : offset+4])
			offset += 4
			if len(requestBody) < offset+int(recordSetSize) {
				break
			}
			// CRITICAL FIX: Make a copy of recordSetData to prevent buffer sharing corruption
			// The slice requestBody[offset:offset+int(recordSetSize)] shares the underlying array
			// with the request buffer, which can be reused and cause data corruption
			recordSetData := make([]byte, recordSetSize)
			copy(recordSetData, requestBody[offset:offset+int(recordSetSize)])
			offset += int(recordSetSize)

			// Process the record set and store in ledger
			var errorCode uint16 = 0
			var baseOffset int64 = 0
			currentTime := time.Now().UnixNano()

			// Check if topic exists; for v2+ do NOT auto-create
			topicExists := h.seaweedMQHandler.TopicExists(topicName)

			if !topicExists {
				errorCode = 3 // UNKNOWN_TOPIC_OR_PARTITION
			} else {
				// Process the record set (lenient parsing)
				recordCount, _, parseErr := h.parseRecordSet(recordSetData) // totalSize unused

				if parseErr != nil {
					errorCode = 42 // INVALID_RECORD
				} else if recordCount > 0 {
					// Extract all records from the record set and publish each one
					// extractAllRecords handles fallback internally for various cases
					records := h.extractAllRecords(recordSetData)

					if len(records) == 0 {
						errorCode = 42 // INVALID_RECORD
					} else {
						for idx, kv := range records {
							offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)

							if prodErr != nil {
								// Check if this is a schema validation error and add delay to prevent overloading
								if h.isSchemaValidationError(prodErr) {
									time.Sleep(200 * time.Millisecond) // Brief delay for schema validation failures
								}
								errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
								break
							}

							if idx == 0 {
								baseOffset = offsetProduced
							}
						}
					}
				} else {
					// Try to extract anyway - this might be a Noop record
					records := h.extractAllRecords(recordSetData)
					if len(records) > 0 {
						for idx, kv := range records {
							offsetProduced, prodErr := h.produceSchemaBasedRecord(ctx, topicName, int32(partitionID), kv.Key, kv.Value)
							if prodErr != nil {
								errorCode = 0xFFFF // UNKNOWN_SERVER_ERROR (-1 as uint16)
								break
							}
							if idx == 0 {
								baseOffset = offsetProduced
							}
						}
					}
				}
			}

			// Build correct Produce v2+ response for this partition
			// Format: partition_id(4) + error_code(2) + base_offset(8) + [log_append_time(8) if v>=2] + [log_start_offset(8) if v>=5]

			// partition_id (4 bytes)
			partitionIDBytes := make([]byte, 4)
			binary.BigEndian.PutUint32(partitionIDBytes, partitionID)
			response = append(response, partitionIDBytes...)

			// error_code (2 bytes)
			response = append(response, byte(errorCode>>8), byte(errorCode))

			// base_offset (8 bytes) - offset of first message
			baseOffsetBytes := make([]byte, 8)
			binary.BigEndian.PutUint64(baseOffsetBytes, uint64(baseOffset))
			response = append(response, baseOffsetBytes...)

			// log_append_time (8 bytes) - v2+ field (actual timestamp, not -1)
			if apiVersion >= 2 {
				logAppendTimeBytes := make([]byte, 8)
				binary.BigEndian.PutUint64(logAppendTimeBytes, uint64(currentTime))
				response = append(response, logAppendTimeBytes...)
			}

			// log_start_offset (8 bytes) - v5+ field
			if apiVersion >= 5 {
				logStartOffsetBytes := make([]byte, 8)
				binary.BigEndian.PutUint64(logStartOffsetBytes, uint64(baseOffset))
				response = append(response, logStartOffsetBytes...)
			}
		}
	}

	// For fire-and-forget mode, return empty response after processing
	if isFireAndForget {
		return []byte{}, nil
	}

	// Append throttle_time_ms at the END for v1+ (as per original Kafka protocol)
	if apiVersion >= 1 {
		response = append(response, 0, 0, 0, 0) // throttle_time_ms = 0
	}

	return response, nil
}
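
// Per-partition response layout by version, matching the builder above:
//
//	v2-v4: partition_id(4) error_code(2) base_offset(8) log_append_time(8)
//	v5+:   the same fields plus log_start_offset(8)
//
// For v1+ a single throttle_time_ms(4) is appended at the very end of the
// response; for acks=0 the response body is empty.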

// performSchemaValidation performs comprehensive schema validation for a topic
func (h *Handler) performSchemaValidation(topicName string, schemaID uint32, messageFormat schema.Format, messageBytes []byte) error {
	// 1. Check if topic is configured to require schemas
	if !h.isSchematizedTopic(topicName) {
		// Topic doesn't require schemas, but message is schematized - this is allowed
		return nil
	}

	// 2. Get expected schema metadata for the topic
	expectedMetadata, err := h.getSchemaMetadataForTopic(topicName)
	if err != nil {
		// No expected schema found - in strict mode this would be an error
		// In permissive mode, allow any valid schema
		if h.isStrictSchemaValidation() {
			// Add delay before returning schema validation error to prevent overloading
			time.Sleep(100 * time.Millisecond)
			return fmt.Errorf("topic %s requires schema but no expected schema found: %w", topicName, err)
		}
		return nil
	}

	// 3. Validate schema ID matches expected schema
	expectedSchemaID, err := h.parseSchemaID(expectedMetadata["schema_id"])
	if err != nil {
		// Add delay before returning schema validation error to prevent overloading
		time.Sleep(100 * time.Millisecond)
		return fmt.Errorf("invalid expected schema ID for topic %s: %w", topicName, err)
	}

	// 4. Check schema compatibility
	if schemaID != expectedSchemaID {
		// Schema ID doesn't match - check if it's a compatible evolution
		compatible, err := h.checkSchemaEvolution(topicName, expectedSchemaID, schemaID, messageFormat)
		if err != nil {
			// Add delay before returning schema validation error to prevent overloading
			time.Sleep(100 * time.Millisecond)
			return fmt.Errorf("failed to check schema evolution for topic %s: %w", topicName, err)
		}
		if !compatible {
			// Add delay before returning schema validation error to prevent overloading
			time.Sleep(100 * time.Millisecond)
			return fmt.Errorf("schema ID %d is not compatible with expected schema %d for topic %s",
				schemaID, expectedSchemaID, topicName)
		}
	}

	// 5. Validate message format matches expected format
	expectedFormatStr := expectedMetadata["schema_format"]
	var expectedFormat schema.Format
	switch expectedFormatStr {
	case "AVRO":
		expectedFormat = schema.FormatAvro
	case "PROTOBUF":
		expectedFormat = schema.FormatProtobuf
	case "JSON_SCHEMA":
		expectedFormat = schema.FormatJSONSchema
	default:
		expectedFormat = schema.FormatUnknown
	}
	if messageFormat != expectedFormat {
		return fmt.Errorf("message format %s does not match expected format %s for topic %s",
			messageFormat, expectedFormat, topicName)
	}

	// 6. Perform message-level validation
	return h.validateMessageContent(schemaID, messageFormat, messageBytes)
}
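
// Shape of the topic schema metadata consumed above (these are the keys this
// function reads; the values shown are hypothetical):
//
//	expectedMetadata["schema_id"]     = "42"
//	expectedMetadata["schema_format"] = "AVRO" | "PROTOBUF" | "JSON_SCHEMA"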

// checkSchemaEvolution checks if a schema evolution is compatible
func (h *Handler) checkSchemaEvolution(topicName string, expectedSchemaID, actualSchemaID uint32, format schema.Format) (bool, error) {
	// Get both schemas
	expectedSchema, err := h.schemaManager.GetSchemaByID(expectedSchemaID)
	if err != nil {
		return false, fmt.Errorf("failed to get expected schema %d: %w", expectedSchemaID, err)
	}

	actualSchema, err := h.schemaManager.GetSchemaByID(actualSchemaID)
	if err != nil {
		return false, fmt.Errorf("failed to get actual schema %d: %w", actualSchemaID, err)
	}

	// Since we're accessing schema from registry for this topic, ensure topic config is updated
	h.ensureTopicSchemaFromRegistryCache(topicName, expectedSchema, actualSchema)

	// Check compatibility based on topic's compatibility level
	compatibilityLevel := h.getTopicCompatibilityLevel(topicName)

	result, err := h.schemaManager.CheckSchemaCompatibility(
		expectedSchema.Schema,
		actualSchema.Schema,
		format,
		compatibilityLevel,
	)
	if err != nil {
		return false, fmt.Errorf("failed to check schema compatibility: %w", err)
	}

	return result.Compatible, nil
}

// validateMessageContent validates the message content against its schema
func (h *Handler) validateMessageContent(schemaID uint32, format schema.Format, messageBytes []byte) error {
	// Decode the message to validate it can be parsed correctly
	_, err := h.schemaManager.DecodeMessage(messageBytes)
	if err != nil {
		return fmt.Errorf("message validation failed for schema %d: %w", schemaID, err)
	}

	// Additional format-specific validation could be added here
	switch format {
	case schema.FormatAvro:
		return h.validateAvroMessage(schemaID, messageBytes)
	case schema.FormatProtobuf:
		return h.validateProtobufMessage(schemaID, messageBytes)
	case schema.FormatJSONSchema:
		return h.validateJSONSchemaMessage(schemaID, messageBytes)
	default:
		return fmt.Errorf("unsupported schema format for validation: %s", format)
	}
}

// validateAvroMessage performs Avro-specific validation
func (h *Handler) validateAvroMessage(schemaID uint32, messageBytes []byte) error {
	// Basic validation is already done in DecodeMessage
	// Additional Avro-specific validation could be added here
	return nil
}

// validateProtobufMessage performs Protobuf-specific validation
func (h *Handler) validateProtobufMessage(schemaID uint32, messageBytes []byte) error {
	// Get the schema for additional validation
	cachedSchema, err := h.schemaManager.GetSchemaByID(schemaID)
	if err != nil {
		return fmt.Errorf("failed to get Protobuf schema %d: %w", schemaID, err)
	}

	// Parse the schema to get the descriptor
	parser := schema.NewProtobufDescriptorParser()
	protobufSchema, err := parser.ParseBinaryDescriptor([]byte(cachedSchema.Schema), "")
	if err != nil {
		return fmt.Errorf("failed to parse Protobuf schema: %w", err)
	}

	// Validate message against schema
	envelope, ok := schema.ParseConfluentEnvelope(messageBytes)
	if !ok {
		return fmt.Errorf("invalid Confluent envelope")
	}

	return protobufSchema.ValidateMessage(envelope.Payload)
}

// validateJSONSchemaMessage performs JSON Schema-specific validation
func (h *Handler) validateJSONSchemaMessage(schemaID uint32, messageBytes []byte) error {
	// Get the schema for validation
	cachedSchema, err := h.schemaManager.GetSchemaByID(schemaID)
	if err != nil {
		return fmt.Errorf("failed to get JSON schema %d: %w", schemaID, err)
	}

	// Create JSON Schema decoder for validation
	decoder, err := schema.NewJSONSchemaDecoder(cachedSchema.Schema)
	if err != nil {
		return fmt.Errorf("failed to create JSON Schema decoder: %w", err)
	}

	// Parse envelope and validate payload
	envelope, ok := schema.ParseConfluentEnvelope(messageBytes)
	if !ok {
		return fmt.Errorf("invalid Confluent envelope")
	}

	// Validate JSON payload against schema
	_, err = decoder.Decode(envelope.Payload)
	if err != nil {
		return fmt.Errorf("JSON Schema validation failed: %w", err)
	}

	return nil
}

// Helper methods for configuration

// isSchemaValidationError checks if an error is related to schema validation
func (h *Handler) isSchemaValidationError(err error) bool {
	if err == nil {
		return false
	}
	errStr := strings.ToLower(err.Error())
	return strings.Contains(errStr, "schema") ||
		strings.Contains(errStr, "decode") ||
		strings.Contains(errStr, "validation") ||
		strings.Contains(errStr, "registry") ||
		strings.Contains(errStr, "avro") ||
		strings.Contains(errStr, "protobuf") ||
		strings.Contains(errStr, "json schema")
}

// isStrictSchemaValidation returns whether strict schema validation is enabled
func (h *Handler) isStrictSchemaValidation() bool {
	// This could be configurable per topic or globally
	// For now, default to permissive mode
	return false
}

// getTopicCompatibilityLevel returns the compatibility level for a topic
func (h *Handler) getTopicCompatibilityLevel(topicName string) schema.CompatibilityLevel {
	// This could be configurable per topic
	// For now, default to backward compatibility
	return schema.CompatibilityBackward
}

// parseSchemaID parses a schema ID from string
func (h *Handler) parseSchemaID(schemaIDStr string) (uint32, error) {
	if schemaIDStr == "" {
		return 0, fmt.Errorf("empty schema ID")
	}

	var schemaID uint64
	if _, err := fmt.Sscanf(schemaIDStr, "%d", &schemaID); err != nil {
		return 0, fmt.Errorf("invalid schema ID format: %w", err)
	}

	if schemaID > 0xFFFFFFFF {
		return 0, fmt.Errorf("schema ID too large: %d", schemaID)
	}

	return uint32(schemaID), nil
}

// isSystemTopic checks if a topic should bypass schema processing
func (h *Handler) isSystemTopic(topicName string) bool {
	// System topics that should be stored as-is without schema processing
	systemTopics := []string{
		"_schemas",            // Schema Registry topic
		"__consumer_offsets",  // Kafka consumer offsets topic
		"__transaction_state", // Kafka transaction state topic
	}

	for _, systemTopic := range systemTopics {
		if topicName == systemTopic {
			return true
		}
	}

	// Also check for topics with system prefixes
	return strings.HasPrefix(topicName, "_") || strings.HasPrefix(topicName, "__")
}
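
// Illustrative results (topic names other than the system topics are hypothetical):
//
//	isSystemTopic("_schemas")           == true  // exact match
//	isSystemTopic("__consumer_offsets") == true  // exact match
//	isSystemTopic("_internal")          == true  // "_" prefix
//	isSystemTopic("orders")             == false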

// produceSchemaBasedRecord produces a record using schema-based encoding to RecordValue
// ctx controls the publish timeout - if client cancels, produce operation is cancelled
func (h *Handler) produceSchemaBasedRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {

	// System topics should always bypass schema processing and be stored as-is
	if h.isSystemTopic(topic) {
		return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
	}

	// If schema management is not enabled, fall back to raw message handling
	isEnabled := h.IsSchemaEnabled()
	if !isEnabled {
		return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
	}

	var keyDecodedMsg *schema.DecodedMessage
	var valueDecodedMsg *schema.DecodedMessage

	// Check and decode key if schematized
	if key != nil {
		isSchematized := h.schemaManager.IsSchematized(key)
		if isSchematized {
			var err error
			keyDecodedMsg, err = h.schemaManager.DecodeMessage(key)
			if err != nil {
				// Add delay before returning schema decoding error to prevent overloading
				time.Sleep(100 * time.Millisecond)
				return 0, fmt.Errorf("failed to decode schematized key: %w", err)
			}
		}
	}

	// Check and decode value if schematized
	if len(value) > 0 {
		isSchematized := h.schemaManager.IsSchematized(value)
		if isSchematized {
			var err error
			valueDecodedMsg, err = h.schemaManager.DecodeMessage(value)
			if err != nil {
				// If message has schema ID (magic byte 0x00), decoding MUST succeed
				// Do not fall back to raw storage - this would corrupt the data model
				time.Sleep(100 * time.Millisecond)
				return 0, fmt.Errorf("message has schema ID but decoding failed (schema registry may be unavailable): %w", err)
			}
		}
	}

	// If neither key nor value is schematized, fall back to raw message handling
	// This is OK for non-schematized messages (no magic byte 0x00)
	if keyDecodedMsg == nil && valueDecodedMsg == nil {
		return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, key, value)
	}

	// Process key schema if present
	if keyDecodedMsg != nil {
		// Store key schema information in memory cache for fetch path performance
		if !h.hasTopicKeySchemaConfig(topic, keyDecodedMsg.SchemaID, keyDecodedMsg.SchemaFormat) {
			err := h.storeTopicKeySchemaConfig(topic, keyDecodedMsg.SchemaID, keyDecodedMsg.SchemaFormat)
			if err != nil {
				// Cache update failure is non-fatal - don't fail the produce
			}

			// Schedule key schema registration in background (leader-only, non-blocking)
			h.scheduleKeySchemaRegistration(topic, keyDecodedMsg.RecordType)
		}
	}

	// Process value schema if present and create combined RecordValue with key fields
	var recordValueBytes []byte
	if valueDecodedMsg != nil {
		// Create combined RecordValue that includes both key and value fields
		combinedRecordValue := h.createCombinedRecordValue(keyDecodedMsg, valueDecodedMsg)

		// Store the combined RecordValue - schema info is stored in topic configuration
		var err error
		recordValueBytes, err = proto.Marshal(combinedRecordValue)
		if err != nil {
			return 0, fmt.Errorf("failed to marshal combined RecordValue: %w", err)
		}

		// Store value schema information in memory cache for fetch path performance
		// Only store if not already cached to avoid mutex contention on hot path
		hasConfig := h.hasTopicSchemaConfig(topic, valueDecodedMsg.SchemaID, valueDecodedMsg.SchemaFormat)
		if !hasConfig {
			err = h.storeTopicSchemaConfig(topic, valueDecodedMsg.SchemaID, valueDecodedMsg.SchemaFormat)
			if err != nil {
				// Log error but don't fail the produce
			}

			// Schedule value schema registration in background (leader-only, non-blocking)
			h.scheduleSchemaRegistration(topic, valueDecodedMsg.RecordType)
		}
	} else if keyDecodedMsg != nil {
		// If only key is schematized, create RecordValue with just key fields
		combinedRecordValue := h.createCombinedRecordValue(keyDecodedMsg, nil)

		var err error
		recordValueBytes, err = proto.Marshal(combinedRecordValue)
		if err != nil {
			return 0, fmt.Errorf("failed to marshal key-only RecordValue: %w", err)
		}
	} else {
		// If value is not schematized, use raw value
		recordValueBytes = value
	}

	// Prepare final key for storage
	finalKey := key
	if keyDecodedMsg != nil {
		// If key was schematized, convert back to raw bytes for storage
		keyBytes, err := proto.Marshal(keyDecodedMsg.RecordValue)
		if err != nil {
			return 0, fmt.Errorf("failed to marshal key RecordValue: %w", err)
		}
		finalKey = keyBytes
	}

	// Send to SeaweedMQ
	if valueDecodedMsg != nil || keyDecodedMsg != nil {
		// Store the DECODED RecordValue (not the original Confluent Wire Format)
		// This enables SQL queries to work properly. Kafka consumers will receive the RecordValue
		// which can be re-encoded to Confluent Wire Format during fetch if needed
		return h.seaweedMQHandler.ProduceRecordValue(ctx, topic, partition, finalKey, recordValueBytes)
	} else {
		// Send with raw format for non-schematized data
		return h.seaweedMQHandler.ProduceRecord(ctx, topic, partition, finalKey, recordValueBytes)
	}
}

// hasTopicSchemaConfig checks if schema config already exists (read-only, fast path)
func (h *Handler) hasTopicSchemaConfig(topic string, schemaID uint32, schemaFormat schema.Format) bool {
	h.topicSchemaConfigMu.RLock()
	defer h.topicSchemaConfigMu.RUnlock()

	if h.topicSchemaConfigs == nil {
		return false
	}

	config, exists := h.topicSchemaConfigs[topic]
	if !exists {
		return false
	}

	// Check if the schema matches (avoid re-registration of same schema)
	return config.ValueSchemaID == schemaID && config.ValueSchemaFormat == schemaFormat
}

// storeTopicSchemaConfig stores original Kafka schema metadata (ID + format) for fetch path
// This is kept in memory for performance when reconstructing Confluent messages during fetch.
// The translated RecordType is persisted via background schema registration.
func (h *Handler) storeTopicSchemaConfig(topic string, schemaID uint32, schemaFormat schema.Format) error {
	// Store in memory cache for quick access during fetch operations
	h.topicSchemaConfigMu.Lock()
	defer h.topicSchemaConfigMu.Unlock()

	if h.topicSchemaConfigs == nil {
		h.topicSchemaConfigs = make(map[string]*TopicSchemaConfig)
	}

	config, exists := h.topicSchemaConfigs[topic]
	if !exists {
		config = &TopicSchemaConfig{}
		h.topicSchemaConfigs[topic] = config
	}

	config.ValueSchemaID = schemaID
	config.ValueSchemaFormat = schemaFormat

	return nil
}

// storeTopicKeySchemaConfig stores key schema configuration
func (h *Handler) storeTopicKeySchemaConfig(topic string, schemaID uint32, schemaFormat schema.Format) error {
	h.topicSchemaConfigMu.Lock()
	defer h.topicSchemaConfigMu.Unlock()

	if h.topicSchemaConfigs == nil {
		h.topicSchemaConfigs = make(map[string]*TopicSchemaConfig)
	}

	config, exists := h.topicSchemaConfigs[topic]
	if !exists {
		config = &TopicSchemaConfig{}
		h.topicSchemaConfigs[topic] = config
	}

	config.KeySchemaID = schemaID
	config.KeySchemaFormat = schemaFormat
	config.HasKeySchema = true

	return nil
}

// hasTopicKeySchemaConfig checks if key schema config already exists
func (h *Handler) hasTopicKeySchemaConfig(topic string, schemaID uint32, schemaFormat schema.Format) bool {
	h.topicSchemaConfigMu.RLock()
	defer h.topicSchemaConfigMu.RUnlock()

	config, exists := h.topicSchemaConfigs[topic]
	if !exists {
		return false
	}

	// Check if the key schema matches
	return config.HasKeySchema && config.KeySchemaID == schemaID && config.KeySchemaFormat == schemaFormat
}

// scheduleSchemaRegistration registers value schema once per topic-schema combination
func (h *Handler) scheduleSchemaRegistration(topicName string, recordType *schema_pb.RecordType) {
	if recordType == nil {
		return
	}

	// Create a unique key for this value schema registration
	schemaKey := fmt.Sprintf("%s:value:%d", topicName, h.getRecordTypeHash(recordType))

	// Check if already registered
	h.registeredSchemasMu.RLock()
	if h.registeredSchemas[schemaKey] {
		h.registeredSchemasMu.RUnlock()
		return // Already registered
	}
	h.registeredSchemasMu.RUnlock()

	// Double-check with write lock to prevent race condition
	h.registeredSchemasMu.Lock()
	defer h.registeredSchemasMu.Unlock()

	if h.registeredSchemas[schemaKey] {
		return // Already registered by another goroutine
	}

	// Mark as registered before attempting registration
	h.registeredSchemas[schemaKey] = true

	// Perform synchronous registration
	if err := h.registerSchemasViaBrokerAPI(topicName, recordType, nil); err != nil {
		// Remove from registered map on failure so it can be retried
		delete(h.registeredSchemas, schemaKey)
	}
}

// scheduleKeySchemaRegistration registers key schema once per topic-schema combination
func (h *Handler) scheduleKeySchemaRegistration(topicName string, recordType *schema_pb.RecordType) {
	if recordType == nil {
		return
	}

	// Create a unique key for this key schema registration
	schemaKey := fmt.Sprintf("%s:key:%d", topicName, h.getRecordTypeHash(recordType))

	// Check if already registered
	h.registeredSchemasMu.RLock()
	if h.registeredSchemas[schemaKey] {
		h.registeredSchemasMu.RUnlock()
		return // Already registered
	}
	h.registeredSchemasMu.RUnlock()

	// Double-check with write lock to prevent race condition
	h.registeredSchemasMu.Lock()
	defer h.registeredSchemasMu.Unlock()

	if h.registeredSchemas[schemaKey] {
		return // Already registered by another goroutine
	}

	// Mark as registered before attempting registration
	h.registeredSchemas[schemaKey] = true

	// Register key schema to the same topic (not a phantom "-key" topic)
	// This uses the extended ConfigureTopicRequest with separate key/value RecordTypes
	if err := h.registerSchemasViaBrokerAPI(topicName, nil, recordType); err != nil {
		// Remove from registered map on failure so it can be retried
		delete(h.registeredSchemas, schemaKey)
	}
}

// ensureTopicSchemaFromRegistryCache ensures topic configuration is updated when schemas are retrieved from registry
func (h *Handler) ensureTopicSchemaFromRegistryCache(topicName string, schemas ...*schema.CachedSchema) {
	if len(schemas) == 0 {
		return
	}

	// Use the latest/most relevant schema (last one in the list)
	latestSchema := schemas[len(schemas)-1]
	if latestSchema == nil {
		return
	}

	// Try to infer RecordType from the cached schema
	recordType, err := h.inferRecordTypeFromCachedSchema(latestSchema)
	if err != nil {
		return
	}

	// Schedule schema registration to update topic.conf
	if recordType != nil {
		h.scheduleSchemaRegistration(topicName, recordType)
	}
}

// ensureTopicKeySchemaFromRegistryCache ensures topic configuration is updated when key schemas are retrieved from registry
func (h *Handler) ensureTopicKeySchemaFromRegistryCache(topicName string, schemas ...*schema.CachedSchema) {
	if len(schemas) == 0 {
		return
	}

	// Use the latest/most relevant schema (last one in the list)
	latestSchema := schemas[len(schemas)-1]
	if latestSchema == nil {
		return
	}

	// Try to infer RecordType from the cached schema
	recordType, err := h.inferRecordTypeFromCachedSchema(latestSchema)
	if err != nil {
		return
	}

	// Schedule key schema registration to update topic.conf
	if recordType != nil {
		h.scheduleKeySchemaRegistration(topicName, recordType)
	}
}

// getRecordTypeHash generates a simple hash for RecordType to use as a key
func (h *Handler) getRecordTypeHash(recordType *schema_pb.RecordType) uint32 {
	if recordType == nil {
		return 0
	}

	// Simple hash based on field count and first field name
	hash := uint32(len(recordType.Fields))
	if len(recordType.Fields) > 0 {
		// Use first field name for additional uniqueness
		firstFieldName := recordType.Fields[0].Name
		for _, char := range firstFieldName {
			hash = hash*31 + uint32(char)
		}
	}

	return hash
}
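
// Worked example (the field name "id" is hypothetical):
//
//	a RecordType with 1 field named "id":
//	  hash = 1                       // field count
//	  hash = 1*31 + 'i' (105) = 136
//	  hash = 136*31 + 'd' (100) = 4316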

// createCombinedRecordValue creates a RecordValue that combines fields from both key and value decoded messages
// Key fields are prefixed with "key_" to distinguish them from value fields
// The message key bytes are stored in the _key system column (from logEntry.Key)
func (h *Handler) createCombinedRecordValue(keyDecodedMsg *schema.DecodedMessage, valueDecodedMsg *schema.DecodedMessage) *schema_pb.RecordValue {
	combinedFields := make(map[string]*schema_pb.Value)

	// Add key fields with "key_" prefix
	if keyDecodedMsg != nil && keyDecodedMsg.RecordValue != nil {
		for fieldName, fieldValue := range keyDecodedMsg.RecordValue.Fields {
			combinedFields["key_"+fieldName] = fieldValue
		}
		// Note: The message key bytes are stored in the _key system column (from logEntry.Key)
		// We don't create a "key" field here to avoid redundancy
	}

	// Add value fields (no prefix)
	if valueDecodedMsg != nil && valueDecodedMsg.RecordValue != nil {
		for fieldName, fieldValue := range valueDecodedMsg.RecordValue.Fields {
			combinedFields[fieldName] = fieldValue
		}
	}

	return &schema_pb.RecordValue{
		Fields: combinedFields,
	}
}
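
// Illustrative merge (field names are hypothetical):
//
//	key   RecordValue{user_id: 42}
//	value RecordValue{amount: 9.99}
//	        -> RecordValue{key_user_id: 42, amount: 9.99}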

// inferRecordTypeFromCachedSchema attempts to infer RecordType from a cached schema
func (h *Handler) inferRecordTypeFromCachedSchema(cachedSchema *schema.CachedSchema) (*schema_pb.RecordType, error) {
	if cachedSchema == nil {
		return nil, fmt.Errorf("cached schema is nil")
	}

	switch cachedSchema.Format {
	case schema.FormatAvro:
		return h.inferRecordTypeFromAvroSchema(cachedSchema.Schema)
	case schema.FormatProtobuf:
		return h.inferRecordTypeFromProtobufSchema(cachedSchema.Schema)
	case schema.FormatJSONSchema:
		return h.inferRecordTypeFromJSONSchema(cachedSchema.Schema)
	default:
		return nil, fmt.Errorf("unsupported schema format for inference: %v", cachedSchema.Format)
	}
}

// inferRecordTypeFromAvroSchema infers RecordType from Avro schema string
// Uses cache to avoid recreating expensive Avro codecs (17% CPU overhead!)
func (h *Handler) inferRecordTypeFromAvroSchema(avroSchema string) (*schema_pb.RecordType, error) {
	// Check cache first
	h.inferredRecordTypesMu.RLock()
	if recordType, exists := h.inferredRecordTypes[avroSchema]; exists {
		h.inferredRecordTypesMu.RUnlock()
		return recordType, nil
	}
	h.inferredRecordTypesMu.RUnlock()

	// Cache miss - create decoder and infer type
	decoder, err := schema.NewAvroDecoder(avroSchema)
	if err != nil {
		return nil, fmt.Errorf("failed to create Avro decoder: %w", err)
	}

	recordType, err := decoder.InferRecordType()
	if err != nil {
		return nil, err
	}

	// Cache the result
	h.inferredRecordTypesMu.Lock()
	h.inferredRecordTypes[avroSchema] = recordType
	h.inferredRecordTypesMu.Unlock()

	return recordType, nil
}
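
// Sketch of the inference, assuming a minimal Avro record schema:
//
//	{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}
//
// yields a RecordType with a single field "id"; the Avro-to-schema_pb type
// mapping itself is performed by decoder.InferRecordType.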

// inferRecordTypeFromProtobufSchema infers RecordType from Protobuf schema
// Uses cache to avoid recreating expensive decoders
func (h *Handler) inferRecordTypeFromProtobufSchema(protobufSchema string) (*schema_pb.RecordType, error) {
	// Check cache first
	cacheKey := "protobuf:" + protobufSchema
	h.inferredRecordTypesMu.RLock()
	if recordType, exists := h.inferredRecordTypes[cacheKey]; exists {
		h.inferredRecordTypesMu.RUnlock()
		return recordType, nil
	}
	h.inferredRecordTypesMu.RUnlock()

	// Cache miss - create decoder and infer type
	decoder, err := schema.NewProtobufDecoder([]byte(protobufSchema))
	if err != nil {
		return nil, fmt.Errorf("failed to create Protobuf decoder: %w", err)
	}

	recordType, err := decoder.InferRecordType()
	if err != nil {
		return nil, err
	}

	// Cache the result
	h.inferredRecordTypesMu.Lock()
	h.inferredRecordTypes[cacheKey] = recordType
	h.inferredRecordTypesMu.Unlock()

	return recordType, nil
}

// inferRecordTypeFromJSONSchema infers RecordType from JSON Schema string
// Uses cache to avoid recreating expensive decoders
func (h *Handler) inferRecordTypeFromJSONSchema(jsonSchema string) (*schema_pb.RecordType, error) {
	// Check cache first
	cacheKey := "json:" + jsonSchema
	h.inferredRecordTypesMu.RLock()
	if recordType, exists := h.inferredRecordTypes[cacheKey]; exists {
		h.inferredRecordTypesMu.RUnlock()
		return recordType, nil
	}
	h.inferredRecordTypesMu.RUnlock()

	// Cache miss - create decoder and infer type
	decoder, err := schema.NewJSONSchemaDecoder(jsonSchema)
	if err != nil {
		return nil, fmt.Errorf("failed to create JSON Schema decoder: %w", err)
	}

	recordType, err := decoder.InferRecordType()
	if err != nil {
		return nil, err
	}

	// Cache the result
	h.inferredRecordTypesMu.Lock()
	h.inferredRecordTypes[cacheKey] = recordType
	h.inferredRecordTypesMu.Unlock()

	return recordType, nil
}