diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_assign.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_assign.go | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 991208a72..3f502cb3c 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -3,6 +3,8 @@ package broker import ( "context" "fmt" + "sync" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/logstore" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" @@ -10,7 +12,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" - "sync" ) // AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment @@ -28,8 +29,13 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } else { var localPartition *topic.LocalPartition if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil { - localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition)) + localPartition = topic.NewLocalPartition(partition, b.option.LogFlushInterval, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition)) + + // Initialize offset from existing data to ensure continuity on restart + b.initializePartitionOffsetFromExistingData(localPartition, t, partition) + b.localTopicManager.AddLocalPartition(t, localPartition) + } else { } } b.accessLock.Unlock() @@ -50,7 +56,6 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } } - glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) return ret, nil } |
