aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_sub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
-rw-r--r--weed/mq/broker/broker_grpc_sub.go72
1 files changed, 54 insertions, 18 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index c98ce4684..ed6b5a900 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -7,26 +7,47 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"time"
)
-func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
+func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
+
+ ctx := stream.Context()
+ clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
t := topic.FromPbTopic(req.GetInit().Topic)
- partition := topic.FromPbPartition(req.GetInit().Partition)
- localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition)
- if localTopicPartition == nil {
- stream.Send(&mq_pb.SubscribeResponse{
- Message: &mq_pb.SubscribeResponse_Ctrl{
- Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{
+ partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
+
+ glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
+
+ var localTopicPartition *topic.LocalPartition
+ localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
+ for localTopicPartition == nil {
+ stream.Send(&mq_pb.SubscribeMessageResponse{
+ Message: &mq_pb.SubscribeMessageResponse_Ctrl{
+ Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{
Error: "not initialized",
},
},
})
- return nil
+ time.Sleep(337 * time.Millisecond)
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return nil
+ }
+ glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ return nil
+ default:
+ // Continue processing the request
+ }
+ localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
}
- clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
isConnected := true
@@ -37,15 +58,30 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb
glog.V(0).Infof("Subscriber %s on %v %v disconnected", clientName, t, partition)
}()
- ctx := stream.Context()
- var startTime time.Time
- if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 {
- startTime = time.Unix(0, startTs)
- } else {
- startTime = time.Now()
+ var startPosition log_buffer.MessagePosition
+ var inMemoryOnly bool
+ if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil {
+ offset := req.GetInit().GetPartitionOffset()
+ if offset.StartTsNs != 0 {
+ startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
+ }
+ if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
+ startPosition = log_buffer.NewMessagePosition(1, -2)
+ } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
+ startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -2)
+ } else if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY {
+ inMemoryOnly = true
+ for !localTopicPartition.HasData() {
+ time.Sleep(337 * time.Millisecond)
+ }
+ memPosition := localTopicPartition.GetEarliestInMemoryMessagePosition()
+ if startPosition.Before(memPosition.Time) {
+ startPosition = memPosition
+ }
+ }
}
- localTopicPartition.Subscribe(clientName, startTime, func() bool {
+ localTopicPartition.Subscribe(clientName, startPosition, inMemoryOnly, func() bool {
if !isConnected {
return false
}
@@ -53,7 +89,7 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb
if sleepIntervalCount > 10 {
sleepIntervalCount = 10
}
- time.Sleep(time.Duration(sleepIntervalCount) * 2339 * time.Millisecond)
+ time.Sleep(time.Duration(sleepIntervalCount) * 337 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
@@ -75,7 +111,7 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb
sleepIntervalCount = 0
value := logEntry.GetData()
- if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
+ if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
Data: &mq_pb.DataMessage{
Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)),
Value: value,