aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_assign.go10
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go2
-rw-r--r--weed/mq/topic/local_partition.go30
3 files changed, 26 insertions, 16 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 48ec0d5bd..99fe88acd 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -15,9 +15,15 @@ import (
func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
ret := &mq_pb.AssignTopicPartitionsResponse{}
+ t := topic.FromPbTopic(request.Topic)
+ conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
+ if readConfErr != nil {
+ glog.Errorf("topic %v not found: %v", t, readConfErr)
+ return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
+ }
+
// drain existing topic partition subscriptions
for _, assignment := range request.BrokerPartitionAssignments {
- t := topic.FromPbTopic(request.Topic)
partition := topic.FromPbPartition(assignment.Partition)
b.accessLock.Lock()
if request.IsDraining {
@@ -26,7 +32,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
} else {
var localPartition *topic.LocalPartition
if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
- localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition), conf.RecordType)
b.localTopicManager.AddLocalPartition(t, localPartition)
}
}
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index ea5cb71b9..476ad2533 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -40,7 +40,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition
self := b.option.BrokerAddress()
for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
- localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition), conf.RecordType)
b.localTopicManager.AddLocalPartition(t, localPartition)
isGenerated = true
break
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 8911c1841..304f019f2 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -28,18 +29,21 @@ type LocalPartition struct {
Publishers *LocalPartitionPublishers
Subscribers *LocalPartitionSubscribers
- publishFolloweMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient
+ RecordType *schema_pb.RecordType
+
+ publishFollowMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient
followerGrpcConnection *grpc.ClientConn
Follower string
}
var TIME_FORMAT = "2006-01-02-15-04-05"
-func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
+func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType, recordType *schema_pb.RecordType) *LocalPartition {
lp := &LocalPartition{
Partition: partition,
Publishers: NewLocalPartitionPublishers(),
Subscribers: NewLocalPartitionSubscribers(),
+ RecordType: recordType,
}
lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
@@ -55,9 +59,9 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
p.LogBuffer.AddToBuffer(message)
// maybe send to the follower
- if p.publishFolloweMeStream != nil {
+ if p.publishFollowMeStream != nil {
// println("recv", string(message.Key), message.TsNs)
- if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
+ if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Data{
Data: message,
},
@@ -134,7 +138,7 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
}
func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
- if p.publishFolloweMeStream != nil {
+ if p.publishFollowMeStream != nil {
return nil
}
if initMessage.FollowerBroker == "" {
@@ -148,11 +152,11 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
return fmt.Errorf("fail to dial %s: %v", p.Follower, err)
}
followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
- p.publishFolloweMeStream, err = followerClient.PublishFollowMe(ctx)
+ p.publishFollowMeStream, err = followerClient.PublishFollowMe(ctx)
if err != nil {
return fmt.Errorf("fail to create publish client: %v", err)
}
- if err = p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
+ if err = p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Init{
Init: &mq_pb.PublishFollowMeRequest_InitMessage{
Topic: initMessage.Topic,
@@ -170,7 +174,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
}()
for {
- ack, err := p.publishFolloweMeStream.Recv()
+ ack, err := p.publishFollowMeStream.Recv()
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Canceled {
@@ -194,9 +198,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
for !p.LogBuffer.IsAllFlushed() {
time.Sleep(113 * time.Millisecond)
}
- if p.publishFolloweMeStream != nil {
+ if p.publishFollowMeStream != nil {
// send close to the follower
- if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
+ if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Close{
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
},
@@ -205,7 +209,7 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
}
glog.V(4).Infof("closing grpcConnection to follower")
p.followerGrpcConnection.Close()
- p.publishFolloweMeStream = nil
+ p.publishFollowMeStream = nil
p.Follower = ""
}
@@ -224,8 +228,8 @@ func (p *LocalPartition) Shutdown() {
}
func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
- if p.publishFolloweMeStream != nil {
- if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
+ if p.publishFollowMeStream != nil {
+ if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Flush{
Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{
TsNs: flushTsNs,