aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_pub.go17
-rw-r--r--weed/mq/topic/local_partition.go25
2 files changed, 23 insertions, 19 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 6e8d05a33..86e689b5a 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -143,21 +143,8 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
// send to the local partition
- localTopicPartition.Publish(dataMessage)
- receivedSequence = dataMessage.TsNs
-
- // maybe send to the follower
- if localTopicPartition.FollowerStream != nil {
- println("recv", string(dataMessage.Key), dataMessage.TsNs)
- if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
- Message: &mq_pb.PublishFollowMeRequest_Data{
- Data: dataMessage,
- },
- }); followErr != nil {
- return followErr
- }
- } else {
- atomic.StoreInt64(&localTopicPartition.AckTsNs, receivedSequence)
+ if err = localTopicPartition.Publish(dataMessage); err != nil {
+ return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err)
}
}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 9bedc5a15..3e5963855 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -32,6 +32,7 @@ type LocalPartition struct {
FollowerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
FollowerGrpcConnection *grpc.ClientConn
+ follower string
}
var TIME_FORMAT = "2006-01-02-15-04-05"
@@ -54,8 +55,24 @@ func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.
return lp
}
-func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {
+func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
p.LogBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
+
+ // maybe send to the follower
+ if p.FollowerStream != nil {
+ println("recv", string(message.Key), message.TsNs)
+ if followErr := p.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Data{
+ Data: message,
+ },
+ }); followErr != nil {
+ return fmt.Errorf("send to follower %s: %v", p.follower, followErr)
+ }
+ } else {
+ atomic.StoreInt64(&p.AckTsNs, message.TsNs)
+ }
+
+ return nil
}
func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition,
@@ -137,11 +154,11 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
return nil
}
- follower := initMessage.FollowerBrokers[0]
+ p.follower = initMessage.FollowerBrokers[0]
ctx := context.Background()
- p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, follower, true, grpcDialOption)
+ p.FollowerGrpcConnection, err = pb.GrpcDial(ctx, p.follower, true, grpcDialOption)
if err != nil {
- return fmt.Errorf("fail to dial %s: %v", follower, err)
+ return fmt.Errorf("fail to dial %s: %v", p.follower, err)
}
followerClient := mq_pb.NewSeaweedMessagingClient(p.FollowerGrpcConnection)
p.FollowerStream, err = followerClient.PublishFollowMe(ctx)