aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_pub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
-rw-r--r--weed/mq/broker/broker_grpc_pub.go134
1 files changed, 49 insertions, 85 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index acbffefba..43280e9be 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -5,71 +5,36 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/peer"
+ "math/rand"
+ "net"
"sync/atomic"
"time"
)
-// For a new or re-configured topic, or one of the broker went offline,
-// the pub clients ask one broker what are the brokers for all the topic partitions.
-// The broker will lock the topic on write.
-// 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
-// 2. if the topic is found, return the brokers for the topic partitions
-// For a topic to read from, the sub clients ask one broker what are the brokers for all the topic partitions.
-// The broker will lock the topic on read.
-// 1. if the topic is not found, return error
-// 2. if the topic is found, return the brokers for the topic partitions
-//
-// If the topic needs to be re-balanced, the admin client will lock the topic,
-// 1. collect throughput information for all the brokers
-// 2. adjust the topic partitions to the brokers
-// 3. notify the brokers to add/remove partitions to host
-// 3.1 When locking the topic, the partitions and brokers should be remembered in the lock.
-// 4. the brokers will stop process incoming messages if not the right partition
-// 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition3
-// 4.2 the sub clients will need to change the brokers to read from
-//
-// The following is from each individual component's perspective:
-// For a pub client
-// For current topic/partition, ask one broker for the brokers for the topic partitions
-// 1. connect to the brokers and keep sending, until the broker returns error, or the broker leader is moved.
-// For a sub client
-// For current topic/partition, ask one broker for the brokers for the topic partitions
-// 1. connect to the brokers and keep reading, until the broker returns error, or the broker leader is moved.
-// For a broker
-// Upon a pub client lookup:
-// 1. lock the topic
-// 2. if already has topic partition assignment, check all brokers are healthy
-// 3. if not, create topic partition assignment
-// 2. return the brokers for the topic partitions
-// 3. unlock the topic
-// Upon a sub client lookup:
-// 1. lock the topic
-// 2. if already has topic partition assignment, check all brokers are healthy
-// 3. if not, return error
-// 2. return the brokers for the topic partitions
-// 3. unlock the topic
-// For an admin tool
-// 0. collect stats from all the brokers, and find the topic worth moving
-// 1. lock the topic
-// 2. collect throughput information for all the brokers
-// 3. adjust the topic partitions to the brokers
-// 4. notify the brokers to add/remove partitions to host
-// 5. the brokers will stop process incoming messages if not the right partition
-// 6. unlock the topic
+// PUB
+// 1. gRPC API to configure a topic
+// 1.1 create a topic with existing partition count
+// 1.2 assign partitions to brokers
+// 2. gRPC API to lookup topic partitions
+// 3. gRPC API to publish by topic partitions
-/*
-The messages are buffered in memory, and saved to filer under
- /topics/<topic>/<date>/<hour>/<segment>/*.msg
- /topics/<topic>/<date>/<hour>/segment
- /topics/<topic>/info/segment_<id>.meta
+// SUB
+// 1. gRPC API to lookup a topic partitions
+// Re-balance topic partitions for publishing
+// 1. collect stats from all the brokers
+// 2. Rebalance and configure new generation of partitions on brokers
+// 3. Tell brokers to close current gneration of publishing.
+// Publishers needs to lookup again and publish to the new generation of partitions.
+// Re-balance topic partitions for subscribing
+// 1. collect stats from all the brokers
+// Subscribers needs to listen for new partitions and connect to the brokers.
+// Each subscription may not get data. It can act as a backup.
-*/
-
-func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
+func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
// 1. write to the volume server
// 2. find the topic metadata owning filer
// 3. write to the filer
@@ -85,19 +50,23 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
initMessage := req.GetInit()
if initMessage != nil {
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
- localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
+ localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
- localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
- broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
+ response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+ glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+ return stream.Send(response)
}
ackInterval = int(initMessage.AckInterval)
stream.Send(response)
} else {
- response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
- glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
+ response.Error = fmt.Sprintf("missing init message")
+ glog.Errorf("missing init message")
return stream.Send(response)
}
+ clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
+ localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
+
ackCounter := 0
var ackSequence int64
var isStopping int32
@@ -105,6 +74,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
defer func() {
atomic.StoreInt32(&isStopping, 1)
close(respChan)
+ localTopicPartition.Publishers.RemovePublisher(clientName)
}()
go func() {
ticker := time.NewTicker(1 * time.Second)
@@ -127,6 +97,11 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
} else {
return
}
+ case <-localTopicPartition.StopPublishersCh:
+ respChan <- &mq_pb.PublishResponse{
+ AckSequence: ackSequence,
+ ShouldClose: true,
+ }
}
}
}()
@@ -156,33 +131,22 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
}
}
- glog.Infof("publish stream closed")
+ glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
return nil
}
-// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
-func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
- ret := &mq_pb.AssignTopicPartitionsResponse{}
- self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
-
- for _, brokerPartition := range request.BrokerPartitionAssignments {
- localPartiton := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
- broker.localTopicManager.AddTopicPartition(
- topic.FromPbTopic(request.Topic),
- localPartiton)
- if request.IsLeader {
- for _, follower := range localPartiton.FollowerBrokers {
- err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.AssignTopicPartitions(context.Background(), request)
- return err
- })
- if err != nil {
- return ret, err
- }
- }
- }
+// duplicated from master_grpc_server.go
+func findClientAddress(ctx context.Context) string {
+ // fmt.Printf("FromContext %+v\n", ctx)
+ pr, ok := peer.FromContext(ctx)
+ if !ok {
+ glog.Error("failed to get peer from ctx")
+ return ""
}
-
- return ret, nil
+ if pr.Addr == net.Addr(nil) {
+ glog.Error("failed to get peer address")
+ return ""
+ }
+ return pr.Addr.String()
}