aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_sub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
-rw-r--r--weed/mq/broker/broker_grpc_sub.go212
1 files changed, 10 insertions, 202 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 1141ff47f..02488b2b0 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -8,7 +8,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- "sync/atomic"
"time"
)
@@ -17,40 +16,20 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
ctx := stream.Context()
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
+ initMessage := req.GetInit()
+ if initMessage == nil {
+ glog.Errorf("missing init message")
+ return fmt.Errorf("missing init message")
+ }
+
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
- waitIntervalCount := 0
-
- var localTopicPartition *topic.LocalPartition
- for localTopicPartition == nil {
- localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
- if err != nil {
- glog.V(1).Infof("topic %v partition %v not setup", t, partition)
- }
- if localTopicPartition != nil {
- break
- }
- waitIntervalCount++
- if waitIntervalCount > 10 {
- waitIntervalCount = 10
- }
- time.Sleep(time.Duration(waitIntervalCount) * 337 * time.Millisecond)
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
- return nil
- }
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
- return nil
- default:
- // Continue processing the request
- }
+ localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
+ if getOrGenErr != nil {
+ return getOrGenErr
}
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
@@ -64,7 +43,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
localTopicPartition.Subscribers.RemoveSubscriber(clientName)
glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
if localTopicPartition.MaybeShutdownLocalPartition() {
- b.localTopicManager.RemoveTopicPartition(t, partition)
+ b.localTopicManager.RemoveLocalPartition(t, partition)
}
}()
@@ -129,174 +108,3 @@ func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer
}
return
}
-
-func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
- ctx := stream.Context()
- clientName := req.GetInit().ConsumerId
-
- t := topic.FromPbTopic(req.GetInit().Topic)
- partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
-
- glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition)
-
- waitIntervalCount := 0
-
- var localTopicPartition *topic.LocalPartition
- for localTopicPartition == nil {
- localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
- if err != nil {
- glog.V(1).Infof("topic %v partition %v not setup", t, partition)
- }
- if localTopicPartition != nil {
- break
- }
- waitIntervalCount++
- if waitIntervalCount > 32 {
- waitIntervalCount = 32
- }
- time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond)
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
- return nil
- }
- glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
- return nil
- default:
- // Continue processing the request
- }
- }
-
- // set the current follower id
- followerId := req.GetInit().FollowerId
- atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
-
- glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
- isConnected := true
- sleepIntervalCount := 0
-
- var counter int64
- defer func() {
- isConnected = false
- glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
- }()
-
- // send first hello message
- // to indicate the follower is connected
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{},
- },
- })
-
- var startPosition log_buffer.MessagePosition
- if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
- startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
- }
-
- var prevFlushTsNs int64
-
- _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
- if !isConnected {
- return false
- }
- sleepIntervalCount++
- if sleepIntervalCount > 32 {
- sleepIntervalCount = 32
- }
- time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
-
- if localTopicPartition.LogBuffer.IsStopping() {
- newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
- glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId)
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FollowerChangedToId: newFollowerId,
- },
- },
- })
- return false
- }
-
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
- return false
- }
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
- return false
- default:
- // Continue processing the request
- }
-
- // send the last flushed sequence
- flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
- if flushTsNs != prevFlushTsNs {
- prevFlushTsNs = flushTsNs
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FlushedSequence: flushTsNs,
- },
- },
- })
- }
-
- return true
- }, func(logEntry *filer_pb.LogEntry) (bool, error) {
- // reset the sleep interval count
- sleepIntervalCount = 0
-
- // check the follower id
- newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
- if newFollowerId != followerId {
- glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId)
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FollowerChangedToId: newFollowerId,
- },
- },
- })
- return true, nil
- }
-
- // send the last flushed sequence
- flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
- if flushTsNs != prevFlushTsNs {
- prevFlushTsNs = flushTsNs
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FlushedSequence: flushTsNs,
- },
- },
- })
- }
-
- // send the log entry
- if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
- Data: &mq_pb.DataMessage{
- Key: logEntry.Key,
- Value: logEntry.Data,
- TsNs: logEntry.TsNs,
- },
- }}); err != nil {
- glog.Errorf("Error sending setup response: %v", err)
- return false, err
- }
-
- counter++
- return false, nil
- })
-
- return err
-}