aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_sub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
-rw-r--r--weed/mq/broker/broker_grpc_sub.go192
1 files changed, 180 insertions, 12 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index c6dde6f4e..3280be2c0 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -8,6 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "sync/atomic"
"time"
)
@@ -69,15 +70,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
var startPosition log_buffer.MessagePosition
if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
- offset := req.GetInit().GetPartitionOffset()
- if offset.StartTsNs != 0 {
- startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
- }
- if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
- startPosition = log_buffer.NewMessagePosition(1, -3)
- } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
- startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
- }
+ startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
}
return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
@@ -85,10 +78,10 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
return false
}
sleepIntervalCount++
- if sleepIntervalCount > 10 {
- sleepIntervalCount = 10
+ if sleepIntervalCount > 32 {
+ sleepIntervalCount = 32
}
- time.Sleep(time.Duration(sleepIntervalCount) * 337 * time.Millisecond)
+ time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
@@ -116,6 +109,179 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
TsNs: logEntry.TsNs,
},
}}); err != nil {
+ glog.Errorf("Error sending data: %v", err)
+ return false, err
+ }
+
+ counter++
+ return false, nil
+ })
+}
+
+func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) {
+ if offset.StartTsNs != 0 {
+ startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
+ }
+ if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
+ startPosition = log_buffer.NewMessagePosition(1, -3)
+ } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
+ startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
+ }
+ return
+}
+
+func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
+ ctx := stream.Context()
+ clientName := req.GetInit().ConsumerId
+
+ t := topic.FromPbTopic(req.GetInit().Topic)
+ partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
+
+ glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition)
+
+ waitIntervalCount := 0
+
+ var localTopicPartition *topic.LocalPartition
+ for localTopicPartition == nil {
+ localTopicPartition, err = b.GetOrGenLocalPartition(t, partition)
+ if err != nil {
+ glog.V(1).Infof("topic %v partition %v not setup", t, partition)
+ }
+ if localTopicPartition != nil {
+ break
+ }
+ waitIntervalCount++
+ if waitIntervalCount > 32 {
+ waitIntervalCount = 32
+ }
+ time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond)
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return nil
+ }
+ glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
+ return nil
+ default:
+ // Continue processing the request
+ }
+ }
+
+ // set the current follower id
+ followerId := req.GetInit().FollowerId
+ atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
+
+ glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
+ isConnected := true
+ sleepIntervalCount := 0
+
+ var counter int64
+ defer func() {
+ isConnected = false
+ glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
+ }()
+
+ var startPosition log_buffer.MessagePosition
+ if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
+ startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
+ }
+
+ var prevFlushTsNs int64
+
+ _,_, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
+ if !isConnected {
+ return false
+ }
+ sleepIntervalCount++
+ if sleepIntervalCount > 32 {
+ sleepIntervalCount = 32
+ }
+ time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
+
+ if localTopicPartition.LogBuffer.IsStopping() {
+ newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
+ glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId)
+ stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
+ FollowerChangedToId: newFollowerId,
+ },
+ },
+ })
+ return false
+ }
+
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return false
+ }
+ glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ return false
+ default:
+ // Continue processing the request
+ }
+
+ // send the last flushed sequence
+ flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
+ if flushTsNs != prevFlushTsNs {
+ prevFlushTsNs = flushTsNs
+ stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
+ FlushedSequence: flushTsNs,
+ },
+ },
+ })
+ }
+
+ return true
+ }, func(logEntry *filer_pb.LogEntry) (bool, error) {
+ // reset the sleep interval count
+ sleepIntervalCount = 0
+
+ // check the follower id
+ newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
+ if newFollowerId != followerId {
+ glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId)
+ stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
+ FollowerChangedToId: newFollowerId,
+ },
+ },
+ })
+ return true, nil
+ }
+
+ // send the last flushed sequence
+ flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
+ if flushTsNs != prevFlushTsNs {
+ prevFlushTsNs = flushTsNs
+ stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
+ FlushedSequence: flushTsNs,
+ },
+ },
+ })
+ }
+
+ // send the log entry
+ if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
+ Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
+ Data: &mq_pb.DataMessage{
+ Key: logEntry.Key,
+ Value: logEntry.Data,
+ TsNs: logEntry.TsNs,
+ },
+ }}); err != nil {
glog.Errorf("Error sending setup response: %v", err)
return false, err
}
@@ -123,4 +289,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
counter++
return false, nil
})
+
+ return err
}