aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_assign.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_assign.go')
-rw-r--r--weed/mq/broker/broker_grpc_assign.go3
1 files changed, 2 insertions, 1 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 48ec0d5bd..9a9b34c0b 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -26,7 +27,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
} else {
var localPartition *topic.LocalPartition
if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
- localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition))
b.localTopicManager.AddLocalPartition(t, localPartition)
}
}