diff options
Diffstat (limited to 'weed/mq/client')
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 53 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/connect_to_sub_coordinator.go | 20 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/on_each_partition.go | 14 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/subscribe.go | 10 |
4 files changed, 52 insertions, 45 deletions
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index a768fa7f8..529efc693 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -3,19 +3,19 @@ package pub_client import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "sort" + "sync" + "sync/atomic" + "time" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" + "github.com/seaweedfs/seaweedfs/weed/util/log" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" - "log" - "sort" - "sync" - "sync/atomic" - "time" ) type EachPartitionError struct { @@ -33,27 +33,26 @@ type EachPartitionPublishJob struct { } func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { - if err := p.doConfigureTopic(); err != nil { wg.Done() return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) } - log.Printf("start scheduler thread for topic %s", p.config.Topic) + log.Infof("start scheduler thread for topic %s", p.config.Topic) generation := 0 var errChan chan EachPartitionError for { - glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic) + log.V(3).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic) if assignments, err := p.doLookupTopicPartitions(); err == nil { generation++ - glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments)) + log.V(3).Infof("start generation %d with %d assignments", generation, len(assignments)) if errChan == nil { errChan = make(chan EachPartitionError, len(assignments)) } p.onEachAssignments(generation, assignments, errChan) } else { - glog.Errorf("lookup topic %s: %v", p.config.Topic, err) + log.Errorf("lookup topic %s: %v", p.config.Topic, err) time.Sleep(5 * time.Second) continue } @@ -66,7 +65,7 @@ func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { for { select { case eachErr := <-errChan: - glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) + log.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) if eachErr.generation < generation { continue } @@ -114,7 +113,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. go func(job *EachPartitionPublishJob) { defer job.wg.Done() if err := p.doPublishToPartition(job); err != nil { - log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err) + log.Infof("publish to %s partition %v: %v", p.config.Topic, job.Partition, err) errChan <- EachPartitionError{assignment, err, generation} } }(job) @@ -127,8 +126,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. } func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error { - - log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) + log.Infof("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption) if err != nil { @@ -159,10 +157,19 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro // process the hello message resp, err := stream.Recv() if err != nil { - return fmt.Errorf("recv init response: %v", err) + e, _ := status.FromError(err) + if e.Code() == codes.Unknown && e.Message() == "EOF" { + log.Infof("publish to %s EOF", publishClient.Broker) + return nil + } + publishClient.Err = err + log.Errorf("publish1 to %s error: %v", publishClient.Broker, err) + return err } if resp.Error != "" { - return fmt.Errorf("init response error: %v", resp.Error) + publishClient.Err = fmt.Errorf("ack error: %v", resp.Error) + log.Errorf("publish2 to %s error: %v", publishClient.Broker, resp.Error) + return fmt.Errorf("ack error: %v", resp.Error) } var publishedTsNs int64 @@ -176,20 +183,20 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro if err != nil { e, _ := status.FromError(err) if e.Code() == codes.Unknown && e.Message() == "EOF" { - log.Printf("publish to %s EOF", publishClient.Broker) + log.Infof("publish to %s EOF", publishClient.Broker) return } publishClient.Err = err - log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err) + log.Errorf("publish1 to %s error: %v", publishClient.Broker, err) return } if ackResp.Error != "" { publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error) - log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) + log.Errorf("publish2 to %s error: %v", publishClient.Broker, ackResp.Error) return } if ackResp.AckSequence > 0 { - log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) + log.Infof("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) } if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { return @@ -222,7 +229,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro } } - log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) + log.Infof("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) return nil } @@ -272,7 +279,7 @@ func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerP &mq_pb.LookupTopicBrokersRequest{ Topic: p.config.Topic.ToPbTopic(), }) - glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp) + log.V(3).Infof("lookup topic %s: %v", p.config.Topic, lookupResp) if err != nil { return err diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index e88aaca2f..feccca7a4 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -1,7 +1,7 @@ package sub_client import ( - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "time" @@ -29,17 +29,17 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { return nil }) if err != nil { - glog.V(0).Infof("broker coordinator on %s: %v", broker, err) + log.V(3).Infof("broker coordinator on %s: %v", broker, err) continue } - glog.V(0).Infof("found broker coordinator: %v", brokerLeader) + log.V(3).Infof("found broker coordinator: %v", brokerLeader) // connect to the balancer pb.WithBrokerGrpcClient(true, brokerLeader, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { stream, err := client.SubscriberToSubCoordinator(sub.ctx) if err != nil { - glog.V(0).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err) + log.V(3).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err) return err } waitTime = 1 * time.Second @@ -56,7 +56,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { }, }, }); err != nil { - glog.V(0).Infof("subscriber %s send init: %v", sub.ContentConfig.Topic, err) + log.V(3).Infof("subscriber %s send init: %v", sub.ContentConfig.Topic, err) return err } @@ -69,9 +69,9 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { default: } - glog.V(0).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply) + log.V(3).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply) if err := stream.Send(reply); err != nil { - glog.V(0).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err) + log.V(3).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err) return } } @@ -81,7 +81,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { for { resp, err := stream.Recv() if err != nil { - glog.V(0).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err) + log.V(3).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err) return err } @@ -92,13 +92,13 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { } sub.brokerPartitionAssignmentChan <- resp - glog.V(0).Infof("Received assignment: %+v", resp) + log.V(3).Infof("Received assignment: %+v", resp) } return nil }) } - glog.V(0).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + log.V(3).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) if waitTime < 10*time.Second { waitTime += 1 * time.Second } diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index 14a38cfa8..a931eb71f 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -52,10 +52,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig }, }, }); err != nil { - glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err) + log.V(3).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err) } - glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker) + log.V(3).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker) if sub.OnCompletionFunc != nil { defer sub.OnCompletionFunc() @@ -88,7 +88,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig }() for { - // glog.V(0).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + // log.V(3).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) resp, err := subscribeClient.Recv() if err != nil { if errors.Is(err, io.EOF) { @@ -97,7 +97,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig return fmt.Errorf("subscribe recv: %v", err) } if resp.Message == nil { - glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + log.V(3).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) continue } @@ -112,7 +112,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig switch m := resp.Message.(type) { case *mq_pb.SubscribeMessageResponse_Data: if m.Data.Ctrl != nil { - glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose) + log.V(1).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose) continue } if len(m.Data.Key) == 0 { @@ -121,7 +121,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig } onDataMessageFn(m) case *mq_pb.SubscribeMessageResponse_Ctrl: - // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl) + // log.V(3).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl) if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { return io.EOF } diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index d4dea3852..1d06e0601 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -1,7 +1,7 @@ package sub_client import ( - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" @@ -60,7 +60,7 @@ func (sub *TopicSubscriber) startProcessors() { <-semaphore wg.Done() }() - glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) + log.V(3).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{ Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{ AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{ @@ -84,9 +84,9 @@ func (sub *TopicSubscriber) startProcessors() { err := sub.onEachPartition(assigned, stopChan, onDataMessageFn) if err != nil { - glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) + log.V(3).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) } else { - glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) + log.V(3).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) } sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{ Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignment{ @@ -130,7 +130,7 @@ func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartiti } sub.activeProcessorsLock.Unlock() if foundOverlapping { - glog.V(0).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition) + log.V(3).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition) time.Sleep(1 * time.Second) } } |
