aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_pub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
-rw-r--r--weed/mq/broker/broker_grpc_pub.go135
1 files changed, 73 insertions, 62 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 3b68db1af..a217489de 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -5,13 +5,13 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer"
"io"
"math/rand"
"net"
"sync/atomic"
+ "time"
)
// PUB
@@ -40,73 +40,86 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// 2. find the topic metadata owning filer
// 3. write to the filer
- var localTopicPartition *topic.LocalPartition
req, err := stream.Recv()
if err != nil {
return err
}
response := &mq_pb.PublishMessageResponse{}
// TODO check whether current broker should be the leader for the topic partition
- ackInterval := 1
initMessage := req.GetInit()
- var t topic.Topic
- var p topic.Partition
- if initMessage != nil {
- t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
- localTopicPartition, _, err = b.GetOrGenLocalPartition(t, p)
- if err != nil {
- response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- return stream.Send(response)
- }
- ackInterval = int(initMessage.AckInterval)
- for _, follower := range initMessage.FollowerBrokers {
- followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{
- Topic: initMessage.Topic,
- Partition: initMessage.Partition,
- BrokerSelf: string(b.option.BrokerAddress()),
- })
- return err
- })
- if followErr != nil {
- response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr)
- glog.Errorf("follower %v failed: %v", follower, followErr)
- return stream.Send(response)
- }
- }
- stream.Send(response)
- } else {
+ if initMessage == nil {
response.Error = fmt.Sprintf("missing init message")
glog.Errorf("missing init message")
return stream.Send(response)
}
+ // get or generate a local partition
+ t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
+ localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
+ if getOrGenErr != nil {
+ response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
+ glog.Errorf("topic %v not found: %v", t, getOrGenErr)
+ return stream.Send(response)
+ }
+
+ // connect to follower brokers
+ if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
+ response.Error = followerErr.Error()
+ glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
+ return stream.Send(response)
+ }
+
+ var receivedSequence, acknowledgedSequence int64
+ var isClosed bool
+
+ // start sending ack to publisher
+ ackInterval := int64(1)
+ if initMessage.AckInterval > 0 {
+ ackInterval = int64(initMessage.AckInterval)
+ }
+ go func() {
+ defer func() {
+ // println("stop sending ack to publisher", initMessage.PublisherName)
+ }()
+
+ lastAckTime := time.Now()
+ for !isClosed {
+ receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
+ if acknowledgedSequence < receivedSequence && (receivedSequence - acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second){
+ acknowledgedSequence = receivedSequence
+ response := &mq_pb.PublishMessageResponse{
+ AckSequence: acknowledgedSequence,
+ }
+ if err := stream.Send(response); err != nil {
+ glog.Errorf("Error sending response %v: %v", response, err)
+ }
+ // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
+ lastAckTime = time.Now()
+ } else {
+ time.Sleep(1 * time.Second)
+ }
+ }
+ }()
+
+
+ // process each published messages
clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
- ackCounter := 0
- var ackSequence int64
- var isStopping int32
- respChan := make(chan *mq_pb.PublishMessageResponse, 128)
defer func() {
- atomic.StoreInt32(&isStopping, 1)
- respChan <- &mq_pb.PublishMessageResponse{
- AckSequence: ackSequence,
- }
- close(respChan)
+ // remove the publisher
localTopicPartition.Publishers.RemovePublisher(clientName)
- glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
if localTopicPartition.MaybeShutdownLocalPartition() {
- b.localTopicManager.RemoveTopicPartition(t, p)
+ b.localTopicManager.RemoveLocalPartition(t, p)
+ glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
}
}()
- go func() {
- for resp := range respChan {
- if err := stream.Send(resp); err != nil {
- glog.Errorf("Error sending response %v: %v", resp, err)
- }
- }
+
+ // send a hello message
+ stream.Send(&mq_pb.PublishMessageResponse{})
+
+ defer func() {
+ isClosed = true
}()
// process each published messages
@@ -117,28 +130,26 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
if err == io.EOF {
break
}
- glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
- return err
+ glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
+ break
}
// Process the received message
- if dataMessage := req.GetData(); dataMessage != nil {
- localTopicPartition.Publish(dataMessage)
+ dataMessage := req.GetData()
+ if dataMessage == nil {
+ continue
}
- ackCounter++
- ackSequence++
- if ackCounter >= ackInterval {
- ackCounter = 0
- // send back the ack
- response := &mq_pb.PublishMessageResponse{
- AckSequence: ackSequence,
- }
- respChan <- response
+ // The control message should still be sent to the follower
+ // to avoid timing issue when ack messages.
+
+ // send to the local partition
+ if err = localTopicPartition.Publish(dataMessage); err != nil {
+ return fmt.Errorf("topic %v partition %v publish error: %v", initMessage.Topic, initMessage.Partition, err)
}
}
- glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
+ glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
return nil
}