aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-17 18:21:21 -0700
committerchrislu <chris.lu@gmail.com>2024-05-17 18:21:21 -0700
commit8d5bb7420d004fe5f6eb9a53f59b95ec9a4b316f (patch)
tree8f3543bbb6efdb198d84264f1837b486078cdfa7 /weed/mq/topic/local_partition.go
parent94742b8ace98c3c0c067f33cc70eec26e12de0ab (diff)
downloadseaweedfs-8d5bb7420d004fe5f6eb9a53f59b95ec9a4b316f.tar.xz
seaweedfs-8d5bb7420d004fe5f6eb9a53f59b95ec9a4b316f.zip
rename
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go24
1 files changed, 12 insertions, 12 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 54c122a0f..ce863791f 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -28,7 +28,7 @@ type LocalPartition struct {
Publishers *LocalPartitionPublishers
Subscribers *LocalPartitionSubscribers
- followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
+ publishFolloweMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient
followerGrpcConnection *grpc.ClientConn
follower string
}
@@ -55,9 +55,9 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
p.LogBuffer.AddToBuffer(message)
// maybe send to the follower
- if p.followerStream != nil {
+ if p.publishFolloweMeStream != nil {
// println("recv", string(message.Key), message.TsNs)
- if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+ if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Data{
Data: message,
},
@@ -134,7 +134,7 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
}
func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
- if p.followerStream != nil {
+ if p.publishFolloweMeStream != nil {
return nil
}
if len(initMessage.FollowerBrokers) == 0 {
@@ -148,11 +148,11 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
return fmt.Errorf("fail to dial %s: %v", p.follower, err)
}
followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
- p.followerStream, err = followerClient.PublishFollowMe(ctx)
+ p.publishFolloweMeStream, err = followerClient.PublishFollowMe(ctx)
if err != nil {
return fmt.Errorf("fail to create publish client: %v", err)
}
- if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+ if err = p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Init{
Init: &mq_pb.PublishFollowMeRequest_InitMessage{
Topic: initMessage.Topic,
@@ -170,7 +170,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
}()
for {
- ack, err := p.followerStream.Recv()
+ ack, err := p.publishFolloweMeStream.Recv()
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Canceled {
@@ -194,9 +194,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
for !p.LogBuffer.IsAllFlushed() {
time.Sleep(113 * time.Millisecond)
}
- if p.followerStream != nil {
+ if p.publishFolloweMeStream != nil {
// send close to the follower
- if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+ if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Close{
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
},
@@ -205,7 +205,7 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
}
glog.V(4).Infof("closing grpcConnection to follower")
p.followerGrpcConnection.Close()
- p.followerStream = nil
+ p.publishFolloweMeStream = nil
p.follower = ""
}
@@ -224,8 +224,8 @@ func (p *LocalPartition) Shutdown() {
}
func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
- if p.followerStream != nil {
- if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
+ if p.publishFolloweMeStream != nil {
+ if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Flush{
Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{
TsNs: flushTsNs,