aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_assign.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_assign.go')
-rw-r--r--weed/mq/broker/broker_grpc_assign.go11
1 files changed, 8 insertions, 3 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 991208a72..3f502cb3c 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -3,6 +3,8 @@ package broker
import (
"context"
"fmt"
+ "sync"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
@@ -10,7 +12,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "sync"
)
// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
@@ -28,8 +29,13 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
} else {
var localPartition *topic.LocalPartition
if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
- localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition))
+ localPartition = topic.NewLocalPartition(partition, b.option.LogFlushInterval, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition))
+
+ // Initialize offset from existing data to ensure continuity on restart
+ b.initializePartitionOffsetFromExistingData(localPartition, t, partition)
+
b.localTopicManager.AddLocalPartition(t, localPartition)
+ } else {
}
}
b.accessLock.Unlock()
@@ -50,7 +56,6 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
}
}
- glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
return ret, nil
}