diff options
Diffstat (limited to 'weed/mq/client/pub_client/scheduler.go')
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 10 |
1 files changed, 7 insertions, 3 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index df2270b2c..a768fa7f8 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -7,7 +7,9 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "log" "sort" @@ -33,6 +35,7 @@ type EachPartitionPublishJob struct { func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { if err := p.doConfigureTopic(); err != nil { + wg.Done() return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) } @@ -111,6 +114,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. go func(job *EachPartitionPublishJob) { defer job.wg.Done() if err := p.doPublishToPartition(job); err != nil { + log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err) errChan <- EachPartitionError{assignment, err, generation} } }(job) @@ -126,7 +130,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) - grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption) + grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption) if err != nil { return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err) } @@ -225,7 +229,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro func (p *TopicPublisher) doConfigureTopic() (err error) { if len(p.config.Brokers) == 0 { - return fmt.Errorf("no bootstrap brokers") + return fmt.Errorf("topic configuring found no bootstrap brokers") } var lastErr error for _, brokerAddress := range p.config.Brokers { @@ -256,7 +260,7 @@ func (p *TopicPublisher) doConfigureTopic() (err error) { func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) { if len(p.config.Brokers) == 0 { - return nil, fmt.Errorf("no bootstrap brokers") + return nil, fmt.Errorf("lookup found no bootstrap brokers") } var lastErr error for _, brokerAddress := range p.config.Brokers { |
