aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-27 23:11:35 -0700
committerchrislu <chris.lu@gmail.com>2024-03-27 23:11:35 -0700
commit7463058299c937c8a9d7de96b00b1ff769d194a6 (patch)
tree1642321580509ec5d511343935fecee91ff921c9
parentdab54543329c95361ab1b502c747caece5548c37 (diff)
downloadseaweedfs-7463058299c937c8a9d7de96b00b1ff769d194a6.tar.xz
seaweedfs-7463058299c937c8a9d7de96b00b1ff769d194a6.zip
change visibility
-rw-r--r--weed/mq/topic/local_partition.go28
1 files changed, 14 insertions, 14 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 3e5963855..5df2535a6 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -30,8 +30,8 @@ type LocalPartition struct {
Subscribers *LocalPartitionSubscribers
FollowerId int32
- FollowerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
- FollowerGrpcConnection *grpc.ClientConn
+ followerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
+ followerGrpcConnection *grpc.ClientConn
follower string
}
@@ -59,9 +59,9 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
// maybe send to the follower
- if p.FollowerStream != nil {
+ if p.followerStream != nil {
println("recv", string(message.Key), message.TsNs)
- if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Data{
Data: message,
},
@@ -147,7 +147,7 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
}
func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
- if p.FollowerStream != nil {
+ if p.followerStream != nil {
return nil
}
if len(initMessage.FollowerBrokers) == 0 {
@@ -156,16 +156,16 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
p.follower = initMessage.FollowerBrokers[0]
ctx := context.Background()
- p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption)
+ p.followerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", p.follower, err)
}
- followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection)
- p.FollowerStream, err = followerClient.PublishFollowMe(ctx)
+ followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
+ p.followerStream, err = followerClient.PublishFollowMe(ctx)
if err != nil {
return fmt.Errorf("fail to create publish client: %v", err)
}
- if err = p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ if err = p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Init{
Init: &mq_pb.PublishFollowMeRequest_InitMessage{
Topic: initMessage.Topic,
@@ -183,7 +183,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
}()
for {
- ack, err := p.FollowerStream.Recv()
+ ack, err := p.followerStream.Recv()
if err != nil {
glog.Errorf("Error receiving follower ack: %v", err)
return
@@ -204,9 +204,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
}
p.LogBuffer.ShutdownLogBuffer()
- if p.FollowerStream != nil {
+ if p.followerStream != nil {
// send close to the follower
- if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Close{
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
},
@@ -214,8 +214,8 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
glog.Errorf("Error closing follower stream: %v", followErr)
}
glog.V(4).Infof("closing grpcConnection to follower")
- p.FollowerGrpcConnection.Close()
- p.FollowerStream = nil
+ p.followerGrpcConnection.Close()
+ p.followerStream = nil
}
return