diff options
| author | chrislu <chris.lu@gmail.com> | 2025-05-22 09:54:31 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-05-22 09:54:31 -0700 |
| commit | 0d62be44846354c3c37b857028297edd4b8df17b (patch) | |
| tree | c89320a7d58351030f1b740c7267f56bf0206429 /weed/mq | |
| parent | d8c574a5ef1a811f9a0d447097d9edfcc0c1d84c (diff) | |
| download | seaweedfs-origin/changing-to-zap.tar.xz seaweedfs-origin/changing-to-zap.zip | |
Diffstat (limited to 'weed/mq')
26 files changed, 174 insertions, 167 deletions
diff --git a/weed/mq/agent/agent_grpc_subscribe.go b/weed/mq/agent/agent_grpc_subscribe.go index 87baa466c..ac7cc1c6f 100644 --- a/weed/mq/agent/agent_grpc_subscribe.go +++ b/weed/mq/agent/agent_grpc_subscribe.go @@ -2,7 +2,7 @@ package agent import ( "context" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" @@ -31,7 +31,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA record := &schema_pb.RecordValue{} err := proto.Unmarshal(m.Data.Value, record) if err != nil { - glog.V(0).Infof("unmarshal record value: %v", err) + log.V(3).Infof("unmarshal record value: %v", err) if lastErr == nil { lastErr = err } @@ -42,7 +42,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA Value: record, TsNs: m.Data.TsNs, }); sendErr != nil { - glog.V(0).Infof("send record: %v", sendErr) + log.V(3).Infof("send record: %v", sendErr) if lastErr == nil { lastErr = sendErr } @@ -53,7 +53,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA go func() { subErr := subscriber.Subscribe() if subErr != nil { - glog.V(0).Infof("subscriber %s subscribe: %v", subscriber.SubscriberConfig.String(), subErr) + log.V(3).Infof("subscriber %s subscribe: %v", subscriber.SubscriberConfig.String(), subErr) if lastErr == nil { lastErr = subErr } @@ -63,7 +63,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA for { m, err := stream.Recv() if err != nil { - glog.V(0).Infof("subscriber %s receive: %v", subscriber.SubscriberConfig.String(), err) + log.V(3).Infof("subscriber %s receive: %v", subscriber.SubscriberConfig.String(), err) return err } if m != nil { diff --git a/weed/mq/broker/broker_connect.go b/weed/mq/broker/broker_connect.go index 386d86570..8018cfe57 100644 --- a/weed/mq/broker/broker_connect.go +++ b/weed/mq/broker/broker_connect.go @@ -3,7 +3,7 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "io" @@ -16,7 +16,7 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stop self := string(b.option.BrokerAddress()) - glog.V(0).Infof("broker %s connects to balancer %s", self, brokerBalancer) + log.V(3).Infof("broker %s connects to balancer %s", self, brokerBalancer) if brokerBalancer == "" { return fmt.Errorf("no balancer found") } @@ -59,7 +59,7 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stop } return fmt.Errorf("send stats message to balancer %s: %v", brokerBalancer, err) } - // glog.V(3).Infof("sent stats: %+v", stats) + // log.V(0).Infof("sent stats: %+v", stats) time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond) } @@ -82,7 +82,7 @@ func (b *MessageQueueBroker) KeepConnectedToBrokerBalancer(newBrokerBalancerCh c for { err := b.BrokerConnectToBalancer(newBrokerBalancer, thisRunStopChan) if err != nil { - glog.V(0).Infof("connect to balancer %s: %v", newBrokerBalancer, err) + log.V(3).Infof("connect to balancer %s: %v", newBrokerBalancer, err) time.Sleep(time.Second) } else { break diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 991208a72..989cccaef 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -3,7 +3,7 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/logstore" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" @@ -50,7 +50,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } } - glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) + log.V(3).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) return ret, nil } @@ -91,7 +91,7 @@ func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, brokerStats.RegisterAssignment(t, bpa.Partition, isAdd) return nil }); doCreateErr != nil { - glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr) + log.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr) } }(bpa) } diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index f827f0b37..ea781807f 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -36,7 +36,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. var readErr, assignErr error resp, readErr = b.fca.ReadTopicConfFromFiler(t) if readErr != nil { - glog.V(0).Infof("read topic %s conf: %v", request.Topic, readErr) + log.V(3).Infof("read topic %s conf: %v", request.Topic, readErr) } if resp != nil { @@ -47,13 +47,13 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. } if readErr == nil && assignErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) { - glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) + log.V(3).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) return } if resp != nil && len(resp.BrokerPartitionAssignments) > 0 { if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil { - glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr) + log.V(2).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr) } } resp = &mq_pb.ConfigureTopicResponse{} @@ -70,7 +70,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) - glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) + log.V(3).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) return resp, err } diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 65a1ffda8..07cecccd6 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -3,7 +3,7 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -28,7 +28,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq conf := &mq_pb.ConfigureTopicResponse{} ret.Topic = request.Topic if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil { - glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err) + log.V(3).Infof("lookup topic %s conf: %v", request.Topic, err) } else { err = b.ensureTopicActiveAssignments(t, conf) ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index f31dc7eff..64841a1b9 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -3,7 +3,7 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/peer" @@ -46,7 +46,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis initMessage := req.GetInit() if initMessage == nil { response.Error = fmt.Sprintf("missing init message") - glog.Errorf("missing init message") + log.Errorf("missing init message") return stream.Send(response) } @@ -55,14 +55,14 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p) if getOrGenErr != nil { response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr) - glog.Errorf("topic %v not found: %v", t, getOrGenErr) + log.Errorf("topic %v not found: %v", t, getOrGenErr) return stream.Send(response) } // connect to follower brokers if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil { response.Error = followerErr.Error() - glog.Errorf("MaybeConnectToFollowers: %v", followerErr) + log.Errorf("MaybeConnectToFollowers: %v", followerErr) return stream.Send(response) } @@ -88,7 +88,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis AckSequence: acknowledgedSequence, } if err := stream.Send(response); err != nil { - glog.Errorf("Error sending response %v: %v", response, err) + log.Errorf("Error sending response %v: %v", response, err) } // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName) lastAckTime = time.Now() @@ -107,7 +107,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis localTopicPartition.Publishers.RemovePublisher(clientName) if localTopicPartition.MaybeShutdownLocalPartition() { b.localTopicManager.RemoveLocalPartition(t, p) - glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) + log.V(3).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) } }() @@ -126,7 +126,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis if err == io.EOF { break } - glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) + log.V(3).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) break } @@ -145,7 +145,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } } - glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) + log.V(3).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) return nil } @@ -155,11 +155,11 @@ func findClientAddress(ctx context.Context) string { // fmt.Printf("FromContext %+v\n", ctx) pr, ok := peer.FromContext(ctx) if !ok { - glog.Error("failed to get peer from ctx") + log.Error("failed to get peer from ctx") return "" } if pr.Addr == net.Addr(nil) { - glog.Error("failed to get peer address") + log.Error("failed to get peer address") return "" } return pr.Addr.String() diff --git a/weed/mq/broker/broker_grpc_pub_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go index 5978d2173..cc48ce604 100644 --- a/weed/mq/broker/broker_grpc_pub_balancer.go +++ b/weed/mq/broker/broker_grpc_pub_balancer.go @@ -41,7 +41,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin } if receivedStats := req.GetStats(); receivedStats != nil { b.PubBalancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats) - // glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats) + // log.V(-1).Infof("received from %v: %+v", initMessage.Broker, receivedStats) } } diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 291f1ef62..367cd12ef 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -2,7 +2,7 @@ package broker import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" @@ -43,7 +43,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi err = nil break } - glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) + log.V(3).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) break } @@ -58,14 +58,14 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi if err := stream.Send(&mq_pb.PublishFollowMeResponse{ AckTsNs: dataMessage.TsNs, }); err != nil { - glog.Errorf("Error sending response %v: %v", dataMessage, err) + log.Errorf("Error sending response %v: %v", dataMessage, err) } // println("ack", string(dataMessage.Key), dataMessage.TsNs) } else if closeMessage := req.GetClose(); closeMessage != nil { - glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) + log.V(3).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) break } else if flushMessage := req.GetFlush(); flushMessage != nil { - glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage) + log.V(3).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage) lastFlushTsNs = flushMessage.TsNs @@ -80,7 +80,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi } } else { - glog.Errorf("unknown message: %v", req) + log.Errorf("unknown message: %v", req) } } @@ -104,7 +104,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi startTime, stopTime := mem.startTime.UTC(), mem.stopTime.UTC() if stopTime.UnixNano() <= lastFlushTsNs { - glog.V(0).Infof("dropping remaining data at %v %v", t, p) + log.V(3).Infof("dropping remaining data at %v %v", t, p) continue } @@ -114,17 +114,17 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi for { if err := b.appendToFile(targetFile, mem.buf); err != nil { - glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err) + log.V(3).Infof("metadata log write failed %s: %v", targetFile, err) time.Sleep(737 * time.Millisecond) } else { break } } - glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf)) + log.V(3).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf)) } - glog.V(0).Infof("shut down follower for %v %v", t, p) + log.V(3).Infof("shut down follower for %v %v", t, p) return err } @@ -140,7 +140,7 @@ func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_qu startTime: startTime, stopTime: stopTime, }) - glog.V(0).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf)) + log.V(3).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf)) }, nil, func() { }) return lb diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 9cdbe8325..ba8efb5e4 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -23,7 +23,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs return err } if req.GetInit() == nil { - glog.Errorf("missing init message") + log.Errorf("missing init message") return fmt.Errorf("missing init message") } @@ -33,7 +33,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs t := topic.FromPbTopic(req.GetInit().Topic) partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition()) - glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) + log.V(3).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition) if getOrGenErr != nil { @@ -41,7 +41,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber()) - glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition) + log.V(3).Infof("Subscriber %s connected on %v %v", clientName, t, partition) isConnected := true sleepIntervalCount := 0 @@ -49,7 +49,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs defer func() { isConnected = false localTopicPartition.Subscribers.RemoveSubscriber(clientName) - glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) + log.V(3).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) if localTopicPartition.MaybeShutdownLocalPartition() { b.localTopicManager.RemoveLocalPartition(t, partition) } @@ -60,7 +60,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // connect to the follower var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient - glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker) + log.V(3).Infof("follower broker: %v", req.GetInit().FollowerBroker) if req.GetInit().FollowerBroker != "" { follower := req.GetInit().FollowerBroker if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil { @@ -90,7 +90,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } } } - glog.V(0).Infof("follower %s connected", follower) + log.V(3).Infof("follower %s connected", follower) } go func() { @@ -107,7 +107,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs }}) break } - glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err) + log.V(3).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err) break } if ack.GetAck().Key == nil { @@ -125,7 +125,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs }, }, }); err != nil { - glog.Errorf("Error sending ack to follower: %v", err) + log.Errorf("Error sending ack to follower: %v", err) break } lastOffset = currentLastOffset @@ -133,9 +133,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } } if lastOffset > 0 { - glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset) + log.V(3).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset) if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil { - glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err) + log.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err) } } if subscribeFollowMeStream != nil { @@ -145,7 +145,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs }, }); err != nil { if err != io.EOF { - glog.Errorf("Error sending close to follower: %v", err) + log.Errorf("Error sending close to follower: %v", err) } } } @@ -169,7 +169,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // Client disconnected return false } - glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err) return false default: // Continue processing the request @@ -190,7 +190,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // Client disconnected return false, nil } - glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err) return false, nil default: // Continue processing the request @@ -207,7 +207,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs TsNs: logEntry.TsNs, }, }}); err != nil { - glog.Errorf("Error sending data: %v", err) + log.Errorf("Error sending data: %v", err) return false, err } @@ -241,7 +241,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess // try to resume if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil { - glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset) + log.V(3).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset) startPosition = log_buffer.NewMessagePosition(storedOffset, -2) return } diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go index 985b0a47e..6dffa606d 100644 --- a/weed/mq/broker/broker_grpc_sub_coordinator.go +++ b/weed/mq/broker/broker_grpc_sub_coordinator.go @@ -3,7 +3,7 @@ package broker import ( "context" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" @@ -29,13 +29,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess if err != nil { return status.Errorf(codes.InvalidArgument, "failed to add subscriber: %v", err) } - glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic) + log.V(3).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic) } else { return status.Errorf(codes.InvalidArgument, "subscriber init message is empty") } defer func() { b.SubCoordinator.RemoveSubscriber(initMessage) - glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) + log.V(3).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) }() ctx := stream.Context() @@ -45,15 +45,15 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess for { req, err := stream.Recv() if err != nil { - glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) + log.V(3).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) } if ackUnAssignment := req.GetAckUnAssignment(); ackUnAssignment != nil { - glog.V(0).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment) + log.V(3).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment) cg.AckUnAssignment(cgi, ackUnAssignment) } if ackAssignment := req.GetAckAssignment(); ackAssignment != nil { - glog.V(0).Infof("subscriber %s/%s/%s ack assignment %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackAssignment) + log.V(3).Infof("subscriber %s/%s/%s ack assignment %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackAssignment) cg.AckAssignment(cgi, ackAssignment) } @@ -80,12 +80,12 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess // Client disconnected return err } - glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) + log.V(3).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) return err case message := <-cgi.ResponseChan: - glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, message) + log.V(3).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, message) if err := stream.Send(message); err != nil { - glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) + log.V(3).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) } } } diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go index bed906c30..39344d4a6 100644 --- a/weed/mq/broker/broker_grpc_sub_follow.go +++ b/weed/mq/broker/broker_grpc_sub_follow.go @@ -3,7 +3,7 @@ package broker import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -34,7 +34,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub err = nil break } - glog.V(0).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err) + log.V(3).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err) break } @@ -43,10 +43,10 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub lastOffset = ackMessage.TsNs // println("sub follower got offset", lastOffset) } else if closeMessage := req.GetClose(); closeMessage != nil { - glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) + log.V(3).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) return nil } else { - glog.Errorf("unknown message: %v", req) + log.Errorf("unknown message: %v", req) } } @@ -56,7 +56,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset) } - glog.V(0).Infof("shut down follower for %v offset %d", initMessage, lastOffset) + log.V(3).Infof("shut down follower for %v offset %d", initMessage, lastOffset) return err } @@ -90,7 +90,7 @@ func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Part util.Uint64toBytes(offsetBytes, uint64(offset)) return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset) + log.V(3).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset) return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes) }) } diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index d80fa91a4..ef1d6ec75 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -3,7 +3,7 @@ package broker import ( "context" "github.com/seaweedfs/seaweedfs/weed/filer_client" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/mq/topic" @@ -89,12 +89,12 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial time.Sleep(time.Millisecond * 237) } self := option.BrokerAddress() - glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler) + log.V(3).Infof("broker %s found filer %s", self, mqBroker.currentFiler) newBrokerBalancerCh := make(chan string, 1) lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler) mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) { - glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner) + log.V(3).Infof("broker %s found balanacer %s", self, newLockOwner) newBrokerBalancerCh <- newLockOwner }) mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh) diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 222ff16ba..cc1cbce19 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -2,7 +2,7 @@ package broker import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/logstore" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" @@ -13,12 +13,12 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio // get or generate a local partition conf, readConfErr := b.fca.ReadTopicConfFromFiler(t) if readConfErr != nil { - glog.Errorf("topic %v not found: %v", t, readConfErr) + log.Errorf("topic %v not found: %v", t, readConfErr) return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) } localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf) if getOrGenError != nil { - glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) + log.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) } return localTopicPartition, nil @@ -55,7 +55,7 @@ func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *m // also fix assignee broker if invalid hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments) if hasChanges { - glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments) + log.V(3).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments) if err = b.fca.SaveTopicConfToFiler(t, conf); err != nil { return err } diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index d6513b2a2..94ae45b73 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -2,7 +2,7 @@ package broker import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "sync/atomic" @@ -25,7 +25,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l for { if err := b.appendToFile(targetFile, buf); err != nil { - glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err) + log.V(3).Infof("metadata log write failed %s: %v", targetFile, err) time.Sleep(737 * time.Millisecond) } else { break @@ -40,6 +40,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs) } - glog.V(0).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf)) + log.V(3).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf)) } } diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index a768fa7f8..529efc693 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -3,19 +3,19 @@ package pub_client import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "sort" + "sync" + "sync/atomic" + "time" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" + "github.com/seaweedfs/seaweedfs/weed/util/log" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" - "log" - "sort" - "sync" - "sync/atomic" - "time" ) type EachPartitionError struct { @@ -33,27 +33,26 @@ type EachPartitionPublishJob struct { } func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { - if err := p.doConfigureTopic(); err != nil { wg.Done() return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) } - log.Printf("start scheduler thread for topic %s", p.config.Topic) + log.Infof("start scheduler thread for topic %s", p.config.Topic) generation := 0 var errChan chan EachPartitionError for { - glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic) + log.V(3).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic) if assignments, err := p.doLookupTopicPartitions(); err == nil { generation++ - glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments)) + log.V(3).Infof("start generation %d with %d assignments", generation, len(assignments)) if errChan == nil { errChan = make(chan EachPartitionError, len(assignments)) } p.onEachAssignments(generation, assignments, errChan) } else { - glog.Errorf("lookup topic %s: %v", p.config.Topic, err) + log.Errorf("lookup topic %s: %v", p.config.Topic, err) time.Sleep(5 * time.Second) continue } @@ -66,7 +65,7 @@ func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { for { select { case eachErr := <-errChan: - glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) + log.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) if eachErr.generation < generation { continue } @@ -114,7 +113,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. go func(job *EachPartitionPublishJob) { defer job.wg.Done() if err := p.doPublishToPartition(job); err != nil { - log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err) + log.Infof("publish to %s partition %v: %v", p.config.Topic, job.Partition, err) errChan <- EachPartitionError{assignment, err, generation} } }(job) @@ -127,8 +126,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb. } func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error { - - log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) + log.Infof("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption) if err != nil { @@ -159,10 +157,19 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro // process the hello message resp, err := stream.Recv() if err != nil { - return fmt.Errorf("recv init response: %v", err) + e, _ := status.FromError(err) + if e.Code() == codes.Unknown && e.Message() == "EOF" { + log.Infof("publish to %s EOF", publishClient.Broker) + return nil + } + publishClient.Err = err + log.Errorf("publish1 to %s error: %v", publishClient.Broker, err) + return err } if resp.Error != "" { - return fmt.Errorf("init response error: %v", resp.Error) + publishClient.Err = fmt.Errorf("ack error: %v", resp.Error) + log.Errorf("publish2 to %s error: %v", publishClient.Broker, resp.Error) + return fmt.Errorf("ack error: %v", resp.Error) } var publishedTsNs int64 @@ -176,20 +183,20 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro if err != nil { e, _ := status.FromError(err) if e.Code() == codes.Unknown && e.Message() == "EOF" { - log.Printf("publish to %s EOF", publishClient.Broker) + log.Infof("publish to %s EOF", publishClient.Broker) return } publishClient.Err = err - log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err) + log.Errorf("publish1 to %s error: %v", publishClient.Broker, err) return } if ackResp.Error != "" { publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error) - log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) + log.Errorf("publish2 to %s error: %v", publishClient.Broker, ackResp.Error) return } if ackResp.AckSequence > 0 { - log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) + log.Infof("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) } if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { return @@ -222,7 +229,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro } } - log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) + log.Infof("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) return nil } @@ -272,7 +279,7 @@ func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerP &mq_pb.LookupTopicBrokersRequest{ Topic: p.config.Topic.ToPbTopic(), }) - glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp) + log.V(3).Infof("lookup topic %s: %v", p.config.Topic, lookupResp) if err != nil { return err diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index e88aaca2f..feccca7a4 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -1,7 +1,7 @@ package sub_client import ( - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "time" @@ -29,17 +29,17 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { return nil }) if err != nil { - glog.V(0).Infof("broker coordinator on %s: %v", broker, err) + log.V(3).Infof("broker coordinator on %s: %v", broker, err) continue } - glog.V(0).Infof("found broker coordinator: %v", brokerLeader) + log.V(3).Infof("found broker coordinator: %v", brokerLeader) // connect to the balancer pb.WithBrokerGrpcClient(true, brokerLeader, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { stream, err := client.SubscriberToSubCoordinator(sub.ctx) if err != nil { - glog.V(0).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err) + log.V(3).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err) return err } waitTime = 1 * time.Second @@ -56,7 +56,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { }, }, }); err != nil { - glog.V(0).Infof("subscriber %s send init: %v", sub.ContentConfig.Topic, err) + log.V(3).Infof("subscriber %s send init: %v", sub.ContentConfig.Topic, err) return err } @@ -69,9 +69,9 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { default: } - glog.V(0).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply) + log.V(3).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply) if err := stream.Send(reply); err != nil { - glog.V(0).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err) + log.V(3).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err) return } } @@ -81,7 +81,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { for { resp, err := stream.Recv() if err != nil { - glog.V(0).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err) + log.V(3).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err) return err } @@ -92,13 +92,13 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() { } sub.brokerPartitionAssignmentChan <- resp - glog.V(0).Infof("Received assignment: %+v", resp) + log.V(3).Infof("Received assignment: %+v", resp) } return nil }) } - glog.V(0).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + log.V(3).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) if waitTime < 10*time.Second { waitTime += 1 * time.Second } diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index 14a38cfa8..a931eb71f 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -52,10 +52,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig }, }, }); err != nil { - glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err) + log.V(3).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err) } - glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker) + log.V(3).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker) if sub.OnCompletionFunc != nil { defer sub.OnCompletionFunc() @@ -88,7 +88,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig }() for { - // glog.V(0).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + // log.V(3).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) resp, err := subscribeClient.Recv() if err != nil { if errors.Is(err, io.EOF) { @@ -97,7 +97,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig return fmt.Errorf("subscribe recv: %v", err) } if resp.Message == nil { - glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) + log.V(3).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) continue } @@ -112,7 +112,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig switch m := resp.Message.(type) { case *mq_pb.SubscribeMessageResponse_Data: if m.Data.Ctrl != nil { - glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose) + log.V(1).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose) continue } if len(m.Data.Key) == 0 { @@ -121,7 +121,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig } onDataMessageFn(m) case *mq_pb.SubscribeMessageResponse_Ctrl: - // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl) + // log.V(3).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl) if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic { return io.EOF } diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go index d4dea3852..1d06e0601 100644 --- a/weed/mq/client/sub_client/subscribe.go +++ b/weed/mq/client/sub_client/subscribe.go @@ -1,7 +1,7 @@ package sub_client import ( - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" @@ -60,7 +60,7 @@ func (sub *TopicSubscriber) startProcessors() { <-semaphore wg.Done() }() - glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) + log.V(3).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{ Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{ AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{ @@ -84,9 +84,9 @@ func (sub *TopicSubscriber) startProcessors() { err := sub.onEachPartition(assigned, stopChan, onDataMessageFn) if err != nil { - glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) + log.V(3).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err) } else { - glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) + log.V(3).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker) } sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{ Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignment{ @@ -130,7 +130,7 @@ func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartiti } sub.activeProcessorsLock.Unlock() if foundOverlapping { - glog.V(0).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition) + log.V(3).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition) time.Sleep(1 * time.Second) } } diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go index 30cad8cc1..c16990c5e 100644 --- a/weed/mq/logstore/log_to_parquet.go +++ b/weed/mq/logstore/log_to_parquet.go @@ -160,7 +160,7 @@ func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, time } logTime, err := time.Parse(topic.TIME_FORMAT, entry.Name) if err != nil { - // glog.Warningf("parse log time %s: %v", entry.Name, err) + // log.Warningf("parse log time %s: %v", entry.Name, err) return nil } if maxTsNs > 0 && logTime.UnixNano() <= maxTsNs { diff --git a/weed/mq/logstore/merged_read.go b/weed/mq/logstore/merged_read.go index 03a47ace4..ea8868a70 100644 --- a/weed/mq/logstore/merged_read.go +++ b/weed/mq/logstore/merged_read.go @@ -17,9 +17,9 @@ func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFun var lastProcessedPosition log_buffer.MessagePosition return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) { if !exhaustedParquet { - // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC()) + // log.V(-1).Infof("reading from parquet startPosition: %v\n", startPosition.UTC()) lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn) - // glog.V(4).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err) + // log.V(-1).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err) if isDone { isDone = false } @@ -34,7 +34,7 @@ func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFun startPosition = lastProcessedPosition } - // glog.V(4).Infof("reading from direct log startPosition: %v\n", startPosition.UTC()) + // log.V(-1).Infof("reading from direct log startPosition: %v\n", startPosition.UTC()) lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn) return } diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go index 71ba58c1f..00e412b40 100644 --- a/weed/mq/logstore/read_log_from_disk.go +++ b/weed/mq/logstore/read_log_from_disk.go @@ -3,7 +3,7 @@ package logstore import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" @@ -71,7 +71,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top continue } if chunk.IsChunkManifest { - glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name) + log.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name) return } urlStrings, err = lookupFileIdFn(chunk.FileId) diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go index 46d423b30..94398d6c7 100644 --- a/weed/mq/pub_balancer/allocate.go +++ b/weed/mq/pub_balancer/allocate.go @@ -2,7 +2,7 @@ package pub_balancer import ( cmap "github.com/orcaman/concurrent-map/v2" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "math/rand" @@ -30,7 +30,7 @@ func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p EnsureAssignmentsToActiveBrokers(brokers, 1, assignments) - glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments) + log.V(3).Infof("allocate topic partitions %d: %v", len(assignments), assignments) return } @@ -78,7 +78,7 @@ func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string, // EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) { - glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v", activeBrokers.Count(), followerCount, assignments) + log.V(3).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v", activeBrokers.Count(), followerCount, assignments) candidates := make([]string, 0, activeBrokers.Count()) for brokerStatsItem := range activeBrokers.IterBuffered() { @@ -122,6 +122,6 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, * } - glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v hasChanges: %v", activeBrokers.Count(), followerCount, assignments, hasChanges) + log.V(3).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v hasChanges: %v", activeBrokers.Count(), followerCount, assignments, hasChanges) return } diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go index 34bdfd286..168f7f83d 100644 --- a/weed/mq/pub_balancer/partition_list_broker.go +++ b/weed/mq/pub_balancer/partition_list_broker.go @@ -1,7 +1,7 @@ package pub_balancer import ( - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) @@ -28,11 +28,11 @@ func (ps *PartitionSlotToBrokerList) AddBroker(partition *schema_pb.Partition, b for _, partitionSlot := range ps.PartitionSlots { if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop { if partitionSlot.AssignedBroker != "" && partitionSlot.AssignedBroker != broker { - glog.V(0).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker) + log.V(3).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker) partitionSlot.AssignedBroker = broker } if partitionSlot.FollowerBroker != "" && partitionSlot.FollowerBroker != follower { - glog.V(0).Infof("partition %s follower change: %s => %s", partition, partitionSlot.FollowerBroker, follower) + log.V(3).Infof("partition %s follower change: %s => %s", partition, partitionSlot.FollowerBroker, follower) partitionSlot.FollowerBroker = follower } diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index ba94f34b4..afe400989 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -4,7 +4,7 @@ import ( "fmt" cmap "github.com/orcaman/concurrent-map/v2" "github.com/seaweedfs/seaweedfs/weed/filer_client" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" @@ -35,7 +35,7 @@ func NewConsumerGroup(t *schema_pb.Topic, reblanceSeconds int32, filerClientAcce } cg.Market = NewMarket(partitions, time.Duration(reblanceSeconds)*time.Second) } else { - glog.V(0).Infof("fail to read topic conf from filer: %v", err) + log.V(3).Infof("fail to read topic conf from filer: %v", err) return nil } @@ -45,7 +45,7 @@ func NewConsumerGroup(t *schema_pb.Topic, reblanceSeconds int32, filerClientAcce case adjustment := <-cg.Market.AdjustmentChan: cgi, found := cg.ConsumerGroupInstances.Get(string(adjustment.consumer)) if !found { - glog.V(0).Infof("consumer group instance %s not found", adjustment.consumer) + log.V(3).Infof("consumer group instance %s not found", adjustment.consumer) continue } if adjustment.isAssign { @@ -63,7 +63,7 @@ func NewConsumerGroup(t *schema_pb.Topic, reblanceSeconds int32, filerClientAcce }, }, } - glog.V(0).Infof("send assignment %v to %s", adjustment.partition, adjustment.consumer) + log.V(3).Infof("send assignment %v to %s", adjustment.partition, adjustment.consumer) break } } @@ -76,7 +76,7 @@ func NewConsumerGroup(t *schema_pb.Topic, reblanceSeconds int32, filerClientAcce }, }, } - glog.V(0).Infof("send unassignment %v to %s", adjustment.partition, adjustment.consumer) + log.V(3).Infof("send unassignment %v to %s", adjustment.partition, adjustment.consumer) } case <-cg.stopCh: return diff --git a/weed/mq/sub_coordinator/market.go b/weed/mq/sub_coordinator/market.go index df07edfd5..7473248f7 100644 --- a/weed/mq/sub_coordinator/market.go +++ b/weed/mq/sub_coordinator/market.go @@ -2,7 +2,7 @@ package sub_coordinator import ( "errors" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "sync" "time" @@ -290,7 +290,7 @@ func (m *Market) ConfirmAdjustment(adjustment *Adjustment) { } else { m.unassignPartitionSlot(adjustment.partition) } - glog.V(1).Infof("ConfirmAdjustment %+v", adjustment) + log.V(2).Infof("ConfirmAdjustment %+v", adjustment) m.Status() } @@ -300,12 +300,12 @@ func (m *Market) unassignPartitionSlot(partition topic.Partition) { partitionSlot, exists := m.partitions[partition] if !exists { - glog.V(0).Infof("partition %+v slot is not tracked", partition) + log.V(3).Infof("partition %+v slot is not tracked", partition) return } if partitionSlot.AssignedTo == nil { - glog.V(0).Infof("partition %+v slot is not assigned to any consumer", partition) + log.V(3).Infof("partition %+v slot is not assigned to any consumer", partition) return } @@ -319,7 +319,7 @@ func (m *Market) unassignPartitionSlot(partition topic.Partition) { } } - glog.V(0).Infof("partition %+v slot not found in assigned consumer", partition) + log.V(3).Infof("partition %+v slot not found in assigned consumer", partition) } @@ -329,18 +329,18 @@ func (m *Market) confirmAssignPartition(partition topic.Partition, consumerInsta partitionSlot, exists := m.partitions[partition] if !exists { - glog.V(0).Infof("partition %+v slot is not tracked", partition) + log.V(3).Infof("partition %+v slot is not tracked", partition) return } if partitionSlot.AssignedTo != nil { - glog.V(0).Infof("partition %+v slot is already assigned to %+v", partition, partitionSlot.AssignedTo.InstanceId) + log.V(3).Infof("partition %+v slot is already assigned to %+v", partition, partitionSlot.AssignedTo.InstanceId) return } consumerInstance, exists := m.consumerInstances[consumerInstanceId] if !exists { - glog.V(0).Infof("consumer %+v is not tracked", consumerInstanceId) + log.V(3).Infof("consumer %+v is not tracked", consumerInstanceId) return } @@ -353,15 +353,15 @@ func (m *Market) Status() { m.mu.Lock() defer m.mu.Unlock() - glog.V(1).Infof("Market has %d partitions and %d consumer instances", len(m.partitions), len(m.consumerInstances)) + log.V(2).Infof("Market has %d partitions and %d consumer instances", len(m.partitions), len(m.consumerInstances)) for partition, slot := range m.partitions { if slot.AssignedTo == nil { - glog.V(1).Infof("Partition %+v is not assigned to any consumer", partition) + log.V(2).Infof("Partition %+v is not assigned to any consumer", partition) } else { - glog.V(1).Infof("Partition %+v is assigned to consumer %+v", partition, slot.AssignedTo.InstanceId) + log.V(2).Infof("Partition %+v is assigned to consumer %+v", partition, slot.AssignedTo.InstanceId) } } for _, consumer := range m.consumerInstances { - glog.V(1).Infof("Consumer %+v has %d partitions", consumer.InstanceId, len(consumer.AssignedPartitions)) + log.V(2).Infof("Consumer %+v has %d partitions", consumer.InstanceId, len(consumer.AssignedPartitions)) } } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index d1433775a..158551747 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -3,7 +3,7 @@ package topic import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" @@ -82,7 +82,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M for { processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) if readPersistedLogErr != nil { - glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr) + log.V(3).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr) return readPersistedLogErr } if isDone { @@ -104,7 +104,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M continue } if readInMemoryLogErr != nil { - glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr) + log.V(3).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr) return readInMemoryLogErr } } @@ -179,10 +179,10 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa if err != nil { e, _ := status.FromError(err) if e.Code() == codes.Canceled { - glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.Follower) + log.V(3).Infof("local partition %v follower %v stopped", p.Partition, p.Follower) return } - glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err) + log.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err) return } atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs) @@ -206,9 +206,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, }, }); followErr != nil { - glog.Errorf("Error closing follower stream: %v", followErr) + log.Errorf("Error closing follower stream: %v", followErr) } - glog.V(4).Infof("closing grpcConnection to follower") + log.V(-1).Infof("closing grpcConnection to follower") p.followerGrpcConnection.Close() p.publishFolloweMeStream = nil p.Follower = "" @@ -217,7 +217,7 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { hasShutdown = true } - glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown) + log.V(3).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown) return } @@ -225,7 +225,7 @@ func (p *LocalPartition) Shutdown() { p.closePublishers() p.closeSubscribers() p.LogBuffer.ShutdownLogBuffer() - glog.V(0).Infof("local partition %v shutting down", p.Partition) + log.V(3).Infof("local partition %v shutting down", p.Partition) } func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { @@ -237,7 +237,7 @@ func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { }, }, }); followErr != nil { - glog.Errorf("send follower %s flush message: %v", p.Follower, followErr) + log.Errorf("send follower %s flush message: %v", p.Follower, followErr) } // println("notifying", p.Follower, "flushed at", flushTsNs) } |
