diff options
Diffstat (limited to 'weed/mq/topic')
| -rw-r--r-- | weed/mq/topic/local_manager.go | 15 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition.go | 161 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition_publishers.go | 7 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition_subscribers.go | 7 |
4 files changed, 145 insertions, 45 deletions
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index aa2eefcdc..79a84561c 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -19,8 +19,8 @@ func NewLocalTopicManager() *LocalTopicManager { } } -// AddTopicPartition adds a topic to the local topic manager -func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) { +// AddLocalPartition adds a topic to the local topic manager +func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition *LocalPartition) { localTopic, ok := manager.topics.Get(topic.String()) if !ok { localTopic = NewLocalTopic(topic) @@ -34,8 +34,8 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition localTopic.Partitions = append(localTopic.Partitions, localPartition) } -// GetTopicPartition gets a topic from the local topic manager -func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition { +// GetLocalPartition gets a topic from the local topic manager +func (manager *LocalTopicManager) GetLocalPartition(topic Topic, partition Partition) *LocalPartition { localTopic, ok := manager.topics.Get(topic.String()) if !ok { return nil @@ -48,7 +48,7 @@ func (manager *LocalTopicManager) RemoveTopic(topic Topic) { manager.topics.Remove(topic.String()) } -func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) { +func (manager *LocalTopicManager) RemoveLocalPartition(topic Topic, partition Partition) (removed bool) { localTopic, ok := manager.topics.Get(topic.String()) if !ok { return false @@ -96,8 +96,9 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br Namespace: string(localTopic.Namespace), Name: localTopic.Name, }, - Partition: localPartition.Partition.ToPbPartition(), - ConsumerCount: localPartition.ConsumerCount, + Partition: localPartition.Partition.ToPbPartition(), + PublisherCount: int32(localPartition.Publishers.Size()), + SubscriberCount: int32(localPartition.Subscribers.Size()), } // fmt.Printf("collect topic %+v partition %+v\n", topicPartition, localPartition.Partition) } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 798949736..54c122a0f 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -1,42 +1,74 @@ package topic import ( + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "sync" "sync/atomic" "time" ) type LocalPartition struct { + ListenersWaits int64 + AckTsNs int64 + + // notifying clients + ListenersLock sync.Mutex + ListenersCond *sync.Cond + Partition - isLeader bool - FollowerBrokers []pb.ServerAddress LogBuffer *log_buffer.LogBuffer - ConsumerCount int32 Publishers *LocalPartitionPublishers Subscribers *LocalPartitionSubscribers - FollowerId int32 + + followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient + followerGrpcConnection *grpc.ClientConn + follower string } var TIME_FORMAT = "2006-01-02-15-04-05" -func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { - return &LocalPartition{ +func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { + lp := &LocalPartition{ Partition: partition, - isLeader: isLeader, - FollowerBrokers: followerBrokers, - LogBuffer: log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop), - 2*time.Minute, logFlushFn, readFromDiskFn, func() {}), Publishers: NewLocalPartitionPublishers(), Subscribers: NewLocalPartitionSubscribers(), } + lp.ListenersCond = sync.NewCond(&lp.ListenersLock) + lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop), + 2*time.Minute, logFlushFn, readFromDiskFn, func() { + if atomic.LoadInt64(&lp.ListenersWaits) > 0 { + lp.ListenersCond.Broadcast() + } + }) + return lp } -func (p *LocalPartition) Publish(message *mq_pb.DataMessage) { - p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) +func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error { + p.LogBuffer.AddToBuffer(message) + + // maybe send to the follower + if p.followerStream != nil { + // println("recv", string(message.Key), message.TsNs) + if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Data{ + Data: message, + }, + }); followErr != nil { + return fmt.Errorf("send to follower %s: %v", p.follower, followErr) + } + } else { + atomic.StoreInt64(&p.AckTsNs, message.TsNs) + } + + return nil } func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition, @@ -85,15 +117,6 @@ func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.Message return p.LogBuffer.GetEarliestPosition() } -func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { - isLeader := assignment.LeaderBroker == string(self) - followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers)) - for i, followerBroker := range assignment.FollowerBrokers { - followers[i] = pb.ServerAddress(followerBroker) - } - return NewLocalPartition(partition, isLeader, followers, logFlushFn, readFromDiskFn) -} - func (p *LocalPartition) closePublishers() { p.Publishers.SignalShutdown() } @@ -103,18 +126,93 @@ func (p *LocalPartition) closeSubscribers() { func (p *LocalPartition) WaitUntilNoPublishers() { for { - if p.Publishers.IsEmpty() { + if p.Publishers.Size() == 0 { return } time.Sleep(113 * time.Millisecond) } } +func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) { + if p.followerStream != nil { + return nil + } + if len(initMessage.FollowerBrokers) == 0 { + return nil + } + + p.follower = initMessage.FollowerBrokers[0] + ctx := context.Background() + p.followerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", p.follower, err) + } + followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection) + p.followerStream, err = followerClient.PublishFollowMe(ctx) + if err != nil { + return fmt.Errorf("fail to create publish client: %v", err) + } + if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Init{ + Init: &mq_pb.PublishFollowMeRequest_InitMessage{ + Topic: initMessage.Topic, + Partition: initMessage.Partition, + }, + }, + }); err != nil { + return err + } + + // start receiving ack from follower + go func() { + defer func() { + // println("stop receiving ack from follower") + }() + + for { + ack, err := p.followerStream.Recv() + if err != nil { + e, _ := status.FromError(err) + if e.Code() == codes.Canceled { + glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.follower) + return + } + glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.follower, err) + return + } + atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs) + // println("recv ack", ack.AckTsNs) + } + }() + return nil +} + func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { - if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() { + + if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 { p.LogBuffer.ShutdownLogBuffer() + for !p.LogBuffer.IsAllFlushed() { + time.Sleep(113 * time.Millisecond) + } + if p.followerStream != nil { + // send close to the follower + if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Close{ + Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, + }, + }); followErr != nil { + glog.Errorf("Error closing follower stream: %v", followErr) + } + glog.V(4).Infof("closing grpcConnection to follower") + p.followerGrpcConnection.Close() + p.followerStream = nil + p.follower = "" + } + hasShutdown = true } + + glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.follower, hasShutdown) return } @@ -122,5 +220,20 @@ func (p *LocalPartition) Shutdown() { p.closePublishers() p.closeSubscribers() p.LogBuffer.ShutdownLogBuffer() - atomic.StoreInt32(&p.FollowerId, 0) + glog.V(0).Infof("local partition %v shutting down", p.Partition) +} + +func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { + if p.followerStream != nil { + if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Flush{ + Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{ + TsNs: flushTsNs, + }, + }, + }); followErr != nil { + glog.Errorf("send follower %s flush message: %v", p.follower, followErr) + } + // println("notifying", p.follower, "flushed at", flushTsNs) + } } diff --git a/weed/mq/topic/local_partition_publishers.go b/weed/mq/topic/local_partition_publishers.go index c12f66336..e3c4e3ca6 100644 --- a/weed/mq/topic/local_partition_publishers.go +++ b/weed/mq/topic/local_partition_publishers.go @@ -44,13 +44,6 @@ func (p *LocalPartitionPublishers) SignalShutdown() { } } -func (p *LocalPartitionPublishers) IsEmpty() bool { - p.publishersLock.RLock() - defer p.publishersLock.RUnlock() - - return len(p.publishers) == 0 -} - func (p *LocalPartitionPublishers) Size() int { p.publishersLock.RLock() defer p.publishersLock.RUnlock() diff --git a/weed/mq/topic/local_partition_subscribers.go b/weed/mq/topic/local_partition_subscribers.go index d3b989d72..24341ce7e 100644 --- a/weed/mq/topic/local_partition_subscribers.go +++ b/weed/mq/topic/local_partition_subscribers.go @@ -48,13 +48,6 @@ func (p *LocalPartitionSubscribers) SignalShutdown() { } } -func (p *LocalPartitionSubscribers) IsEmpty() bool { - p.SubscribersLock.RLock() - defer p.SubscribersLock.RUnlock() - - return len(p.Subscribers) == 0 -} - func (p *LocalPartitionSubscribers) Size() int { p.SubscribersLock.RLock() defer p.SubscribersLock.RUnlock() |
