diff options
| -rw-r--r-- | weed/mq/broker/broker_grpc_assign.go | 10 | ||||
| -rw-r--r-- | weed/mq/broker/broker_topic_conf_read_write.go | 2 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition.go | 30 |
3 files changed, 26 insertions, 16 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 48ec0d5bd..99fe88acd 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -15,9 +15,15 @@ import ( func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) { ret := &mq_pb.AssignTopicPartitionsResponse{} + t := topic.FromPbTopic(request.Topic) + conf, readConfErr := b.fca.ReadTopicConfFromFiler(t) + if readConfErr != nil { + glog.Errorf("topic %v not found: %v", t, readConfErr) + return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) + } + // drain existing topic partition subscriptions for _, assignment := range request.BrokerPartitionAssignments { - t := topic.FromPbTopic(request.Topic) partition := topic.FromPbPartition(assignment.Partition) b.accessLock.Lock() if request.IsDraining { @@ -26,7 +32,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } else { var localPartition *topic.LocalPartition if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil { - localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) + localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition), conf.RecordType) b.localTopicManager.AddLocalPartition(t, localPartition) } } diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index ea5cb71b9..476ad2533 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -40,7 +40,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition self := b.option.BrokerAddress() for _, assignment := range conf.BrokerPartitionAssignments { if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) { - localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) + localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition), conf.RecordType) b.localTopicManager.AddLocalPartition(t, localPartition) isGenerated = true break diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index 8911c1841..304f019f2 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -28,18 +29,21 @@ type LocalPartition struct { Publishers *LocalPartitionPublishers Subscribers *LocalPartitionSubscribers - publishFolloweMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient + RecordType *schema_pb.RecordType + + publishFollowMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient followerGrpcConnection *grpc.ClientConn Follower string } var TIME_FORMAT = "2006-01-02-15-04-05" -func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition { +func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType, recordType *schema_pb.RecordType) *LocalPartition { lp := &LocalPartition{ Partition: partition, Publishers: NewLocalPartitionPublishers(), Subscribers: NewLocalPartitionSubscribers(), + RecordType: recordType, } lp.ListenersCond = sync.NewCond(&lp.ListenersLock) lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop), @@ -55,9 +59,9 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error { p.LogBuffer.AddToBuffer(message) // maybe send to the follower - if p.publishFolloweMeStream != nil { + if p.publishFollowMeStream != nil { // println("recv", string(message.Key), message.TsNs) - if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{ + if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Data{ Data: message, }, @@ -134,7 +138,7 @@ func (p *LocalPartition) WaitUntilNoPublishers() { } func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) { - if p.publishFolloweMeStream != nil { + if p.publishFollowMeStream != nil { return nil } if initMessage.FollowerBroker == "" { @@ -148,11 +152,11 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa return fmt.Errorf("fail to dial %s: %v", p.Follower, err) } followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection) - p.publishFolloweMeStream, err = followerClient.PublishFollowMe(ctx) + p.publishFollowMeStream, err = followerClient.PublishFollowMe(ctx) if err != nil { return fmt.Errorf("fail to create publish client: %v", err) } - if err = p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{ + if err = p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Init{ Init: &mq_pb.PublishFollowMeRequest_InitMessage{ Topic: initMessage.Topic, @@ -170,7 +174,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa }() for { - ack, err := p.publishFolloweMeStream.Recv() + ack, err := p.publishFollowMeStream.Recv() if err != nil { e, _ := status.FromError(err) if e.Code() == codes.Canceled { @@ -194,9 +198,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { for !p.LogBuffer.IsAllFlushed() { time.Sleep(113 * time.Millisecond) } - if p.publishFolloweMeStream != nil { + if p.publishFollowMeStream != nil { // send close to the follower - if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{ + if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Close{ Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, }, @@ -205,7 +209,7 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { } glog.V(4).Infof("closing grpcConnection to follower") p.followerGrpcConnection.Close() - p.publishFolloweMeStream = nil + p.publishFollowMeStream = nil p.Follower = "" } @@ -224,8 +228,8 @@ func (p *LocalPartition) Shutdown() { } func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { - if p.publishFolloweMeStream != nil { - if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{ + if p.publishFollowMeStream != nil { + if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{ Message: &mq_pb.PublishFollowMeRequest_Flush{ Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{ TsNs: flushTsNs, |
