diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 131 |
1 files changed, 121 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 02488b2b0..286812a9b 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -4,24 +4,30 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "io" "time" ) -func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) (err error) { +func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error { - ctx := stream.Context() - clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId) - - initMessage := req.GetInit() - if initMessage == nil { + req, err := stream.Recv() + if err != nil { + return err + } + if req.GetInit() == nil { glog.Errorf("missing init message") return fmt.Errorf("missing init message") } + ctx := stream.Context() + clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId) + t := topic.FromPbTopic(req.GetInit().Topic) partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition()) @@ -47,11 +53,98 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest } }() - var startPosition log_buffer.MessagePosition - if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil { - startPosition = getRequestPosition(req.GetInit().GetPartitionOffset()) + startPosition := b.getRequestPosition(req.GetInit()) + imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().Concurrency)) + + // connect to the follower + var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient + glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker) + if req.GetInit().FollowerBroker != "" { + follower := req.GetInit().FollowerBroker + if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil { + return fmt.Errorf("fail to dial %s: %v", follower, err) + } else { + defer func() { + println("closing SubscribeFollowMe connection", follower) + subscribeFollowMeStream.CloseSend() + // followerGrpcConnection.Close() + }() + followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection) + if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil { + return fmt.Errorf("fail to subscribe to %s: %v", follower, err) + } else { + if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{ + Message: &mq_pb.SubscribeFollowMeRequest_Init{ + Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{ + Topic: req.GetInit().Topic, + Partition: req.GetInit().GetPartitionOffset().Partition, + ConsumerGroup: req.GetInit().ConsumerGroup, + }, + }, + }); err != nil { + return fmt.Errorf("fail to send init to %s: %v", follower, err) + } + } + } + glog.V(0).Infof("follower %s connected", follower) } + go func() { + var lastOffset int64 + for { + ack, err := stream.Recv() + if err != nil { + if err == io.EOF { + // the client has called CloseSend(). This is to ack the close. + stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{ + Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{ + IsEndOfStream: true, + }, + }}) + break + } + glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err) + break + } + if ack.GetAck().Key == nil { + // skip ack for control messages + continue + } + imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence) + currentLastOffset := imt.GetOldestAckedTimestamp() + // fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset) + if subscribeFollowMeStream != nil && currentLastOffset > lastOffset { + if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{ + Message: &mq_pb.SubscribeFollowMeRequest_Ack{ + Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{ + TsNs: currentLastOffset, + }, + }, + }); err != nil { + glog.Errorf("Error sending ack to follower: %v", err) + break + } + lastOffset = currentLastOffset + // fmt.Printf("%+v forwarding ack %d\n", partition, lastOffset) + } + } + if lastOffset > 0 { + glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset) + if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil { + glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err) + } + } + if subscribeFollowMeStream != nil { + if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{ + Message: &mq_pb.SubscribeFollowMeRequest_Close{ + Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{}, + }, + }); err != nil { + glog.Errorf("Error sending close to follower: %v", err) + } + } + }() + return localTopicPartition.Subscribe(clientName, startPosition, func() bool { if !isConnected { return false @@ -81,6 +174,13 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest // reset the sleep interval count sleepIntervalCount = 0 + for imt.IsInflight(logEntry.Key) { + time.Sleep(137 * time.Millisecond) + } + if logEntry.Key != nil { + imt.EnflightMessage(logEntry.Key, logEntry.TsNs) + } + if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{ Data: &mq_pb.DataMessage{ Key: logEntry.Key, @@ -97,10 +197,21 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest }) } -func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) { +func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (startPosition log_buffer.MessagePosition) { + if initMessage == nil { + return + } + offset := initMessage.GetPartitionOffset() if offset.StartTsNs != 0 { startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) + return + } + if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil { + glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset) + startPosition = log_buffer.NewMessagePosition(storedOffset, -2) + return } + if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST { startPosition = log_buffer.NewMessagePosition(1, -3) } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST { |
