aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-09 08:01:01 -0800
committerchrislu <chris.lu@gmail.com>2024-01-09 08:01:01 -0800
commitaed54eda6105649bc5e2fdbea2c06a00f7925069 (patch)
treecdc073da651d68dc126bdc03cebf44e0e6a50277 /weed
parentd51efddf5c18c6743cb0129e6c125617be3ceced (diff)
downloadseaweedfs-aed54eda6105649bc5e2fdbea2c06a00f7925069.tar.xz
seaweedfs-aed54eda6105649bc5e2fdbea2c06a00f7925069.zip
refactor
Diffstat (limited to 'weed')
-rw-r--r--weed/mq/broker/broker_grpc_configure.go5
1 files changed, 2 insertions, 3 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 042621a4c..660a82f83 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -78,8 +78,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
// drain existing topic partition subscriptions
for _, assignment := range request.BrokerPartitionAssignments {
- topicPartition := topic.FromPbPartition(assignment.Partition)
- localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, topicPartition))
+ localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition))
if request.IsDraining {
// TODO drain existing topic partition subscriptions
@@ -111,7 +110,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
return ret, nil
}
-func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition topic.Partition) log_buffer.LogFlushFuncType {
+func (b *MessageQueueBroker) genLogFlushFunc(t *mq_pb.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
partitionDir := fmt.Sprintf("%s/%s/%4d-%4d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)