aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_configure.go18
-rw-r--r--weed/mq/broker/broker_server.go2
-rw-r--r--weed/mq/topic/local_partition.go4
3 files changed, 14 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 008c08bbe..8d3727b1f 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -82,18 +82,20 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
// drain existing topic partition subscriptions
for _, assignment := range request.BrokerPartitionAssignments {
- localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition))
+ t := topic.FromPbTopic(request.Topic)
+ partition := topic.FromPbPartition(assignment.Partition)
+ b.accessLock.Lock()
if request.IsDraining {
// TODO drain existing topic partition subscriptions
-
- b.localTopicManager.RemoveTopicPartition(
- topic.FromPbTopic(request.Topic),
- localPartition.Partition)
+ b.localTopicManager.RemoveTopicPartition(t, partition)
} else {
- b.localTopicManager.AddTopicPartition(
- topic.FromPbTopic(request.Topic),
- localPartition)
+ var localPartition *topic.LocalPartition
+ if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
+ localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition))
+ b.localTopicManager.AddTopicPartition(t, localPartition)
+ }
}
+ b.accessLock.Unlock()
}
// if is leader, notify the followers to drain existing topic partition subscriptions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 615964621..1a2c09ca4 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@@ -43,6 +44,7 @@ type MessageQueueBroker struct {
lockAsBalancer *cluster.LiveLock
currentBalancer pb.ServerAddress
Coordinator *sub_coordinator.Coordinator
+ accessLock sync.Mutex
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 84602add7..f4a080f38 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -84,13 +84,13 @@ func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.Message
return p.logBuffer.GetEarliestPosition()
}
-func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
+func FromPbBrokerPartitionAssignment(self pb.ServerAddress, partition Partition, assignment *mq_pb.BrokerPartitionAssignment, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
isLeader := assignment.LeaderBroker == string(self)
followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
for i, followerBroker := range assignment.FollowerBrokers {
followers[i] = pb.ServerAddress(followerBroker)
}
- return NewLocalPartition(FromPbPartition(assignment.Partition), isLeader, followers, logFlushFn, readFromDiskFn)
+ return NewLocalPartition(partition, isLeader, followers, logFlushFn, readFromDiskFn)
}
func (p *LocalPartition) closePublishers() {