diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 192 |
1 files changed, 180 insertions, 12 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index c6dde6f4e..3280be2c0 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -8,6 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "sync/atomic" "time" ) @@ -69,15 +70,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest var startPosition log_buffer.MessagePosition if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil { - offset := req.GetInit().GetPartitionOffset() - if offset.StartTsNs != 0 { - startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) - } - if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST { - startPosition = log_buffer.NewMessagePosition(1, -3) - } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST { - startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4) - } + startPosition = getRequestPosition(req.GetInit().GetPartitionOffset()) } return localTopicPartition.Subscribe(clientName, startPosition, func() bool { @@ -85,10 +78,10 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest return false } sleepIntervalCount++ - if sleepIntervalCount > 10 { - sleepIntervalCount = 10 + if sleepIntervalCount > 32 { + sleepIntervalCount = 32 } - time.Sleep(time.Duration(sleepIntervalCount) * 337 * time.Millisecond) + time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond) // Check if the client has disconnected by monitoring the context select { @@ -116,6 +109,179 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest TsNs: logEntry.TsNs, }, }}); err != nil { + glog.Errorf("Error sending data: %v", err) + return false, err + } + + counter++ + return false, nil + }) +} + +func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) { + if offset.StartTsNs != 0 { + startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) + } + if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST { + startPosition = log_buffer.NewMessagePosition(1, -3) + } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST { + startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4) + } + return +} + +func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) { + ctx := stream.Context() + clientName := req.GetInit().ConsumerId + + t := topic.FromPbTopic(req.GetInit().Topic) + partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition()) + + glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition) + + waitIntervalCount := 0 + + var localTopicPartition *topic.LocalPartition + for localTopicPartition == nil { + localTopicPartition, err = b.GetOrGenLocalPartition(t, partition) + if err != nil { + glog.V(1).Infof("topic %v partition %v not setup", t, partition) + } + if localTopicPartition != nil { + break + } + waitIntervalCount++ + if waitIntervalCount > 32 { + waitIntervalCount = 32 + } + time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond) + // Check if the client has disconnected by monitoring the context + select { + case <-ctx.Done(): + err := ctx.Err() + if err == context.Canceled { + // Client disconnected + return nil + } + glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err) + return nil + default: + // Continue processing the request + } + } + + // set the current follower id + followerId := req.GetInit().FollowerId + atomic.StoreInt32(&localTopicPartition.FollowerId, followerId) + + glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition) + isConnected := true + sleepIntervalCount := 0 + + var counter int64 + defer func() { + isConnected = false + glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter) + }() + + var startPosition log_buffer.MessagePosition + if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil { + startPosition = getRequestPosition(req.GetInit().GetPartitionOffset()) + } + + var prevFlushTsNs int64 + + _,_, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { + if !isConnected { + return false + } + sleepIntervalCount++ + if sleepIntervalCount > 32 { + sleepIntervalCount = 32 + } + time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond) + + if localTopicPartition.LogBuffer.IsStopping() { + newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId) + glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId) + stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ + FollowerChangedToId: newFollowerId, + }, + }, + }) + return false + } + + // Check if the client has disconnected by monitoring the context + select { + case <-ctx.Done(): + err := ctx.Err() + if err == context.Canceled { + // Client disconnected + return false + } + glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + return false + default: + // Continue processing the request + } + + // send the last flushed sequence + flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs) + if flushTsNs != prevFlushTsNs { + prevFlushTsNs = flushTsNs + stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ + FlushedSequence: flushTsNs, + }, + }, + }) + } + + return true + }, func(logEntry *filer_pb.LogEntry) (bool, error) { + // reset the sleep interval count + sleepIntervalCount = 0 + + // check the follower id + newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId) + if newFollowerId != followerId { + glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId) + stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ + FollowerChangedToId: newFollowerId, + }, + }, + }) + return true, nil + } + + // send the last flushed sequence + flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs) + if flushTsNs != prevFlushTsNs { + prevFlushTsNs = flushTsNs + stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ + FlushedSequence: flushTsNs, + }, + }, + }) + } + + // send the log entry + if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Data{ + Data: &mq_pb.DataMessage{ + Key: logEntry.Key, + Value: logEntry.Data, + TsNs: logEntry.TsNs, + }, + }}); err != nil { glog.Errorf("Error sending setup response: %v", err) return false, err } @@ -123,4 +289,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest counter++ return false, nil }) + + return err } |
