Diffstat (limited to 'weed/mq/kafka/integration/broker_client_publish.go')
-rw-r--r--   weed/mq/kafka/integration/broker_client_publish.go | 275
1 file changed, 275 insertions, 0 deletions
diff --git a/weed/mq/kafka/integration/broker_client_publish.go b/weed/mq/kafka/integration/broker_client_publish.go
new file mode 100644
index 000000000..4feda2973
--- /dev/null
+++ b/weed/mq/kafka/integration/broker_client_publish.go
@@ -0,0 +1,275 @@
+package integration
+
+import (
+	"fmt"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// PublishRecord publishes a single record to the SeaweedMQ broker and returns
+// the offset assigned by SMQ.
+func (bc *BrokerClient) PublishRecord(topic string, partition int32, key []byte, value []byte, timestamp int64) (int64, error) {
+	session, err := bc.getOrCreatePublisher(topic, partition)
+	if err != nil {
+		return 0, err
+	}
+
+	if session.Stream == nil {
+		return 0, fmt.Errorf("publisher session stream cannot be nil")
+	}
+
+	// CRITICAL: Lock to prevent concurrent Send/Recv causing response mix-ups.
+	// Without this, two concurrent publishes can steal each other's offsets.
+	session.mu.Lock()
+	defer session.mu.Unlock()
+
+	// Send data message using the broker API format
+	dataMsg := &mq_pb.DataMessage{
+		Key:   key,
+		Value: value,
+		TsNs:  timestamp,
+	}
+
+	if err := session.Stream.Send(&mq_pb.PublishMessageRequest{
+		Message: &mq_pb.PublishMessageRequest_Data{
+			Data: dataMsg,
+		},
+	}); err != nil {
+		return 0, fmt.Errorf("failed to send data: %v", err)
+	}
+
+	// Read acknowledgment
+	resp, err := session.Stream.Recv()
+	if err != nil {
+		return 0, fmt.Errorf("failed to receive ack: %v", err)
+	}
+
+	if topic == "_schemas" {
+		glog.Infof("[GATEWAY RECV] topic=%s partition=%d resp.AssignedOffset=%d resp.AckTsNs=%d",
+			topic, partition, resp.AssignedOffset, resp.AckTsNs)
+	}
+
+	// Handle structured broker errors
+	if kafkaErrorCode, errorMsg, handleErr := HandleBrokerResponse(resp); handleErr != nil {
+		return 0, handleErr
+	} else if kafkaErrorCode != 0 {
+		// Surface the Kafka error code for better debugging
+		return 0, fmt.Errorf("broker error (Kafka code %d): %s", kafkaErrorCode, errorMsg)
+	}
+
+	// Use the offset assigned by SMQ, not the timestamp
+	return resp.AssignedOffset, nil
+}
+
+// PublishRecordValue publishes a RecordValue message to SeaweedMQ via the broker.
+func (bc *BrokerClient) PublishRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte, timestamp int64) (int64, error) {
+	session, err := bc.getOrCreatePublisher(topic, partition)
+	if err != nil {
+		return 0, err
+	}
+
+	if session.Stream == nil {
+		return 0, fmt.Errorf("publisher session stream cannot be nil")
+	}
+
+	// CRITICAL: Lock to prevent concurrent Send/Recv causing response mix-ups
+	session.mu.Lock()
+	defer session.mu.Unlock()
+
+	// Send data message with the marshaled RecordValue in the Value field
+	dataMsg := &mq_pb.DataMessage{
+		Key:   key,
+		Value: recordValueBytes,
+		TsNs:  timestamp,
+	}
+
+	if err := session.Stream.Send(&mq_pb.PublishMessageRequest{
+		Message: &mq_pb.PublishMessageRequest_Data{
+			Data: dataMsg,
+		},
+	}); err != nil {
+		return 0, fmt.Errorf("failed to send RecordValue data: %v", err)
+	}
+
+	// Read acknowledgment
+	resp, err := session.Stream.Recv()
+	if err != nil {
+		return 0, fmt.Errorf("failed to receive RecordValue ack: %v", err)
+	}
+
+	// Handle structured broker errors
+	if kafkaErrorCode, errorMsg, handleErr := HandleBrokerResponse(resp); handleErr != nil {
+		return 0, handleErr
+	} else if kafkaErrorCode != 0 {
+		// Surface the Kafka error code for better debugging
+		return 0, fmt.Errorf("RecordValue broker error (Kafka code %d): %s", kafkaErrorCode, errorMsg)
+	}
+
+	// Use the offset assigned by SMQ, not the timestamp
+	return resp.AssignedOffset, nil
+}
+
+// getOrCreatePublisher gets or creates a publisher stream for a topic-partition.
+func (bc *BrokerClient) getOrCreatePublisher(topic string, partition int32) (*BrokerPublisherSession, error) {
+	key := fmt.Sprintf("%s-%d", topic, partition)
+
+	// Try to get an existing publisher
+	bc.publishersLock.RLock()
+	if session, exists := bc.publishers[key]; exists {
+		bc.publishersLock.RUnlock()
+		return session, nil
+	}
+	bc.publishersLock.RUnlock()
+
+	// Create a new publisher stream
+	bc.publishersLock.Lock()
+	defer bc.publishersLock.Unlock()
+
+	// Double-check after acquiring the write lock
+	if session, exists := bc.publishers[key]; exists {
+		return session, nil
+	}
+
+	// Create the stream
+	stream, err := bc.client.PublishMessage(bc.ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create publish stream: %v", err)
+	}
+
+	// Get the actual partition assignment from the broker instead of using the Kafka partition mapping
+	actualPartition, err := bc.getActualPartitionAssignment(topic, partition)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get actual partition assignment: %v", err)
+	}
+
+	// Send the init message using the actual partition structure that the broker allocated
+	if err := stream.Send(&mq_pb.PublishMessageRequest{
+		Message: &mq_pb.PublishMessageRequest_Init{
+			Init: &mq_pb.PublishMessageRequest_InitMessage{
+				Topic: &schema_pb.Topic{
+					Namespace: "kafka",
+					Name:      topic,
+				},
+				Partition:     actualPartition,
+				AckInterval:   1,
+				PublisherName: "kafka-gateway",
+			},
+		},
+	}); err != nil {
+		return nil, fmt.Errorf("failed to send init message: %v", err)
+	}
+
+	// CRITICAL: Consume the "hello" message the broker sends after init.
+	// The broker sends an empty PublishMessageResponse{} on line 137 of broker_grpc_pub.go.
+	// Without this, the first Recv() in PublishRecord gets the hello instead of a data ack.
+	helloResp, err := stream.Recv()
+	if err != nil {
+		return nil, fmt.Errorf("failed to receive hello message: %v", err)
+	}
+	if helloResp.ErrorCode != 0 {
+		return nil, fmt.Errorf("broker init error (code %d): %s", helloResp.ErrorCode, helloResp.Error)
+	}
+
+	session := &BrokerPublisherSession{
+		Topic:     topic,
+		Partition: partition,
+		Stream:    stream,
+	}
+
+	bc.publishers[key] = session
+	return session, nil
+}
+
+// ClosePublisher closes a specific publisher session.
+func (bc *BrokerClient) ClosePublisher(topic string, partition int32) error {
+	key := fmt.Sprintf("%s-%d", topic, partition)
+
+	bc.publishersLock.Lock()
+	defer bc.publishersLock.Unlock()
+
+	session, exists := bc.publishers[key]
+	if !exists {
+		return nil // Already closed or never existed
+	}
+
+	if session.Stream != nil {
+		session.Stream.CloseSend()
+	}
+	delete(bc.publishers, key)
+	return nil
+}
+
+// getActualPartitionAssignment looks up the actual partition assignment from the broker configuration.
+func (bc *BrokerClient) getActualPartitionAssignment(topic string, kafkaPartition int32) (*schema_pb.Partition, error) {
+	// Look up the topic configuration from the broker to get the actual partition assignments
+	lookupResp, err := bc.client.LookupTopicBrokers(bc.ctx, &mq_pb.LookupTopicBrokersRequest{
+		Topic: &schema_pb.Topic{
+			Namespace: "kafka",
+			Name:      topic,
+		},
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to lookup topic brokers: %v", err)
+	}
+
+	if len(lookupResp.BrokerPartitionAssignments) == 0 {
+		return nil, fmt.Errorf("no partition assignments found for topic %s", topic)
+	}
+
+	totalPartitions := int32(len(lookupResp.BrokerPartitionAssignments))
+	if kafkaPartition >= totalPartitions {
+		return nil, fmt.Errorf("kafka partition %d out of range, topic %s has %d partitions",
+			kafkaPartition, topic, totalPartitions)
+	}
+
+	// Calculate the expected ring range for this Kafka partition. The ring is divided
+	// equally among partitions, with the last partition taking any remainder.
+	rangeSize := int32(pub_balancer.MaxPartitionCount) / totalPartitions
+	expectedRangeStart := kafkaPartition * rangeSize
+	var expectedRangeStop int32
+
+	if kafkaPartition == totalPartitions-1 {
+		// The last partition gets the remainder to fill the entire ring
+		expectedRangeStop = int32(pub_balancer.MaxPartitionCount)
+	} else {
+		expectedRangeStop = (kafkaPartition + 1) * rangeSize
+	}
+
+	glog.V(2).Infof("looking for Kafka partition %d in topic %s: expected range [%d, %d] out of %d partitions",
+		kafkaPartition, topic, expectedRangeStart, expectedRangeStop, totalPartitions)
+
+	// Find the broker assignment that matches this range
+	for _, assignment := range lookupResp.BrokerPartitionAssignments {
+		if assignment.Partition == nil {
+			continue
+		}
+
+		// Check if this assignment's range matches our expected range
+		if assignment.Partition.RangeStart == expectedRangeStart && assignment.Partition.RangeStop == expectedRangeStop {
+			glog.V(1).Infof("found matching partition assignment for %s[%d]: {RingSize: %d, RangeStart: %d, RangeStop: %d, UnixTimeNs: %d}",
+				topic, kafkaPartition, assignment.Partition.RingSize, assignment.Partition.RangeStart,
+				assignment.Partition.RangeStop, assignment.Partition.UnixTimeNs)
+			return assignment.Partition, nil
+		}
+	}
+
+	// If no exact match was found, log all available assignments for debugging
+	glog.Warningf("no partition assignment found for Kafka partition %d in topic %s with expected range [%d, %d]",
+		kafkaPartition, topic, expectedRangeStart, expectedRangeStop)
+	glog.Warningf("available assignments:")
+	for i, assignment := range lookupResp.BrokerPartitionAssignments {
+		if assignment.Partition != nil {
+			glog.Warningf("  assignment[%d]: {RangeStart: %d, RangeStop: %d, RingSize: %d}",
+				i, assignment.Partition.RangeStart, assignment.Partition.RangeStop, assignment.Partition.RingSize)
+		}
+	}
+
+	return nil, fmt.Errorf("no broker assignment found for Kafka partition %d with expected range [%d, %d]",
+		kafkaPartition, expectedRangeStart, expectedRangeStop)
}
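
The ring-range arithmetic in getActualPartitionAssignment can be sanity-checked in isolation. The sketch below is not part of the diff: ringSize stands in for pub_balancer.MaxPartitionCount (value assumed here as 1024 purely for illustration), and only the split logic is reproduced, not the broker lookup.

package main

import "fmt"

// expectedRange mirrors the split above: the ring is divided equally among
// partitions, and the last partition absorbs the rounding remainder.
func expectedRange(kafkaPartition, totalPartitions, ringSize int32) (start, stop int32) {
	rangeSize := ringSize / totalPartitions
	start = kafkaPartition * rangeSize
	if kafkaPartition == totalPartitions-1 {
		stop = ringSize // last partition fills the ring
	} else {
		stop = (kafkaPartition + 1) * rangeSize
	}
	return start, stop
}

func main() {
	// With 3 partitions on a 1024-slot ring: [0, 341], [341, 682], [682, 1024]
	for p := int32(0); p < 3; p++ {
		start, stop := expectedRange(p, 3, 1024)
		fmt.Printf("kafka partition %d -> ring range [%d, %d]\n", p, start, stop)
	}
}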

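For context, a hypothetical caller sketch (not part of this diff; it assumes an already-connected *BrokerClient constructed elsewhere, and the topic, key, and value are made up for illustration):

package main

import (
	"log"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
)

// publishOne publishes a single record and logs the offset assigned by SMQ,
// which is the offset Kafka clients will see.
func publishOne(bc *integration.BrokerClient) error {
	offset, err := bc.PublishRecord(
		"events",                     // Kafka topic (stored under the "kafka" namespace in SMQ)
		0,                            // Kafka partition
		[]byte("user-42"),            // record key
		[]byte(`{"action":"login"}`), // record value
		time.Now().UnixNano(),        // timestamp in nanoseconds (TsNs)
	)
	if err != nil {
		return err
	}
	log.Printf("published at SMQ-assigned offset %d", offset)
	return nil
}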