diff options
| author | chrislu <chris.lu@gmail.com> | 2024-03-27 22:58:20 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-27 22:58:20 -0700 |
| commit | 059a1207081644b6c6369b09044ce96d0368e66e (patch) | |
| tree | 6963e8dbd18ecf5aa1a738ff0667bbed473d4d56 /weed/mq/topic/local_partition.go | |
| parent | 7d87c1d2bbe3c8847f5b5e5a4d8d8376996dcf8b (diff) | |
| download | seaweedfs-059a1207081644b6c6369b09044ce96d0368e66e.tar.xz seaweedfs-059a1207081644b6c6369b09044ce96d0368e66e.zip | |
refactor
Diffstat (limited to 'weed/mq/topic/local_partition.go')
| -rw-r--r-- | weed/mq/topic/local_partition.go | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index a6562fd5c..3c902a503 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -1,6 +1,7 @@ package topic import ( + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -128,6 +129,55 @@ func (p *LocalPartition) WaitUntilNoPublishers() { } } +func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) { + if p.FollowerStream != nil { + return nil + } + if len(initMessage.FollowerBrokers) == 0 { + return nil + } + + follower := initMessage.FollowerBrokers[0] + ctx := context.Background() + p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, grpcDialOption) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", follower, err) + } + followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection) + p.FollowerStream, err = followerClient.PublishFollowMe(ctx) + if err != nil { + return fmt.Errorf("fail to create publish client: %v", err) + } + if err = p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{ + Message: &mq_pb.PublishFollowMeRequest_Init{ + Init: &mq_pb.PublishFollowMeRequest_InitMessage{ + Topic: initMessage.Topic, + Partition: initMessage.Partition, + }, + }, + }); err != nil { + return err + } + + // start receiving ack from follower + go func() { + defer func() { + println("stop receiving ack from follower") + }() + + for { + ack, err := p.FollowerStream.Recv() + if err != nil { + glog.Errorf("Error receiving follower ack: %v", err) + return + } + atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs) + println("recv ack", ack.AckTsNs) + } + }() + return nil +} + func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { if p.MaybeShutdownLocalPartition() { if p.FollowerStream != nil { |
