aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client')
-rw-r--r--weed/mq/client/pub_client/scheduler.go53
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go20
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go14
-rw-r--r--weed/mq/client/sub_client/subscribe.go10
4 files changed, 52 insertions, 45 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index a768fa7f8..529efc693 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -3,19 +3,19 @@ package pub_client
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
- "log"
- "sort"
- "sync"
- "sync/atomic"
- "time"
)
type EachPartitionError struct {
@@ -33,27 +33,26 @@ type EachPartitionPublishJob struct {
}
func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
-
if err := p.doConfigureTopic(); err != nil {
wg.Done()
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
}
- log.Printf("start scheduler thread for topic %s", p.config.Topic)
+ log.Infof("start scheduler thread for topic %s", p.config.Topic)
generation := 0
var errChan chan EachPartitionError
for {
- glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
+ log.V(3).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
if assignments, err := p.doLookupTopicPartitions(); err == nil {
generation++
- glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
+ log.V(3).Infof("start generation %d with %d assignments", generation, len(assignments))
if errChan == nil {
errChan = make(chan EachPartitionError, len(assignments))
}
p.onEachAssignments(generation, assignments, errChan)
} else {
- glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
+ log.Errorf("lookup topic %s: %v", p.config.Topic, err)
time.Sleep(5 * time.Second)
continue
}
@@ -66,7 +65,7 @@ func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
for {
select {
case eachErr := <-errChan:
- glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
+ log.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
if eachErr.generation < generation {
continue
}
@@ -114,7 +113,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
go func(job *EachPartitionPublishJob) {
defer job.wg.Done()
if err := p.doPublishToPartition(job); err != nil {
- log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err)
+ log.Infof("publish to %s partition %v: %v", p.config.Topic, job.Partition, err)
errChan <- EachPartitionError{assignment, err, generation}
}
}(job)
@@ -127,8 +126,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
}
func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
-
- log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
+ log.Infof("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption)
if err != nil {
@@ -159,10 +157,19 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
// process the hello message
resp, err := stream.Recv()
if err != nil {
- return fmt.Errorf("recv init response: %v", err)
+ e, _ := status.FromError(err)
+ if e.Code() == codes.Unknown && e.Message() == "EOF" {
+ log.Infof("publish to %s EOF", publishClient.Broker)
+ return nil
+ }
+ publishClient.Err = err
+ log.Errorf("publish1 to %s error: %v", publishClient.Broker, err)
+ return err
}
if resp.Error != "" {
- return fmt.Errorf("init response error: %v", resp.Error)
+ publishClient.Err = fmt.Errorf("ack error: %v", resp.Error)
+ log.Errorf("publish2 to %s error: %v", publishClient.Broker, resp.Error)
+ return fmt.Errorf("ack error: %v", resp.Error)
}
var publishedTsNs int64
@@ -176,20 +183,20 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Unknown && e.Message() == "EOF" {
- log.Printf("publish to %s EOF", publishClient.Broker)
+ log.Infof("publish to %s EOF", publishClient.Broker)
return
}
publishClient.Err = err
- log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
+ log.Errorf("publish1 to %s error: %v", publishClient.Broker, err)
return
}
if ackResp.Error != "" {
publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
- log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
+ log.Errorf("publish2 to %s error: %v", publishClient.Broker, ackResp.Error)
return
}
if ackResp.AckSequence > 0 {
- log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
+ log.Infof("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
}
if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
return
@@ -222,7 +229,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
}
}
- log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
+ log.Infof("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
return nil
}
@@ -272,7 +279,7 @@ func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerP
&mq_pb.LookupTopicBrokersRequest{
Topic: p.config.Topic.ToPbTopic(),
})
- glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
+ log.V(3).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
if err != nil {
return err
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index e88aaca2f..feccca7a4 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -1,7 +1,7 @@
package sub_client
import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"time"
@@ -29,17 +29,17 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
return nil
})
if err != nil {
- glog.V(0).Infof("broker coordinator on %s: %v", broker, err)
+ log.V(3).Infof("broker coordinator on %s: %v", broker, err)
continue
}
- glog.V(0).Infof("found broker coordinator: %v", brokerLeader)
+ log.V(3).Infof("found broker coordinator: %v", brokerLeader)
// connect to the balancer
pb.WithBrokerGrpcClient(true, brokerLeader, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
stream, err := client.SubscriberToSubCoordinator(sub.ctx)
if err != nil {
- glog.V(0).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err)
return err
}
waitTime = 1 * time.Second
@@ -56,7 +56,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
},
},
}); err != nil {
- glog.V(0).Infof("subscriber %s send init: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s send init: %v", sub.ContentConfig.Topic, err)
return err
}
@@ -69,9 +69,9 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
default:
}
- glog.V(0).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply)
+ log.V(3).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply)
if err := stream.Send(reply); err != nil {
- glog.V(0).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err)
return
}
}
@@ -81,7 +81,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
for {
resp, err := stream.Recv()
if err != nil {
- glog.V(0).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err)
return err
}
@@ -92,13 +92,13 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
}
sub.brokerPartitionAssignmentChan <- resp
- glog.V(0).Infof("Received assignment: %+v", resp)
+ log.V(3).Infof("Received assignment: %+v", resp)
}
return nil
})
}
- glog.V(0).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ log.V(3).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
if waitTime < 10*time.Second {
waitTime += 1 * time.Second
}
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index 14a38cfa8..a931eb71f 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
@@ -52,10 +52,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
},
},
}); err != nil {
- glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
+ log.V(3).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
}
- glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
+ log.V(3).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
@@ -88,7 +88,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}()
for {
- // glog.V(0).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ // log.V(3).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
@@ -97,7 +97,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
return fmt.Errorf("subscribe recv: %v", err)
}
if resp.Message == nil {
- glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ log.V(3).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
continue
}
@@ -112,7 +112,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
switch m := resp.Message.(type) {
case *mq_pb.SubscribeMessageResponse_Data:
if m.Data.Ctrl != nil {
- glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
+ log.V(1).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
continue
}
if len(m.Data.Key) == 0 {
@@ -121,7 +121,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}
onDataMessageFn(m)
case *mq_pb.SubscribeMessageResponse_Ctrl:
- // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
+ // log.V(3).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return io.EOF
}
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index d4dea3852..1d06e0601 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -1,7 +1,7 @@
package sub_client
import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -60,7 +60,7 @@ func (sub *TopicSubscriber) startProcessors() {
<-semaphore
wg.Done()
}()
- glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ log.V(3).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{
AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{
@@ -84,9 +84,9 @@ func (sub *TopicSubscriber) startProcessors() {
err := sub.onEachPartition(assigned, stopChan, onDataMessageFn)
if err != nil {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
+ log.V(3).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
} else {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ log.V(3).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
}
sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignment{
@@ -130,7 +130,7 @@ func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartiti
}
sub.activeProcessorsLock.Unlock()
if foundOverlapping {
- glog.V(0).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
+ log.V(3).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
time.Sleep(1 * time.Second)
}
}