diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 34 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub_follow.go | 78 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 192 | ||||
| -rw-r--r-- | weed/mq/broker/broker_topic_partition_read_write.go | 3 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition.go | 18 | ||||
| -rw-r--r-- | weed/mq/topic/local_topic.go | 1 |
6 files changed, 293 insertions, 33 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 17d01f620..8c46ea99d 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/peer" "io" @@ -59,6 +60,21 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis return stream.Send(response) } ackInterval = int(initMessage.AckInterval) + for _, follower := range initMessage.FollowerBrokers { + followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{ + Topic: initMessage.Topic, + Partition: initMessage.Partition, + BrokerSelf: string(b.option.BrokerAddress()), + }) + return err + }) + if followErr != nil { + response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr) + glog.Errorf("follower %v failed: %v", follower, followErr) + return stream.Send(response) + } + } stream.Send(response) } else { response.Error = fmt.Sprintf("missing init message") @@ -86,21 +102,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis glog.V(0).Infof("topic %v partition %v published %d messges.", initMessage.Topic, initMessage.Partition, ackSequence) }() go func() { - for { - select { - case resp := <-respChan: - if resp != nil { - if err := stream.Send(resp); err != nil { - glog.Errorf("Error sending response %v: %v", resp, err) - } - } else { - return - } - case <-localTopicPartition.StopPublishersCh: - respChan <- &mq_pb.PublishMessageResponse{ - AckSequence: ackSequence, - ShouldClose: true, - } + for resp := range respChan { + if err := stream.Send(resp); err != nil { + glog.Errorf("Error sending response %v: %v", resp, err) } } }() diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go new file mode 100644 index 000000000..e74d7025f --- /dev/null +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -0,0 +1,78 @@ +package broker + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "io" + "math/rand" + "time" +) + +func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error){ + glog.V(0).Infof("PublishFollowMe %v", request) + go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error { + followerId := rand.Int31() + subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{ + Message: &mq_pb.FollowInMemoryMessagesRequest_Init{ + Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{ + ConsumerGroup: string(b.option.BrokerAddress()), + ConsumerId: fmt.Sprintf("followMe-%d", followerId), + FollowerId: followerId, + Topic: request.Topic, + PartitionOffset: &mq_pb.PartitionOffset{ + Partition: request.Partition, + StartTsNs: 0, + StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY, + }, + }, + }, + }) + if err != nil { + glog.Errorf("FollowInMemoryMessages error: %v", err) + return err + } + + b.doFollowInMemoryMessage(context.Background(), subscribeClient) + + return nil + }) + return &mq_pb.PublishFollowMeResponse{}, nil +} + +func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) { + for { + resp, err := client.Recv() + if err != nil { + if err != io.EOF { + glog.V(0).Infof("doFollowInMemoryMessage error: %v", err) + } + return + } + if resp == nil { + glog.V(0).Infof("doFollowInMemoryMessage nil response") + return + } + if resp.Message != nil { + // process ctrl message or data message + switch m:= resp.Message.(type) { + case *mq_pb.FollowInMemoryMessagesResponse_Data: + // process data message + print("d") + case *mq_pb.FollowInMemoryMessagesResponse_Ctrl: + // process ctrl message + if m.Ctrl.FlushedSequence > 0 { + flushTime := time.Unix(0, m.Ctrl.FlushedSequence) + glog.V(0).Infof("doFollowInMemoryMessage flushTime: %v", flushTime) + } + if m.Ctrl.FollowerChangedToId != 0 { + // follower changed + glog.V(0).Infof("doFollowInMemoryMessage follower changed to %d", m.Ctrl.FollowerChangedToId) + return + } + } + } + } +} diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index c6dde6f4e..3280be2c0 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -8,6 +8,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "sync/atomic" "time" ) @@ -69,15 +70,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest var startPosition log_buffer.MessagePosition if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil { - offset := req.GetInit().GetPartitionOffset() - if offset.StartTsNs != 0 { - startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) - } - if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST { - startPosition = log_buffer.NewMessagePosition(1, -3) - } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST { - startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4) - } + startPosition = getRequestPosition(req.GetInit().GetPartitionOffset()) } return localTopicPartition.Subscribe(clientName, startPosition, func() bool { @@ -85,10 +78,10 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest return false } sleepIntervalCount++ - if sleepIntervalCount > 10 { - sleepIntervalCount = 10 + if sleepIntervalCount > 32 { + sleepIntervalCount = 32 } - time.Sleep(time.Duration(sleepIntervalCount) * 337 * time.Millisecond) + time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond) // Check if the client has disconnected by monitoring the context select { @@ -116,6 +109,179 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest TsNs: logEntry.TsNs, }, }}); err != nil { + glog.Errorf("Error sending data: %v", err) + return false, err + } + + counter++ + return false, nil + }) +} + +func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) { + if offset.StartTsNs != 0 { + startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) + } + if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST { + startPosition = log_buffer.NewMessagePosition(1, -3) + } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST { + startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4) + } + return +} + +func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) { + ctx := stream.Context() + clientName := req.GetInit().ConsumerId + + t := topic.FromPbTopic(req.GetInit().Topic) + partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition()) + + glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition) + + waitIntervalCount := 0 + + var localTopicPartition *topic.LocalPartition + for localTopicPartition == nil { + localTopicPartition, err = b.GetOrGenLocalPartition(t, partition) + if err != nil { + glog.V(1).Infof("topic %v partition %v not setup", t, partition) + } + if localTopicPartition != nil { + break + } + waitIntervalCount++ + if waitIntervalCount > 32 { + waitIntervalCount = 32 + } + time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond) + // Check if the client has disconnected by monitoring the context + select { + case <-ctx.Done(): + err := ctx.Err() + if err == context.Canceled { + // Client disconnected + return nil + } + glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err) + return nil + default: + // Continue processing the request + } + } + + // set the current follower id + followerId := req.GetInit().FollowerId + atomic.StoreInt32(&localTopicPartition.FollowerId, followerId) + + glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition) + isConnected := true + sleepIntervalCount := 0 + + var counter int64 + defer func() { + isConnected = false + glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter) + }() + + var startPosition log_buffer.MessagePosition + if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil { + startPosition = getRequestPosition(req.GetInit().GetPartitionOffset()) + } + + var prevFlushTsNs int64 + + _,_, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { + if !isConnected { + return false + } + sleepIntervalCount++ + if sleepIntervalCount > 32 { + sleepIntervalCount = 32 + } + time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond) + + if localTopicPartition.LogBuffer.IsStopping() { + newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId) + glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId) + stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ + FollowerChangedToId: newFollowerId, + }, + }, + }) + return false + } + + // Check if the client has disconnected by monitoring the context + select { + case <-ctx.Done(): + err := ctx.Err() + if err == context.Canceled { + // Client disconnected + return false + } + glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + return false + default: + // Continue processing the request + } + + // send the last flushed sequence + flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs) + if flushTsNs != prevFlushTsNs { + prevFlushTsNs = flushTsNs + stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ + FlushedSequence: flushTsNs, + }, + }, + }) + } + + return true + }, func(logEntry *filer_pb.LogEntry) (bool, error) { + // reset the sleep interval count + sleepIntervalCount = 0 + + // check the follower id + newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId) + if newFollowerId != followerId { + glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId) + stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ + FollowerChangedToId: newFollowerId, + }, + }, + }) + return true, nil + } + + // send the last flushed sequence + flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs) + if flushTsNs != prevFlushTsNs { + prevFlushTsNs = flushTsNs + stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ + FlushedSequence: flushTsNs, + }, + }, + }) + } + + // send the log entry + if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ + Message: &mq_pb.FollowInMemoryMessagesResponse_Data{ + Data: &mq_pb.DataMessage{ + Key: logEntry.Key, + Value: logEntry.Data, + TsNs: logEntry.TsNs, + }, + }}); err != nil { glog.Errorf("Error sending setup response: %v", err) return false, err } @@ -123,4 +289,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest counter++ return false, nil }) + + return err } diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index 4ebb62000..a058d8da5 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "google.golang.org/protobuf/proto" "math" + "sync/atomic" "time" ) @@ -38,6 +39,8 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par break } } + + atomic.StoreInt64(&logBuffer.LastFlushTsNs, stopTime.UnixNano()) } } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 0947d259b..062f3f4bd 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "sync/atomic" "time" ) @@ -15,10 +16,9 @@ type LocalPartition struct { FollowerBrokers []pb.ServerAddress LogBuffer *log_buffer.LogBuffer ConsumerCount int32 - StopPublishersCh chan struct{} Publishers *LocalPartitionPublishers - StopSubscribersCh chan struct{} Subscribers *LocalPartitionSubscribers + FollowerId int32 } var TIME_FORMAT = "2006-01-02-15-04-05" @@ -58,6 +58,9 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M startPosition = processedPosition processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn) + if isDone { + return nil + } startPosition = processedPosition if readInMemoryLogErr == log_buffer.ResumeFromDiskError { @@ -67,9 +70,6 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr) return readInMemoryLogErr } - if isDone { - return nil - } } } @@ -96,7 +96,6 @@ func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition, func (p *LocalPartition) closePublishers() { p.Publishers.SignalShutdown() - close(p.StopPublishersCh) } func (p *LocalPartition) closeSubscribers() { p.Subscribers.SignalShutdown() @@ -118,3 +117,10 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { } return } + +func (p *LocalPartition) Shutdown() { + p.closePublishers() + p.closeSubscribers() + p.LogBuffer.ShutdownLogBuffer() + atomic.StoreInt32(&p.FollowerId, 0) +} diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go index 7825d2168..8ab2a0db5 100644 --- a/weed/mq/topic/local_topic.go +++ b/weed/mq/topic/local_topic.go @@ -27,6 +27,7 @@ func (localTopic *LocalTopic) removePartition(partition Partition) bool { for i, localPartition := range localTopic.Partitions { if localPartition.Partition.Equals(partition) { foundPartitionIndex = i + localPartition.Shutdown() break } } |
