aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go50
1 files changed, 50 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index a6562fd5c..3c902a503 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -1,6 +1,7 @@
package topic
import (
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -128,6 +129,55 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
}
}
+func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
+ if p.FollowerStream != nil {
+ return nil
+ }
+ if len(initMessage.FollowerBrokers) == 0 {
+ return nil
+ }
+
+ follower := initMessage.FollowerBrokers[0]
+ ctx := context.Background()
+ p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, grpcDialOption)
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", follower, err)
+ }
+ followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection)
+ p.FollowerStream, err = followerClient.PublishFollowMe(ctx)
+ if err != nil {
+ return fmt.Errorf("fail to create publish client: %v", err)
+ }
+ if err = p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Init{
+ Init: &mq_pb.PublishFollowMeRequest_InitMessage{
+ Topic: initMessage.Topic,
+ Partition: initMessage.Partition,
+ },
+ },
+ }); err != nil {
+ return err
+ }
+
+ // start receiving ack from follower
+ go func() {
+ defer func() {
+ println("stop receiving ack from follower")
+ }()
+
+ for {
+ ack, err := p.FollowerStream.Recv()
+ if err != nil {
+ glog.Errorf("Error receiving follower ack: %v", err)
+ return
+ }
+ atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
+ println("recv ack", ack.AckTsNs)
+ }
+ }()
+ return nil
+}
+
func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
if p.MaybeShutdownLocalPartition() {
if p.FollowerStream != nil {