aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2024-02-05 18:00:50 -0800
committerChris Lu <chris.lu@gmail.com>2024-02-05 18:00:50 -0800
commit656b78d1af4024b0c172229034d01f7a1fe8d841 (patch)
tree7497ffec0abd3bfa502953291ca8f7551fa05462
parent90fcde0e26cb4dc3c64359451e36b7b26da87169 (diff)
downloadseaweedfs-656b78d1af4024b0c172229034d01f7a1fe8d841.tar.xz
seaweedfs-656b78d1af4024b0c172229034d01f7a1fe8d841.zip
proxy to broker leader
-rw-r--r--weed/cluster/lock_client.go4
-rw-r--r--weed/mq/broker/broker_grpc_balance.go5
-rw-r--r--weed/mq/broker/broker_grpc_configure.go4
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go10
-rw-r--r--weed/mq/broker/broker_grpc_pub_balancer.go4
-rw-r--r--weed/mq/broker/broker_grpc_sub_coordinator.go4
6 files changed, 5 insertions, 26 deletions
diff --git a/weed/cluster/lock_client.go b/weed/cluster/lock_client.go
index 57801735c..4ebb067df 100644
--- a/weed/cluster/lock_client.go
+++ b/weed/cluster/lock_client.go
@@ -122,10 +122,6 @@ func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error {
return nil
}
-func (lock *LiveLock) IsLocked() bool {
- return lock != nil && lock.isLocked
-}
-
func (lock *LiveLock) StopShortLivedLock() error {
if !lock.isLocked {
return nil
diff --git a/weed/mq/broker/broker_grpc_balance.go b/weed/mq/broker/broker_grpc_balance.go
index 4d74cede4..412407211 100644
--- a/weed/mq/broker/broker_grpc_balance.go
+++ b/weed/mq/broker/broker_grpc_balance.go
@@ -4,14 +4,9 @@ import (
"context"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) {
- if !b.lockAsBalancer.IsLocked() {
- return nil, status.Errorf(codes.Unavailable, "no balancer")
- }
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.BalanceTopics(ctx, request)
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index d882eeea7..6a6e92922 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -16,10 +16,6 @@ import (
// It generates an assignments based on existing allocations,
// and then assign the partitions to the brokers.
func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) {
- if !b.lockAsBalancer.IsLocked() {
- glog.V(0).Infof("broker %s found balancer:%s, %s isLocked:%v", b.option.BrokerAddress(), pb.ServerAddress(b.lockAsBalancer.LockOwner()), b.lockAsBalancer.LockOwner(), b.lockAsBalancer.IsLocked())
- return nil, status.Errorf(codes.Unavailable, "no balancer")
- }
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ConfigureTopic(ctx, request)
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 7881090c9..14c1f37da 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -7,16 +7,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
// LookupTopicBrokers returns the brokers that are serving the topic
func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
- if !b.lockAsBalancer.IsLocked() {
- return nil, status.Errorf(codes.Unavailable, "no balancer")
- }
- if !b.lockAsBalancer.IsLocked() {
+ if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.LookupTopicBrokers(ctx, request)
return nil
@@ -42,9 +37,6 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
}
func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
- if !b.lockAsBalancer.IsLocked() {
- return nil, status.Errorf(codes.Unavailable, "no balancer")
- }
if !b.isLockOwner() {
proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ListTopics(ctx, request)
diff --git a/weed/mq/broker/broker_grpc_pub_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go
index 4edceb8a0..2cf90b4bc 100644
--- a/weed/mq/broker/broker_grpc_pub_balancer.go
+++ b/weed/mq/broker/broker_grpc_pub_balancer.go
@@ -9,7 +9,7 @@ import (
// PublisherToPubBalancer receives connections from brokers and collects stats
func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessaging_PublisherToPubBalancerServer) error {
- if !b.lockAsBalancer.IsLocked() {
+ if !b.isLockOwner() {
return status.Errorf(codes.Unavailable, "not current broker balancer")
}
req, err := stream.Recv()
@@ -35,7 +35,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
if err != nil {
return err
}
- if !b.lockAsBalancer.IsLocked() {
+ if !b.isLockOwner() {
return status.Errorf(codes.Unavailable, "not current broker balancer")
}
if receivedStats := req.GetStats(); receivedStats != nil {
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index 94bd6b0e2..89c221af5 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -12,7 +12,7 @@ import (
// SubscriberToSubCoordinator coordinates the subscribers
func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMessaging_SubscriberToSubCoordinatorServer) error {
- if !b.lockAsBalancer.IsLocked() {
+ if !b.isLockOwner() {
return status.Errorf(codes.Unavailable, "not current broker balancer")
}
req, err := stream.Recv()
@@ -43,7 +43,7 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
for i, assignment := range conf.BrokerPartitionAssignments {
assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
Partition: assignment.Partition,
- Broker: assignment.LeaderBroker,
+ Broker: assignment.LeaderBroker,
}
}
// send partition assignment to subscriber