diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 135 |
1 files changed, 73 insertions, 62 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 3b68db1af..a217489de 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -5,13 +5,13 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/peer" "io" "math/rand" "net" "sync/atomic" + "time" ) // PUB @@ -40,73 +40,86 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis // 2. find the topic metadata owning filer // 3. write to the filer - var localTopicPartition *topic.LocalPartition req, err := stream.Recv() if err != nil { return err } response := &mq_pb.PublishMessageResponse{} // TODO check whether current broker should be the leader for the topic partition - ackInterval := 1 initMessage := req.GetInit() - var t topic.Topic - var p topic.Partition - if initMessage != nil { - t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) - localTopicPartition, _, err = b.GetOrGenLocalPartition(t, p) - if err != nil { - response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) - glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) - return stream.Send(response) - } - ackInterval = int(initMessage.AckInterval) - for _, follower := range initMessage.FollowerBrokers { - followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error { - _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{ - Topic: initMessage.Topic, - Partition: initMessage.Partition, - BrokerSelf: string(b.option.BrokerAddress()), - }) - return err - }) - if followErr != nil { - response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr) - glog.Errorf("follower %v failed: %v", follower, followErr) - return stream.Send(response) - } - } - stream.Send(response) - } else { + if initMessage == nil { response.Error = fmt.Sprintf("missing init message") glog.Errorf("missing init message") return stream.Send(response) } + // get or generate a local partition + t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) + localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p) + if getOrGenErr != nil { + response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr) + glog.Errorf("topic %v not found: %v", t, getOrGenErr) + return stream.Send(response) + } + + // connect to follower brokers + if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil { + response.Error = followerErr.Error() + glog.Errorf("MaybeConnectToFollowers: %v", followerErr) + return stream.Send(response) + } + + var receivedSequence, acknowledgedSequence int64 + var isClosed bool + + // start sending ack to publisher + ackInterval := int64(1) + if initMessage.AckInterval > 0 { + ackInterval = int64(initMessage.AckInterval) + } + go func() { + defer func() { + // println("stop sending ack to publisher", initMessage.PublisherName) + }() + + lastAckTime := time.Now() + for !isClosed { + receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs) + if acknowledgedSequence < receivedSequence && (receivedSequence - acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second){ + acknowledgedSequence = receivedSequence + response := &mq_pb.PublishMessageResponse{ + AckSequence: acknowledgedSequence, + } + if err := stream.Send(response); err != nil { + glog.Errorf("Error sending response %v: %v", response, err) + } + // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName) + lastAckTime = time.Now() + } else { + time.Sleep(1 * time.Second) + } + } + }() + + + // process each published messages clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition) localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher()) - ackCounter := 0 - var ackSequence int64 - var isStopping int32 - respChan := make(chan *mq_pb.PublishMessageResponse, 128) defer func() { - atomic.StoreInt32(&isStopping, 1) - respChan <- &mq_pb.PublishMessageResponse{ - AckSequence: ackSequence, - } - close(respChan) + // remove the publisher localTopicPartition.Publishers.RemovePublisher(clientName) - glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size()) if localTopicPartition.MaybeShutdownLocalPartition() { - b.localTopicManager.RemoveTopicPartition(t, p) + b.localTopicManager.RemoveLocalPartition(t, p) + glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) } }() - go func() { - for resp := range respChan { - if err := stream.Send(resp); err != nil { - glog.Errorf("Error sending response %v: %v", resp, err) - } - } + + // send a hello message + stream.Send(&mq_pb.PublishMessageResponse{}) + + defer func() { + isClosed = true }() // process each published messages @@ -117,28 +130,26 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis if err == io.EOF { break } - glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) - return err + glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) + break } // Process the received message - if dataMessage := req.GetData(); dataMessage != nil { - localTopicPartition.Publish(dataMessage) + dataMessage := req.GetData() + if dataMessage == nil { + continue } - ackCounter++ - ackSequence++ - if ackCounter >= ackInterval { - ackCounter = 0 - // send back the ack - response := &mq_pb.PublishMessageResponse{ - AckSequence: ackSequence, - } - respChan <- response + // The control message should still be sent to the follower + // to avoid timing issue when ack messages. + + // send to the local partition + if err = localTopicPartition.Publish(dataMessage); err != nil { + return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err) } } - glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition) + glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) return nil } |
