aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_pub.go57
1 files changed, 28 insertions, 29 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 6cb907794..68bcc49b2 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -91,38 +91,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}); err != nil {
return err
}
- }
-
- // process each published messages
- clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
- localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
-
- ackCounter := 0
- var ackSequence int64
- defer func() {
- if localTopicPartition.FollowerStream == nil {
- // remove the publisher
- localTopicPartition.Publishers.RemovePublisher(clientName)
- glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
- if localTopicPartition.MaybeShutdownLocalPartition() {
- b.localTopicManager.RemoveTopicPartition(t, p)
- }
- }
- }()
- if localTopicPartition.FollowerStream != nil {
+ // start receiving ack from follower
go func() {
defer func() {
println("stop receiving ack from follower")
-
- // remove the publisher
- localTopicPartition.Publishers.RemovePublisher(clientName)
- glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
- if localTopicPartition.MaybeShutdownLocalPartition() {
- b.localTopicManager.RemoveTopicPartition(t, p)
- }
- println("closing grpcConnection to follower")
- localTopicPartition.GrpcConnection.Close()
}()
for {
@@ -131,7 +104,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
glog.Errorf("Error receiving response: %v", err)
return
}
- ackSequence = ack.AckTsNs
println("recv ack", ack.AckTsNs)
if err := stream.Send(&mq_pb.PublishMessageResponse{
AckSequence: ack.AckTsNs,
@@ -143,6 +115,33 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}()
}
+ // process each published messages
+ clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
+ localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
+
+ ackCounter := 0
+ var ackSequence int64
+ defer func() {
+ // remove the publisher
+ localTopicPartition.Publishers.RemovePublisher(clientName)
+ glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
+ if localTopicPartition.MaybeShutdownLocalPartition() {
+ if localTopicPartition.FollowerStream != nil {
+ // send close to the follower
+ if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Close{
+ Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
+ },
+ }); followErr != nil {
+ glog.Errorf("Error closing follower stream: %v", followErr)
+ }
+ println("closing grpcConnection to follower")
+ localTopicPartition.GrpcConnection.Close()
+ }
+ b.localTopicManager.RemoveTopicPartition(t, p)
+ }
+ }()
+
// send a hello message
stream.Send(&mq_pb.PublishMessageResponse{})