diff options
Diffstat (limited to 'weed/mq/kafka/partition_mapping.go')
| -rw-r--r-- | weed/mq/kafka/partition_mapping.go | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/weed/mq/kafka/partition_mapping.go b/weed/mq/kafka/partition_mapping.go new file mode 100644 index 000000000..697e67386 --- /dev/null +++ b/weed/mq/kafka/partition_mapping.go @@ -0,0 +1,55 @@ +package kafka + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// Convenience functions for partition mapping used by production code +// The full PartitionMapper implementation is in partition_mapping_test.go for testing + +// MapKafkaPartitionToSMQRange maps a Kafka partition to SeaweedMQ ring range +func MapKafkaPartitionToSMQRange(kafkaPartition int32) (rangeStart, rangeStop int32) { + // Use a range size that divides evenly into MaxPartitionCount (2520) + // Range size 35 gives us exactly 72 Kafka partitions: 2520 / 35 = 72 + rangeSize := int32(35) + rangeStart = kafkaPartition * rangeSize + rangeStop = rangeStart + rangeSize - 1 + return rangeStart, rangeStop +} + +// CreateSMQPartition creates a SeaweedMQ partition from a Kafka partition +func CreateSMQPartition(kafkaPartition int32, unixTimeNs int64) *schema_pb.Partition { + rangeStart, rangeStop := MapKafkaPartitionToSMQRange(kafkaPartition) + + return &schema_pb.Partition{ + RingSize: pub_balancer.MaxPartitionCount, + RangeStart: rangeStart, + RangeStop: rangeStop, + UnixTimeNs: unixTimeNs, + } +} + +// ExtractKafkaPartitionFromSMQRange extracts the Kafka partition from SeaweedMQ range +func ExtractKafkaPartitionFromSMQRange(rangeStart int32) int32 { + rangeSize := int32(35) + return rangeStart / rangeSize +} + +// ValidateKafkaPartition validates that a Kafka partition is within supported range +func ValidateKafkaPartition(kafkaPartition int32) bool { + maxPartitions := int32(pub_balancer.MaxPartitionCount) / 35 // 72 partitions + return kafkaPartition >= 0 && kafkaPartition < maxPartitions +} + +// GetRangeSize returns the range size used for partition mapping +func GetRangeSize() int32 { + return 35 +} + +// GetMaxKafkaPartitions returns the maximum number of Kafka partitions supported +func GetMaxKafkaPartitions() int32 { + return int32(pub_balancer.MaxPartitionCount) / 35 // 72 partitions +} + + |
