aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-27 22:58:20 -0700
committerchrislu <chris.lu@gmail.com>2024-03-27 22:58:20 -0700
commit059a1207081644b6c6369b09044ce96d0368e66e (patch)
tree6963e8dbd18ecf5aa1a738ff0667bbed473d4d56 /weed/mq/topic/local_partition.go
parent7d87c1d2bbe3c8847f5b5e5a4d8d8376996dcf8b (diff)
downloadseaweedfs-059a1207081644b6c6369b09044ce96d0368e66e.tar.xz
seaweedfs-059a1207081644b6c6369b09044ce96d0368e66e.zip
refactor
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go50
1 files changed, 50 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index a6562fd5c..3c902a503 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -1,6 +1,7 @@
package topic
import (
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -128,6 +129,55 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
}
}
+func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
+ if p.FollowerStream != nil {
+ return nil
+ }
+ if len(initMessage.FollowerBrokers) == 0 {
+ return nil
+ }
+
+ follower := initMessage.FollowerBrokers[0]
+ ctx := context.Background()
+ p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, grpcDialOption)
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", follower, err)
+ }
+ followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection)
+ p.FollowerStream, err = followerClient.PublishFollowMe(ctx)
+ if err != nil {
+ return fmt.Errorf("fail to create publish client: %v", err)
+ }
+ if err = p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Init{
+ Init: &mq_pb.PublishFollowMeRequest_InitMessage{
+ Topic: initMessage.Topic,
+ Partition: initMessage.Partition,
+ },
+ },
+ }); err != nil {
+ return err
+ }
+
+ // start receiving ack from follower
+ go func() {
+ defer func() {
+ println("stop receiving ack from follower")
+ }()
+
+ for {
+ ack, err := p.FollowerStream.Recv()
+ if err != nil {
+ glog.Errorf("Error receiving follower ack: %v", err)
+ return
+ }
+ atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
+ println("recv ack", ack.AckTsNs)
+ }
+ }()
+ return nil
+}
+
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
if p.MaybeShutdownLocalPartition() {
if p.FollowerStream != nil {