aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client/scheduler.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/pub_client/scheduler.go')
-rw-r--r--weed/mq/client/pub_client/scheduler.go53
1 files changed, 30 insertions, 23 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index a768fa7f8..529efc693 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -3,19 +3,19 @@ package pub_client
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
- "log"
- "sort"
- "sync"
- "sync/atomic"
- "time"
)
type EachPartitionError struct {
@@ -33,27 +33,26 @@ type EachPartitionPublishJob struct {
}
func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
-
if err := p.doConfigureTopic(); err != nil {
wg.Done()
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
}
- log.Printf("start scheduler thread for topic %s", p.config.Topic)
+ log.Infof("start scheduler thread for topic %s", p.config.Topic)
generation := 0
var errChan chan EachPartitionError
for {
- glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
+ log.V(3).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
if assignments, err := p.doLookupTopicPartitions(); err == nil {
generation++
- glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
+ log.V(3).Infof("start generation %d with %d assignments", generation, len(assignments))
if errChan == nil {
errChan = make(chan EachPartitionError, len(assignments))
}
p.onEachAssignments(generation, assignments, errChan)
} else {
- glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
+ log.Errorf("lookup topic %s: %v", p.config.Topic, err)
time.Sleep(5 * time.Second)
continue
}
@@ -66,7 +65,7 @@ func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
for {
select {
case eachErr := <-errChan:
- glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
+ log.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
if eachErr.generation < generation {
continue
}
@@ -114,7 +113,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
go func(job *EachPartitionPublishJob) {
defer job.wg.Done()
if err := p.doPublishToPartition(job); err != nil {
- log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err)
+ log.Infof("publish to %s partition %v: %v", p.config.Topic, job.Partition, err)
errChan <- EachPartitionError{assignment, err, generation}
}
}(job)
@@ -127,8 +126,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
}
func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
-
- log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
+ log.Infof("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption)
if err != nil {
@@ -159,10 +157,19 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
// process the hello message
resp, err := stream.Recv()
if err != nil {
- return fmt.Errorf("recv init response: %v", err)
+ e, _ := status.FromError(err)
+ if e.Code() == codes.Unknown && e.Message() == "EOF" {
+ log.Infof("publish to %s EOF", publishClient.Broker)
+ return nil
+ }
+ publishClient.Err = err
+ log.Errorf("publish1 to %s error: %v", publishClient.Broker, err)
+ return err
}
if resp.Error != "" {
- return fmt.Errorf("init response error: %v", resp.Error)
+ publishClient.Err = fmt.Errorf("ack error: %v", resp.Error)
+ log.Errorf("publish2 to %s error: %v", publishClient.Broker, resp.Error)
+ return fmt.Errorf("ack error: %v", resp.Error)
}
var publishedTsNs int64
@@ -176,20 +183,20 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Unknown && e.Message() == "EOF" {
- log.Printf("publish to %s EOF", publishClient.Broker)
+ log.Infof("publish to %s EOF", publishClient.Broker)
return
}
publishClient.Err = err
- log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
+ log.Errorf("publish1 to %s error: %v", publishClient.Broker, err)
return
}
if ackResp.Error != "" {
publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
- log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
+ log.Errorf("publish2 to %s error: %v", publishClient.Broker, ackResp.Error)
return
}
if ackResp.AckSequence > 0 {
- log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
+ log.Infof("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
}
if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
return
@@ -222,7 +229,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
}
}
- log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
+ log.Infof("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
return nil
}
@@ -272,7 +279,7 @@ func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerP
&mq_pb.LookupTopicBrokersRequest{
Topic: p.config.Topic.ToPbTopic(),
})
- glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
+ log.V(3).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
if err != nil {
return err