diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 72 |
1 files changed, 54 insertions, 18 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index c98ce4684..ed6b5a900 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -7,26 +7,47 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "time" ) -func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error { +func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error { + + ctx := stream.Context() + clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId) t := topic.FromPbTopic(req.GetInit().Topic) - partition := topic.FromPbPartition(req.GetInit().Partition) - localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition) - if localTopicPartition == nil { - stream.Send(&mq_pb.SubscribeResponse{ - Message: &mq_pb.SubscribeResponse_Ctrl{ - Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{ + partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition()) + + glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) + + var localTopicPartition *topic.LocalPartition + localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) + for localTopicPartition == nil { + stream.Send(&mq_pb.SubscribeMessageResponse{ + Message: &mq_pb.SubscribeMessageResponse_Ctrl{ + Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{ Error: "not initialized", }, }, }) - return nil + time.Sleep(337 * time.Millisecond) + // Check if the client has disconnected by monitoring the context + select { + case <-ctx.Done(): + err := ctx.Err() + if err == context.Canceled { + // Client disconnected + return nil + } + glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + return nil + default: + // Continue processing the request + } + localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) } - clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId) localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber()) glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition) isConnected := true @@ -37,15 +58,30 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb glog.V(0).Infof("Subscriber %s on %v %v disconnected", clientName, t, partition) }() - ctx := stream.Context() - var startTime time.Time - if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 { - startTime = time.Unix(0, startTs) - } else { - startTime = time.Now() + var startPosition log_buffer.MessagePosition + var inMemoryOnly bool + if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil { + offset := req.GetInit().GetPartitionOffset() + if offset.StartTsNs != 0 { + startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) + } + if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST { + startPosition = log_buffer.NewMessagePosition(1, -2) + } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST { + startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -2) + } else if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY { + inMemoryOnly = true + for !localTopicPartition.HasData() { + time.Sleep(337 * time.Millisecond) + } + memPosition := localTopicPartition.GetEarliestInMemoryMessagePosition() + if startPosition.Before(memPosition.Time) { + startPosition = memPosition + } + } } - localTopicPartition.Subscribe(clientName, startTime, func() bool { + localTopicPartition.Subscribe(clientName, startPosition, inMemoryOnly, func() bool { if !isConnected { return false } @@ -53,7 +89,7 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb if sleepIntervalCount > 10 { sleepIntervalCount = 10 } - time.Sleep(time.Duration(sleepIntervalCount) * 2339 * time.Millisecond) + time.Sleep(time.Duration(sleepIntervalCount) * 337 * time.Millisecond) // Check if the client has disconnected by monitoring the context select { @@ -75,7 +111,7 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb sleepIntervalCount = 0 value := logEntry.GetData() - if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{ + if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{ Data: &mq_pb.DataMessage{ Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)), Value: value, |
