aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_sub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
-rw-r--r--weed/mq/broker/broker_grpc_sub.go131
1 files changed, 121 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 02488b2b0..286812a9b 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -4,24 +4,30 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "io"
"time"
)
-func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) (err error) {
+func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
- ctx := stream.Context()
- clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
-
- initMessage := req.GetInit()
- if initMessage == nil {
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+ if req.GetInit() == nil {
glog.Errorf("missing init message")
return fmt.Errorf("missing init message")
}
+ ctx := stream.Context()
+ clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
+
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
@@ -47,11 +53,98 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
}
}()
- var startPosition log_buffer.MessagePosition
- if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
- startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
+ startPosition := b.getRequestPosition(req.GetInit())
+ imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().Concurrency))
+
+ // connect to the follower
+ var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
+ glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker)
+ if req.GetInit().FollowerBroker != "" {
+ follower := req.GetInit().FollowerBroker
+ if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil {
+ return fmt.Errorf("fail to dial %s: %v", follower, err)
+ } else {
+ defer func() {
+ println("closing SubscribeFollowMe connection", follower)
+ subscribeFollowMeStream.CloseSend()
+ // followerGrpcConnection.Close()
+ }()
+ followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
+ if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil {
+ return fmt.Errorf("fail to subscribe to %s: %v", follower, err)
+ } else {
+ if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
+ Message: &mq_pb.SubscribeFollowMeRequest_Init{
+ Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{
+ Topic: req.GetInit().Topic,
+ Partition: req.GetInit().GetPartitionOffset().Partition,
+ ConsumerGroup: req.GetInit().ConsumerGroup,
+ },
+ },
+ }); err != nil {
+ return fmt.Errorf("fail to send init to %s: %v", follower, err)
+ }
+ }
+ }
+ glog.V(0).Infof("follower %s connected", follower)
}
+ go func() {
+ var lastOffset int64
+ for {
+ ack, err := stream.Recv()
+ if err != nil {
+ if err == io.EOF {
+ // the client has called CloseSend(). This is to ack the close.
+ stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
+ Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
+ IsEndOfStream: true,
+ },
+ }})
+ break
+ }
+ glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
+ break
+ }
+ if ack.GetAck().Key == nil {
+ // skip ack for control messages
+ continue
+ }
+ imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
+ currentLastOffset := imt.GetOldestAckedTimestamp()
+ // fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
+ if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
+ if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
+ Message: &mq_pb.SubscribeFollowMeRequest_Ack{
+ Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{
+ TsNs: currentLastOffset,
+ },
+ },
+ }); err != nil {
+ glog.Errorf("Error sending ack to follower: %v", err)
+ break
+ }
+ lastOffset = currentLastOffset
+ // fmt.Printf("%+v forwarding ack %d\n", partition, lastOffset)
+ }
+ }
+ if lastOffset > 0 {
+ glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
+ if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
+ glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
+ }
+ }
+ if subscribeFollowMeStream != nil {
+ if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
+ Message: &mq_pb.SubscribeFollowMeRequest_Close{
+ Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
+ },
+ }); err != nil {
+ glog.Errorf("Error sending close to follower: %v", err)
+ }
+ }
+ }()
+
return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
if !isConnected {
return false
@@ -81,6 +174,13 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
// reset the sleep interval count
sleepIntervalCount = 0
+ for imt.IsInflight(logEntry.Key) {
+ time.Sleep(137 * time.Millisecond)
+ }
+ if logEntry.Key != nil {
+ imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
+ }
+
if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
Data: &mq_pb.DataMessage{
Key: logEntry.Key,
@@ -97,10 +197,21 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
})
}
-func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) {
+func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (startPosition log_buffer.MessagePosition) {
+ if initMessage == nil {
+ return
+ }
+ offset := initMessage.GetPartitionOffset()
if offset.StartTsNs != 0 {
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
+ return
+ }
+ if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
+ glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
+ startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
+ return
}
+
if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
startPosition = log_buffer.NewMessagePosition(1, -3)
} else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {