Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 195 |
1 file changed, 153 insertions, 42 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 18f6df8a0..4604394eb 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -45,73 +45,92 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
 		return err
 	}
 	response := &mq_pb.PublishMessageResponse{}
-	// TODO check whether current broker should be the leader for the topic partition
+
 	initMessage := req.GetInit()
 	if initMessage == nil {
-		response.Error = fmt.Sprintf("missing init message")
+		response.ErrorCode, response.Error = CreateBrokerError(BrokerErrorInvalidRecord, "missing init message")
 		glog.Errorf("missing init message")
 		return stream.Send(response)
 	}
+	// Check whether current broker should be the leader for the topic partition
+	leaderBroker, err := b.findBrokerForTopicPartition(initMessage.Topic, initMessage.Partition)
+	if err != nil {
+		response.ErrorCode, response.Error = CreateBrokerError(BrokerErrorTopicNotFound, fmt.Sprintf("failed to find leader for topic partition: %v", err))
+		glog.Errorf("failed to find leader for topic partition: %v", err)
+		return stream.Send(response)
+	}
+
+	currentBrokerAddress := fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port)
+	if leaderBroker != currentBrokerAddress {
+		response.ErrorCode, response.Error = CreateBrokerError(BrokerErrorNotLeaderOrFollower, fmt.Sprintf("not the leader for this partition, leader is: %s", leaderBroker))
+		glog.V(1).Infof("rejecting publish request: not the leader for partition, leader is: %s", leaderBroker)
+		return stream.Send(response)
+	}
+
 	// get or generate a local partition
 	t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
 	localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
 	if getOrGenErr != nil {
-		response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
+		response.ErrorCode, response.Error = CreateBrokerError(BrokerErrorTopicNotFound, fmt.Sprintf("topic %v not found: %v", t, getOrGenErr))
 		glog.Errorf("topic %v not found: %v", t, getOrGenErr)
 		return stream.Send(response)
 	}
 
 	// connect to follower brokers
 	if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
-		response.Error = followerErr.Error()
+		response.ErrorCode, response.Error = CreateBrokerError(BrokerErrorFollowerConnectionFailed, followerErr.Error())
 		glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
 		return stream.Send(response)
 	}
 
-	var receivedSequence, acknowledgedSequence int64
-	var isClosed bool
-
 	// process each published messages
 	clientName := fmt.Sprintf("%v-%4d", findClientAddress(stream.Context()), rand.IntN(10000))
 	publisher := topic.NewLocalPublisher()
 	localTopicPartition.Publishers.AddPublisher(clientName, publisher)
 
-	// start sending ack to publisher
-	ackInterval := int64(1)
-	if initMessage.AckInterval > 0 {
-		ackInterval = int64(initMessage.AckInterval)
-	}
-	go func() {
-		defer func() {
-			// println("stop sending ack to publisher", initMessage.PublisherName)
-		}()
+	// DISABLED: Periodic ack goroutine not needed with immediate per-message acks
+	// Immediate acks provide correct offset information for Kafka Gateway
+	var receivedSequence, acknowledgedSequence int64
+	var isClosed bool
 
-		lastAckTime := time.Now()
-		for !isClosed {
-			receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
-			if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second) {
-				acknowledgedSequence = receivedSequence
-				response := &mq_pb.PublishMessageResponse{
-					AckSequence: acknowledgedSequence,
-				}
-				if err := stream.Send(response); err != nil {
-					glog.Errorf("Error sending response %v: %v", response, err)
+	if false {
+		ackInterval := int64(1)
+		if initMessage.AckInterval > 0 {
+			ackInterval = int64(initMessage.AckInterval)
+		}
+		go func() {
+			defer func() {
+				// println("stop sending ack to publisher", initMessage.PublisherName)
+			}()
+
+			lastAckTime := time.Now()
+			for !isClosed {
+				receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
+				if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 100*time.Millisecond) {
+					acknowledgedSequence = receivedSequence
+					response := &mq_pb.PublishMessageResponse{
+						AckTsNs: acknowledgedSequence,
+					}
+					if err := stream.Send(response); err != nil {
+						glog.Errorf("Error sending response %v: %v", response, err)
+					}
+					// Update acknowledged offset for this publisher
+					publisher.UpdateAckedOffset(acknowledgedSequence)
+					// println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
+					lastAckTime = time.Now()
+				} else {
+					time.Sleep(10 * time.Millisecond) // Reduced from 1s to 10ms for faster acknowledgments
 				}
-				// Update acknowledged offset for this publisher
-				publisher.UpdateAckedOffset(acknowledgedSequence)
-				// println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
-				lastAckTime = time.Now()
-			} else {
-				time.Sleep(1 * time.Second)
 			}
-		}
-	}()
+		}()
+	}
 
 	defer func() {
 		// remove the publisher
 		localTopicPartition.Publishers.RemovePublisher(clientName)
-		if localTopicPartition.MaybeShutdownLocalPartition() {
+		// Use topic-aware shutdown logic to prevent aggressive removal of system topics
+		if localTopicPartition.MaybeShutdownLocalPartitionForTopic(t.Name) {
 			b.localTopicManager.RemoveLocalPartition(t, p)
 			glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
 		}
@@ -142,26 +161,55 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
 			continue
 		}
 
-		// Basic validation: ensure message can be unmarshaled as RecordValue
+		// Validate RecordValue structure only for schema-based messages
+		// Note: Only messages sent via ProduceRecordValue should be in RecordValue format
+		// Regular Kafka messages and offset management messages are stored as raw bytes
 		if dataMessage.Value != nil {
 			record := &schema_pb.RecordValue{}
 			if err := proto.Unmarshal(dataMessage.Value, record); err == nil {
-			} else {
-				// If unmarshaling fails, we skip validation but log a warning
-				glog.V(1).Infof("Could not unmarshal RecordValue for validation on topic %v partition %v: %v", initMessage.Topic, initMessage.Partition, err)
+				// Successfully unmarshaled as RecordValue - validate structure
+				if err := b.validateRecordValue(record, initMessage.Topic); err != nil {
+					glog.V(1).Infof("RecordValue validation failed on topic %v partition %v: %v", initMessage.Topic, initMessage.Partition, err)
+				}
 			}
+			// Note: We don't log errors for non-RecordValue messages since most Kafka messages
+			// are raw bytes and should not be expected to be in RecordValue format
 		}
 
 		// The control message should still be sent to the follower
 		// to avoid timing issue when ack messages.
 
-		// send to the local partition
-		if err = localTopicPartition.Publish(dataMessage); err != nil {
+		// Send to the local partition with offset assignment
+		t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
+
+		// Create offset assignment function for this partition
+		assignOffsetFn := func() (int64, error) {
+			return b.offsetManager.AssignOffset(t, p)
+		}
+
+		// Use offset-aware publishing
+		assignedOffset, err := localTopicPartition.PublishWithOffset(dataMessage, assignOffsetFn)
+		if err != nil {
 			return fmt.Errorf("topic %v partition %v publish error: %w", initMessage.Topic, initMessage.Partition, err)
 		}
 
+		// No ForceFlush - subscribers use per-subscriber notification channels for instant wake-up
+		// Data is served from in-memory LogBuffer with <1ms latency
+		glog.V(2).Infof("Published offset %d to %s", assignedOffset, initMessage.Topic.Name)
+
+		// Send immediate per-message ack WITH offset
+		// This is critical for Gateway to return correct offsets to Kafka clients
+		response := &mq_pb.PublishMessageResponse{
+			AckTsNs:        dataMessage.TsNs,
+			AssignedOffset: assignedOffset,
+		}
+		if err := stream.Send(response); err != nil {
+			glog.Errorf("Error sending immediate ack %v: %v", response, err)
+			return fmt.Errorf("failed to send ack: %v", err)
+		}
+
 		// Update published offset and last seen time for this publisher
-		publisher.UpdatePublishedOffset(dataMessage.TsNs)
+		publisher.UpdatePublishedOffset(assignedOffset)
 	}
 
 	glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
@@ -169,6 +217,30 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
 	return nil
 }
 
+// validateRecordValue validates the structure and content of a RecordValue message
+// Since RecordValue messages are created from successful protobuf unmarshaling,
+// their structure is already guaranteed to be valid by the protobuf library.
+// Schema validation (if applicable) already happened during Kafka gateway decoding.
+func (b *MessageQueueBroker) validateRecordValue(record *schema_pb.RecordValue, topic *schema_pb.Topic) error {
+	// Check for nil RecordValue
+	if record == nil {
+		return fmt.Errorf("RecordValue is nil")
+	}
+
+	// Check for nil Fields map
+	if record.Fields == nil {
+		return fmt.Errorf("RecordValue.Fields is nil")
+	}
+
+	// Check for empty Fields map
+	if len(record.Fields) == 0 {
+		return fmt.Errorf("RecordValue has no fields")
+	}
+
+	// If protobuf unmarshaling succeeded, the RecordValue is structurally valid
+	return nil
+}
+
 // duplicated from master_grpc_server.go
 func findClientAddress(ctx context.Context) string {
 	// fmt.Printf("FromContext %+v\n", ctx)
@@ -183,3 +255,42 @@ func findClientAddress(ctx context.Context) string {
 	}
 	return pr.Addr.String()
 }
+
+// GetPartitionRangeInfo returns comprehensive range information for a partition (offsets, timestamps, etc.)
+func (b *MessageQueueBroker) GetPartitionRangeInfo(ctx context.Context, req *mq_pb.GetPartitionRangeInfoRequest) (*mq_pb.GetPartitionRangeInfoResponse, error) {
+	if req.Topic == nil || req.Partition == nil {
+		return &mq_pb.GetPartitionRangeInfoResponse{
+			Error: "topic and partition are required",
+		}, nil
+	}
+
+	t := topic.FromPbTopic(req.Topic)
+	p := topic.FromPbPartition(req.Partition)
+
+	// Get offset information from the broker's internal method
+	info, err := b.GetPartitionOffsetInfoInternal(t, p)
+	if err != nil {
+		return &mq_pb.GetPartitionRangeInfoResponse{
+			Error: fmt.Sprintf("failed to get partition range info: %v", err),
+		}, nil
+	}
+
+	// TODO: Get timestamp range information from chunk metadata or log buffer
+	// For now, we'll return zero values for timestamps - this can be enhanced later
+	// to read from Extended attributes (ts_min, ts_max) from filer metadata
+	timestampRange := &mq_pb.TimestampRangeInfo{
+		EarliestTimestampNs: 0, // TODO: Read from chunk metadata ts_min
+		LatestTimestampNs:   0, // TODO: Read from chunk metadata ts_max
+	}
+
+	return &mq_pb.GetPartitionRangeInfoResponse{
+		OffsetRange: &mq_pb.OffsetRangeInfo{
+			EarliestOffset: info.EarliestOffset,
+			LatestOffset:   info.LatestOffset,
+			HighWaterMark:  info.HighWaterMark,
+		},
+		TimestampRange:      timestampRange,
+		RecordCount:         info.RecordCount,
+		ActiveSubscriptions: info.ActiveSubscriptions,
+	}, nil
+}
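
Note (not part of the commit): with the immediate per-message ack introduced above, a publisher client can read one PublishMessageResponse per DataMessage it sends and learn the broker-assigned offset. The following is a rough client-side sketch only, assuming the generated mq_pb package lives at github.com/seaweedfs/seaweedfs/weed/pb/mq_pb and that a PublishMessage stream has already been opened; the helper name receiveAcks and the logging are illustrative, not taken from the SeaweedFS client code.

package main // illustrative only

import (
	"fmt"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

// receiveAcks drains the per-message acks from an established publish stream.
func receiveAcks(stream mq_pb.SeaweedMessaging_PublishMessageClient) error {
	for {
		resp, err := stream.Recv()
		if err != nil {
			return err // io.EOF when the broker closes the stream
		}
		if resp.Error != "" {
			// ErrorCode/Error are populated via CreateBrokerError in the broker code above
			return fmt.Errorf("broker error %v: %s", resp.ErrorCode, resp.Error)
		}
		// AssignedOffset is the broker-assigned offset of the acknowledged message;
		// AckTsNs carries the acknowledged timestamp in nanoseconds.
		log.Printf("acked offset=%d tsNs=%d", resp.AssignedOffset, resp.AckTsNs)
	}
}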
