aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2024-02-05 16:46:40 -0800
committerChris Lu <chris.lu@gmail.com>2024-02-05 16:46:40 -0800
commit4dc6681833c5fd8094803cc2c01cf9664c982fc0 (patch)
treec097959e7da6f4b2cc4d2160fd5b7b6a74a8c7a7
parent0d74ac9224178ed066425f06839f50f4c6a54597 (diff)
downloadseaweedfs-4dc6681833c5fd8094803cc2c01cf9664c982fc0.tar.xz
seaweedfs-4dc6681833c5fd8094803cc2c01cf9664c982fc0.zip
proxy requests to lock owner
-rw-r--r--weed/mq/broker/broker_connect.go27
-rw-r--r--weed/mq/broker/broker_grpc_balance.go7
-rw-r--r--weed/mq/broker/broker_grpc_configure.go52
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go15
-rw-r--r--weed/mq/broker/broker_server.go6
-rw-r--r--weed/mq/client/pub_client/scheduler.go27
6 files changed, 63 insertions, 71 deletions
diff --git a/weed/mq/broker/broker_connect.go b/weed/mq/broker/broker_connect.go
index 33040c6a2..602461c82 100644
--- a/weed/mq/broker/broker_connect.go
+++ b/weed/mq/broker/broker_connect.go
@@ -4,9 +4,7 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
"math/rand"
@@ -14,31 +12,16 @@ import (
)
// BrokerConnectToBalancer connects to the broker balancer and sends stats
-func (b *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
- // find the lock owner
- var brokerBalancer string
- err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
- Name: pub_balancer.LockBrokerBalancer,
- })
- if err != nil {
- return err
- }
- brokerBalancer = resp.Owner
- return nil
- })
- if err != nil {
- return err
- }
- b.currentBalancer = pb.ServerAddress(brokerBalancer)
+func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) error {
+ self := string(b.option.BrokerAddress())
- glog.V(0).Infof("broker %s found balancer %s", self, brokerBalancer)
+ glog.V(0).Infof("broker %s connects to balancer %s", self, brokerBalancer)
if brokerBalancer == "" {
return fmt.Errorf("no balancer found")
}
// connect to the lock owner
- err = pb.WithBrokerGrpcClient(false, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ return pb.WithBrokerGrpcClient(false, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
stream, err := client.PublisherToPubBalancer(context.Background())
if err != nil {
return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
@@ -75,6 +58,4 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
return nil
})
-
- return err
}
diff --git a/weed/mq/broker/broker_grpc_balance.go b/weed/mq/broker/broker_grpc_balance.go
index c09161ff9..4d74cede4 100644
--- a/weed/mq/broker/broker_grpc_balance.go
+++ b/weed/mq/broker/broker_grpc_balance.go
@@ -2,17 +2,18 @@ package broker
import (
"context"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) {
- if b.currentBalancer == "" {
+ if !b.lockAsBalancer.IsLocked() {
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
- if !b.lockAsBalancer.IsLocked() {
- proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ if !b.isLockOwner() {
+ proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.BalanceTopics(ctx, request)
return nil
})
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 318e2f6da..d882eeea7 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -15,11 +16,12 @@ import (
// It generates an assignments based on existing allocations,
// and then assign the partitions to the brokers.
func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) {
- if b.currentBalancer == "" {
+ if !b.lockAsBalancer.IsLocked() {
+ glog.V(0).Infof("broker %s found balancer:%s, %s isLocked:%v", b.option.BrokerAddress(), pb.ServerAddress(b.lockAsBalancer.LockOwner()), b.lockAsBalancer.LockOwner(), b.lockAsBalancer.IsLocked())
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
- if !b.lockAsBalancer.IsLocked() {
- proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ if !b.isLockOwner() {
+ proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ConfigureTopic(ctx, request)
return nil
})
@@ -30,38 +32,42 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
}
t := topic.FromPbTopic(request.Topic)
- var readErr error
+ var readErr, assignErr error
resp, readErr = b.readTopicConfFromFiler(t)
if readErr != nil {
- glog.V(0).Infof("read topic %s conf: %v", request.Topic, err)
- } else {
- readErr = b.ensureTopicActiveAssignments(t, resp)
+ glog.V(0).Infof("read topic %s conf: %v", request.Topic, readErr)
+ }
+
+ if resp != nil {
+ assignErr = b.ensureTopicActiveAssignments(t, resp)
// no need to assign directly.
// The added or updated assignees will read from filer directly.
// The gone assignees will die by themselves.
}
- if readErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) {
+
+ if readErr == nil && assignErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) {
glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments)
- } else {
- if resp!=nil && len(resp.BrokerPartitionAssignments) > 0 {
- if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil {
- glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr)
- }
- }
- resp = &mq_pb.ConfigureTopicResponse{}
- if b.Balancer.Brokers.IsEmpty() {
- return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())
- }
- resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount)
+ return
+ }
- // save the topic configuration on filer
- if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil {
- return nil, fmt.Errorf("configure topic: %v", err)
+ if resp != nil && len(resp.BrokerPartitionAssignments) > 0 {
+ if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil {
+ glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr)
}
+ }
+ resp = &mq_pb.ConfigureTopicResponse{}
+ if b.Balancer.Brokers.IsEmpty() {
+ return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())
+ }
+ resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.Balancer.Brokers, request.PartitionCount)
- b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
+ // save the topic configuration on filer
+ if err := b.saveTopicConfToFiler(request.Topic, resp); err != nil {
+ return nil, fmt.Errorf("configure topic: %v", err)
}
+ b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
+
glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
return resp, err
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 966345d94..7881090c9 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -12,11 +13,11 @@ import (
// LookupTopicBrokers returns the brokers that are serving the topic
func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
- if b.currentBalancer == "" {
+ if !b.lockAsBalancer.IsLocked() {
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
if !b.lockAsBalancer.IsLocked() {
- proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.LookupTopicBrokers(ctx, request)
return nil
})
@@ -41,11 +42,11 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
}
func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
- if b.currentBalancer == "" {
+ if !b.lockAsBalancer.IsLocked() {
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
- if !b.lockAsBalancer.IsLocked() {
- proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ if !b.isLockOwner() {
+ proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ListTopics(ctx, request)
return nil
})
@@ -76,3 +77,7 @@ func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.List
return ret, nil
}
+
+func (b *MessageQueueBroker) isLockOwner() bool {
+ return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String()
+}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 15a3fea16..773ee19cb 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -46,7 +46,6 @@ type MessageQueueBroker struct {
localTopicManager *topic.LocalTopicManager
Balancer *pub_balancer.Balancer
lockAsBalancer *cluster.LiveLock
- currentBalancer pb.ServerAddress
Coordinator *sub_coordinator.Coordinator
accessLock sync.Mutex
}
@@ -87,9 +86,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) {
- balancer := mqBroker.lockAsBalancer.LockOwner()
- if err := mqBroker.BrokerConnectToBalancer(balancer); err != nil {
- glog.V(0).Infof("BrokerConnectToBalancer %s: %v", balancer, err)
+ if err := mqBroker.BrokerConnectToBalancer(newLockOwner); err != nil {
+ glog.V(0).Infof("BrokerConnectToBalancer: %v", err)
}
})
for {
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index 89d131580..d577e77e0 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -17,20 +17,21 @@ import (
type EachPartitionError struct {
*mq_pb.BrokerPartitionAssignment
- Err error
+ Err error
generation int
}
type EachPartitionPublishJob struct {
*mq_pb.BrokerPartitionAssignment
- stopChan chan bool
- wg sync.WaitGroup
+ stopChan chan bool
+ wg sync.WaitGroup
generation int
inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
}
+
func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
- if err := p.doEnsureConfigureTopic(); err != nil {
+ if err := p.doConfigureTopic(); err != nil {
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
}
@@ -101,9 +102,9 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
// start a go routine to publish to this partition
job := &EachPartitionPublishJob{
BrokerPartitionAssignment: assignment,
- stopChan: make(chan bool, 1),
- generation: generation,
- inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
+ stopChan: make(chan bool, 1),
+ generation: generation,
+ inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
}
job.wg.Add(1)
go func(job *EachPartitionPublishJob) {
@@ -135,13 +136,13 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
}
publishClient := &PublishClient{
SeaweedMessaging_PublishMessageClient: stream,
- Broker: job.LeaderBroker,
+ Broker: job.LeaderBroker,
}
if err = publishClient.Send(&mq_pb.PublishMessageRequest{
Message: &mq_pb.PublishMessageRequest_Init{
Init: &mq_pb.PublishMessageRequest_InitMessage{
- Topic: p.config.Topic.ToPbTopic(),
- Partition: job.Partition,
+ Topic: p.config.Topic.ToPbTopic(),
+ Partition: job.Partition,
AckInterval: 128,
},
},
@@ -202,7 +203,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return nil
}
-func (p *TopicPublisher) doEnsureConfigureTopic() (err error) {
+func (p *TopicPublisher) doConfigureTopic() (err error) {
if len(p.config.Brokers) == 0 {
return fmt.Errorf("no bootstrap brokers")
}
@@ -213,7 +214,7 @@ func (p *TopicPublisher) doEnsureConfigureTopic() (err error) {
p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
- Topic: p.config.Topic.ToPbTopic(),
+ Topic: p.config.Topic.ToPbTopic(),
PartitionCount: p.config.CreateTopicPartitionCount,
})
return err
@@ -226,7 +227,7 @@ func (p *TopicPublisher) doEnsureConfigureTopic() (err error) {
}
if lastErr != nil {
- return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
+ return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
}
return nil
}