diff options
| author | chrislu <chris.lu@gmail.com> | 2024-05-17 18:21:21 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-05-17 18:21:21 -0700 |
| commit | 8d5bb7420d004fe5f6eb9a53f59b95ec9a4b316f (patch) | |
| tree | 8f3543bbb6efdb198d84264f1837b486078cdfa7 /weed/mq/topic/local_partition.go | |
| parent | 94742b8ace98c3c0c067f33cc70eec26e12de0ab (diff) | |
| download | seaweedfs-8d5bb7420d004fe5f6eb9a53f59b95ec9a4b316f.tar.xz seaweedfs-8d5bb7420d004fe5f6eb9a53f59b95ec9a4b316f.zip | |
rename
Diffstat (limited to 'weed/mq/topic/local_partition.go')
| -rw-r--r-- | weed/mq/topic/local_partition.go | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 54c122a0f..ce863791f 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -28,7 +28,7 @@ type LocalPartition struct { Publishers *LocalPartitionPublishers Subscribers *LocalPartitionSubscribers - followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient + publishFolloweMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient followerGrpcConnection *grpc.ClientConn follower string } @@ -55,9 +55,9 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error { p.LogBuffer.AddToBuffer(message) // maybe send to the follower - if p.followerStream != nil { + if p.publishFolloweMeStream != nil { // println("recv", string(message.Key), message.TsNs) - if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Data{ Data: message, }, @@ -134,7 +134,7 @@ func (p *LocalPartition) WaitUntilNoPublishers() { } func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) { - if p.followerStream != nil { + if p.publishFolloweMeStream != nil { return nil } if len(initMessage.FollowerBrokers) == 0 { @@ -148,11 +148,11 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa return fmt.Errorf("fail to dial %s: %v", p.follower, err) } followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection) - p.followerStream, err = followerClient.PublishFollowMe(ctx) + p.publishFolloweMeStream, err = followerClient.PublishFollowMe(ctx) if err != nil { return fmt.Errorf("fail to create publish client: %v", err) } - if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + if err = p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Init{ Init: &mq_pb.PublishFollowMeRequest_InitMessage{ Topic: initMessage.Topic, @@ -170,7 +170,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa }() for { - ack, err := p.followerStream.Recv() + ack, err := p.publishFolloweMeStream.Recv() if err != nil { e, _ := status.FromError(err) if e.Code() == codes.Canceled { @@ -194,9 +194,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { for !p.LogBuffer.IsAllFlushed() { time.Sleep(113 * time.Millisecond) } - if p.followerStream != nil { + if p.publishFolloweMeStream != nil { // send close to the follower - if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Close{ Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, }, @@ -205,7 +205,7 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { } glog.V(4).Infof("closing grpcConnection to follower") p.followerGrpcConnection.Close() - p.followerStream = nil + p.publishFolloweMeStream = nil p.follower = "" } @@ -224,8 +224,8 @@ func (p *LocalPartition) Shutdown() { } func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { - if p.followerStream != nil { - if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{ + if p.publishFolloweMeStream != nil { + if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Flush{ Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{ TsNs: flushTsNs, |
