diff options
Diffstat (limited to 'weed/mq/client/pub_client/scheduler.go')
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 272 |
1 file changed, 272 insertions, 0 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go new file mode 100644 index 000000000..a766814e6 --- /dev/null +++ b/weed/mq/client/pub_client/scheduler.go @@ -0,0 +1,272 @@ +package pub_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "log" + "sort" + "sync" + "time" +) + +type EachPartitionError struct { + *mq_pb.BrokerPartitionAssignment + Err error + generation int +} + +type EachPartitionPublishJob struct { + *mq_pb.BrokerPartitionAssignment + stopChan chan bool + wg sync.WaitGroup + generation int + inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage] +} + +func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { + + if err := p.doConfigureTopic(); err != nil { + return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) + } + + log.Printf("start scheduler thread for topic %s", p.config.Topic) + + generation := 0 + var errChan chan EachPartitionError + for { + glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic) + if assignments, err := p.doLookupTopicPartitions(); err == nil { + generation++ + glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments)) + if errChan == nil { + errChan = make(chan EachPartitionError, len(assignments)) + } + p.onEachAssignments(generation, assignments, errChan) + } else { + glog.Errorf("lookup topic %s: %v", p.config.Topic, err) + time.Sleep(5 * time.Second) + continue + } + + if generation == 1 { + wg.Done() + } + + // wait for any error to happen. 
If so, consume all remaining errors, and retry + for { + select { + case eachErr := <-errChan: + glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) + if eachErr.generation < generation { + continue + } + break + } + } + } +} + +func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) { + // TODO assuming this is not re-configured so the partitions are fixed. + sort.Slice(assignments, func(i, j int) bool { + return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart + }) + var jobs []*EachPartitionPublishJob + hasExistingJob := len(p.jobs) == len(assignments) + for i, assignment := range assignments { + if assignment.LeaderBroker == "" { + continue + } + if hasExistingJob { + var existingJob *EachPartitionPublishJob + existingJob = p.jobs[i] + if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker { + existingJob.generation = generation + jobs = append(jobs, existingJob) + continue + } else { + if existingJob.LeaderBroker != "" { + close(existingJob.stopChan) + existingJob.LeaderBroker = "" + existingJob.wg.Wait() + } + } + } + + // start a go routine to publish to this partition + job := &EachPartitionPublishJob{ + BrokerPartitionAssignment: assignment, + stopChan: make(chan bool, 1), + generation: generation, + inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024), + } + job.wg.Add(1) + go func(job *EachPartitionPublishJob) { + defer job.wg.Done() + if err := p.doPublishToPartition(job); err != nil { + errChan <- EachPartitionError{assignment, err, generation} + } + }(job) + jobs = append(jobs, job) + // TODO assuming this is not re-configured so the partitions are fixed. 
+ // better just re-use the existing job + p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue) + } + p.jobs = jobs +} + +func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error { + + log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) + + grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption) + if err != nil { + return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err) + } + brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + stream, err := brokerClient.PublishMessage(context.Background()) + if err != nil { + return fmt.Errorf("create publish client: %v", err) + } + publishClient := &PublishClient{ + SeaweedMessaging_PublishMessageClient: stream, + Broker: job.LeaderBroker, + } + if err = publishClient.Send(&mq_pb.PublishMessageRequest{ + Message: &mq_pb.PublishMessageRequest_Init{ + Init: &mq_pb.PublishMessageRequest_InitMessage{ + Topic: p.config.Topic.ToPbTopic(), + Partition: job.Partition, + AckInterval: 128, + }, + }, + }); err != nil { + return fmt.Errorf("send init message: %v", err) + } + resp, err := stream.Recv() + if err != nil { + return fmt.Errorf("recv init response: %v", err) + } + if resp.Error != "" { + return fmt.Errorf("init response error: %v", resp.Error) + } + + go func() { + for { + ackResp, err := publishClient.Recv() + if err != nil { + e, _ := status.FromError(err) + if e.Code() == codes.Unknown && e.Message() == "EOF" { + return + } + publishClient.Err = err + fmt.Printf("publish1 to %s error: %v\n", publishClient.Broker, err) + return + } + if ackResp.Error != "" { + publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error) + fmt.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) + return + } + if ackResp.AckSequence > 0 { + log.Printf("ack %d", ackResp.AckSequence) + } + } + }() + + publishCounter := 0 + for data, hasData := 
job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() { + if err := publishClient.Send(&mq_pb.PublishMessageRequest{ + Message: &mq_pb.PublishMessageRequest_Data{ + Data: data, + }, + }); err != nil { + return fmt.Errorf("send publish data: %v", err) + } + publishCounter++ + } + + if err := publishClient.CloseSend(); err != nil { + return fmt.Errorf("close send: %v", err) + } + + time.Sleep(3 * time.Second) + + log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) + + return nil +} + +func (p *TopicPublisher) doConfigureTopic() (err error) { + if len(p.config.Brokers) == 0 { + return fmt.Errorf("no bootstrap brokers") + } + var lastErr error + for _, brokerAddress := range p.config.Brokers { + err = pb.WithBrokerGrpcClient(false, + brokerAddress, + p.grpcDialOption, + func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ + Topic: p.config.Topic.ToPbTopic(), + PartitionCount: p.config.CreateTopicPartitionCount, + }) + return err + }) + if err == nil { + return nil + } else { + lastErr = err + } + } + + if lastErr != nil { + return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err) + } + return nil +} + +func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) { + if len(p.config.Brokers) == 0 { + return nil, fmt.Errorf("no bootstrap brokers") + } + var lastErr error + for _, brokerAddress := range p.config.Brokers { + err := pb.WithBrokerGrpcClient(false, + brokerAddress, + p.grpcDialOption, + func(client mq_pb.SeaweedMessagingClient) error { + lookupResp, err := client.LookupTopicBrokers(context.Background(), + &mq_pb.LookupTopicBrokersRequest{ + Topic: p.config.Topic.ToPbTopic(), + }) + glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp) + + if err != nil { + return err + } + + if len(lookupResp.BrokerPartitionAssignments) == 
0 { + return fmt.Errorf("no broker partition assignments") + } + + assignments = lookupResp.BrokerPartitionAssignments + + return nil + }) + if err == nil { + return assignments, nil + } else { + lastErr = err + } + } + + return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr) + +} |
