diff options
Diffstat (limited to 'weed/mq/broker')
| -rw-r--r-- | weed/mq/broker/broker_connect.go | 8 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_assign.go | 6 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_configure.go | 10 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_lookup.go | 4 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 20 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub_balancer.go | 2 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub_follow.go | 22 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 32 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub_coordinator.go | 18 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub_follow.go | 12 | ||||
| -rw-r--r-- | weed/mq/broker/broker_server.go | 6 | ||||
| -rw-r--r-- | weed/mq/broker/broker_topic_conf_read_write.go | 8 | ||||
| -rw-r--r-- | weed/mq/broker/broker_topic_partition_read_write.go | 6 |
13 files changed, 77 insertions, 77 deletions
diff --git a/weed/mq/broker/broker_connect.go b/weed/mq/broker/broker_connect.go index 386d86570..8018cfe57 100644 --- a/weed/mq/broker/broker_connect.go +++ b/weed/mq/broker/broker_connect.go @@ -3,7 +3,7 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "io" @@ -16,7 +16,7 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stop self := string(b.option.BrokerAddress()) - glog.V(0).Infof("broker %s connects to balancer %s", self, brokerBalancer) + log.V(3).Infof("broker %s connects to balancer %s", self, brokerBalancer) if brokerBalancer == "" { return fmt.Errorf("no balancer found") } @@ -59,7 +59,7 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stop } return fmt.Errorf("send stats message to balancer %s: %v", brokerBalancer, err) } - // glog.V(3).Infof("sent stats: %+v", stats) + // log.V(0).Infof("sent stats: %+v", stats) time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond) } @@ -82,7 +82,7 @@ func (b *MessageQueueBroker) KeepConnectedToBrokerBalancer(newBrokerBalancerCh c for { err := b.BrokerConnectToBalancer(newBrokerBalancer, thisRunStopChan) if err != nil { - glog.V(0).Infof("connect to balancer %s: %v", newBrokerBalancer, err) + log.V(3).Infof("connect to balancer %s: %v", newBrokerBalancer, err) time.Sleep(time.Second) } else { break diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 991208a72..989cccaef 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -3,7 +3,7 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/logstore" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" @@ -50,7 +50,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } } - glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) + log.V(3).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments) return ret, nil } @@ -91,7 +91,7 @@ func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, brokerStats.RegisterAssignment(t, bpa.Partition, isAdd) return nil }); doCreateErr != nil { - glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr) + log.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr) } }(bpa) } diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index f827f0b37..ea781807f 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -36,7 +36,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. var readErr, assignErr error resp, readErr = b.fca.ReadTopicConfFromFiler(t) if readErr != nil { - glog.V(0).Infof("read topic %s conf: %v", request.Topic, readErr) + log.V(3).Infof("read topic %s conf: %v", request.Topic, readErr) } if resp != nil { @@ -47,13 +47,13 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. } if readErr == nil && assignErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) { - glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) + log.V(3).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments) return } if resp != nil && len(resp.BrokerPartitionAssignments) > 0 { if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil { - glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr) + log.V(2).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr) } } resp = &mq_pb.ConfigureTopicResponse{} @@ -70,7 +70,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments) - glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) + log.V(3).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments) return resp, err } diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index 65a1ffda8..07cecccd6 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -3,7 +3,7 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -28,7 +28,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq conf := &mq_pb.ConfigureTopicResponse{} ret.Topic = request.Topic if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil { - glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err) + log.V(3).Infof("lookup topic %s conf: %v", request.Topic, err) } else { err = b.ensureTopicActiveAssignments(t, conf) ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index f31dc7eff..64841a1b9 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -3,7 +3,7 @@ package broker import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/peer" @@ -46,7 +46,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis initMessage := req.GetInit() if initMessage == nil { response.Error = fmt.Sprintf("missing init message") - glog.Errorf("missing init message") + log.Errorf("missing init message") return stream.Send(response) } @@ -55,14 +55,14 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p) if getOrGenErr != nil { response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr) - glog.Errorf("topic %v not found: %v", t, getOrGenErr) + log.Errorf("topic %v not found: %v", t, getOrGenErr) return stream.Send(response) } // connect to follower brokers if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil { response.Error = followerErr.Error() - glog.Errorf("MaybeConnectToFollowers: %v", followerErr) + log.Errorf("MaybeConnectToFollowers: %v", followerErr) return stream.Send(response) } @@ -88,7 +88,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis AckSequence: acknowledgedSequence, } if err := stream.Send(response); err != nil { - glog.Errorf("Error sending response %v: %v", response, err) + log.Errorf("Error sending response %v: %v", response, err) } // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName) lastAckTime = time.Now() @@ -107,7 +107,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis localTopicPartition.Publishers.RemovePublisher(clientName) if localTopicPartition.MaybeShutdownLocalPartition() { b.localTopicManager.RemoveLocalPartition(t, p) - glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) + log.V(3).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition) } }() @@ -126,7 +126,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis if err == io.EOF { break } - glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) + log.V(3).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) break } @@ -145,7 +145,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } } - glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) + log.V(3).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) return nil } @@ -155,11 +155,11 @@ func findClientAddress(ctx context.Context) string { // fmt.Printf("FromContext %+v\n", ctx) pr, ok := peer.FromContext(ctx) if !ok { - glog.Error("failed to get peer from ctx") + log.Error("failed to get peer from ctx") return "" } if pr.Addr == net.Addr(nil) { - glog.Error("failed to get peer address") + log.Error("failed to get peer address") return "" } return pr.Addr.String() diff --git a/weed/mq/broker/broker_grpc_pub_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go index 5978d2173..cc48ce604 100644 --- a/weed/mq/broker/broker_grpc_pub_balancer.go +++ b/weed/mq/broker/broker_grpc_pub_balancer.go @@ -41,7 +41,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin } if receivedStats := req.GetStats(); receivedStats != nil { b.PubBalancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats) - // glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats) + // log.V(-1).Infof("received from %v: %+v", initMessage.Broker, receivedStats) } } diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 291f1ef62..367cd12ef 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -2,7 +2,7 @@ package broker import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" @@ -43,7 +43,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi err = nil break } - glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) + log.V(3).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) break } @@ -58,14 +58,14 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi if err := stream.Send(&mq_pb.PublishFollowMeResponse{ AckTsNs: dataMessage.TsNs, }); err != nil { - glog.Errorf("Error sending response %v: %v", dataMessage, err) + log.Errorf("Error sending response %v: %v", dataMessage, err) } // println("ack", string(dataMessage.Key), dataMessage.TsNs) } else if closeMessage := req.GetClose(); closeMessage != nil { - glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) + log.V(3).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) break } else if flushMessage := req.GetFlush(); flushMessage != nil { - glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage) + log.V(3).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage) lastFlushTsNs = flushMessage.TsNs @@ -80,7 +80,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi } } else { - glog.Errorf("unknown message: %v", req) + log.Errorf("unknown message: %v", req) } } @@ -104,7 +104,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi startTime, stopTime := mem.startTime.UTC(), mem.stopTime.UTC() if stopTime.UnixNano() <= lastFlushTsNs { - glog.V(0).Infof("dropping remaining data at %v %v", t, p) + log.V(3).Infof("dropping remaining data at %v %v", t, p) continue } @@ -114,17 +114,17 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi for { if err := b.appendToFile(targetFile, mem.buf); err != nil { - glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err) + log.V(3).Infof("metadata log write failed %s: %v", targetFile, err) time.Sleep(737 * time.Millisecond) } else { break } } - glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf)) + log.V(3).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf)) } - glog.V(0).Infof("shut down follower for %v %v", t, p) + log.V(3).Infof("shut down follower for %v %v", t, p) return err } @@ -140,7 +140,7 @@ func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_qu startTime: startTime, stopTime: stopTime, }) - glog.V(0).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf)) + log.V(3).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf)) }, nil, func() { }) return lb diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 9cdbe8325..ba8efb5e4 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -23,7 +23,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs return err } if req.GetInit() == nil { - glog.Errorf("missing init message") + log.Errorf("missing init message") return fmt.Errorf("missing init message") } @@ -33,7 +33,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs t := topic.FromPbTopic(req.GetInit().Topic) partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition()) - glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) + log.V(3).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition) if getOrGenErr != nil { @@ -41,7 +41,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber()) - glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition) + log.V(3).Infof("Subscriber %s connected on %v %v", clientName, t, partition) isConnected := true sleepIntervalCount := 0 @@ -49,7 +49,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs defer func() { isConnected = false localTopicPartition.Subscribers.RemoveSubscriber(clientName) - glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) + log.V(3).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter) if localTopicPartition.MaybeShutdownLocalPartition() { b.localTopicManager.RemoveLocalPartition(t, partition) } @@ -60,7 +60,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // connect to the follower var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient - glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker) + log.V(3).Infof("follower broker: %v", req.GetInit().FollowerBroker) if req.GetInit().FollowerBroker != "" { follower := req.GetInit().FollowerBroker if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil { @@ -90,7 +90,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } } } - glog.V(0).Infof("follower %s connected", follower) + log.V(3).Infof("follower %s connected", follower) } go func() { @@ -107,7 +107,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs }}) break } - glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err) + log.V(3).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err) break } if ack.GetAck().Key == nil { @@ -125,7 +125,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs }, }, }); err != nil { - glog.Errorf("Error sending ack to follower: %v", err) + log.Errorf("Error sending ack to follower: %v", err) break } lastOffset = currentLastOffset @@ -133,9 +133,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } } if lastOffset > 0 { - glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset) + log.V(3).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset) if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil { - glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err) + log.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err) } } if subscribeFollowMeStream != nil { @@ -145,7 +145,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs }, }); err != nil { if err != io.EOF { - glog.Errorf("Error sending close to follower: %v", err) + log.Errorf("Error sending close to follower: %v", err) } } } @@ -169,7 +169,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // Client disconnected return false } - glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err) return false default: // Continue processing the request @@ -190,7 +190,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // Client disconnected return false, nil } - glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err) return false, nil default: // Continue processing the request @@ -207,7 +207,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs TsNs: logEntry.TsNs, }, }}); err != nil { - glog.Errorf("Error sending data: %v", err) + log.Errorf("Error sending data: %v", err) return false, err } @@ -241,7 +241,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess // try to resume if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil { - glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset) + log.V(3).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset) startPosition = log_buffer.NewMessagePosition(storedOffset, -2) return } diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go index 985b0a47e..6dffa606d 100644 --- a/weed/mq/broker/broker_grpc_sub_coordinator.go +++ b/weed/mq/broker/broker_grpc_sub_coordinator.go @@ -3,7 +3,7 @@ package broker import ( "context" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/codes" @@ -29,13 +29,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess if err != nil { return status.Errorf(codes.InvalidArgument, "failed to add subscriber: %v", err) } - glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic) + log.V(3).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic) } else { return status.Errorf(codes.InvalidArgument, "subscriber init message is empty") } defer func() { b.SubCoordinator.RemoveSubscriber(initMessage) - glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) + log.V(3).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) }() ctx := stream.Context() @@ -45,15 +45,15 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess for { req, err := stream.Recv() if err != nil { - glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) + log.V(3).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) } if ackUnAssignment := req.GetAckUnAssignment(); ackUnAssignment != nil { - glog.V(0).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment) + log.V(3).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment) cg.AckUnAssignment(cgi, ackUnAssignment) } if ackAssignment := req.GetAckAssignment(); ackAssignment != nil { - glog.V(0).Infof("subscriber %s/%s/%s ack assignment %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackAssignment) + log.V(3).Infof("subscriber %s/%s/%s ack assignment %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackAssignment) cg.AckAssignment(cgi, ackAssignment) } @@ -80,12 +80,12 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess // Client disconnected return err } - glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) + log.V(3).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) return err case message := <-cgi.ResponseChan: - glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, message) + log.V(3).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, message) if err := stream.Send(message); err != nil { - glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) + log.V(3).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err) } } } diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go index bed906c30..39344d4a6 100644 --- a/weed/mq/broker/broker_grpc_sub_follow.go +++ b/weed/mq/broker/broker_grpc_sub_follow.go @@ -3,7 +3,7 @@ package broker import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -34,7 +34,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub err = nil break } - glog.V(0).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err) + log.V(3).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err) break } @@ -43,10 +43,10 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub lastOffset = ackMessage.TsNs // println("sub follower got offset", lastOffset) } else if closeMessage := req.GetClose(); closeMessage != nil { - glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) + log.V(3).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) return nil } else { - glog.Errorf("unknown message: %v", req) + log.Errorf("unknown message: %v", req) } } @@ -56,7 +56,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset) } - glog.V(0).Infof("shut down follower for %v offset %d", initMessage, lastOffset) + log.V(3).Infof("shut down follower for %v offset %d", initMessage, lastOffset) return err } @@ -90,7 +90,7 @@ func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Part util.Uint64toBytes(offsetBytes, uint64(offset)) return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset) + log.V(3).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset) return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes) }) } diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index d80fa91a4..ef1d6ec75 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -3,7 +3,7 @@ package broker import ( "context" "github.com/seaweedfs/seaweedfs/weed/filer_client" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" "github.com/seaweedfs/seaweedfs/weed/mq/topic" @@ -89,12 +89,12 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial time.Sleep(time.Millisecond * 237) } self := option.BrokerAddress() - glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler) + log.V(3).Infof("broker %s found filer %s", self, mqBroker.currentFiler) newBrokerBalancerCh := make(chan string, 1) lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler) mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) { - glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner) + log.V(3).Infof("broker %s found balanacer %s", self, newLockOwner) newBrokerBalancerCh <- newLockOwner }) mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh) diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index 222ff16ba..cc1cbce19 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -2,7 +2,7 @@ package broker import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/logstore" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" @@ -13,12 +13,12 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio // get or generate a local partition conf, readConfErr := b.fca.ReadTopicConfFromFiler(t) if readConfErr != nil { - glog.Errorf("topic %v not found: %v", t, readConfErr) + log.Errorf("topic %v not found: %v", t, readConfErr) return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) } localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf) if getOrGenError != nil { - glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) + log.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) } return localTopicPartition, nil @@ -55,7 +55,7 @@ func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *m // also fix assignee broker if invalid hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments) if hasChanges { - glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments) + log.V(3).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments) if err = b.fca.SaveTopicConfToFiler(t, conf); err != nil { return err } diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go index d6513b2a2..94ae45b73 100644 --- a/weed/mq/broker/broker_topic_partition_read_write.go +++ b/weed/mq/broker/broker_topic_partition_read_write.go @@ -2,7 +2,7 @@ package broker import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "sync/atomic" @@ -25,7 +25,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l for { if err := b.appendToFile(targetFile, buf); err != nil { - glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err) + log.V(3).Infof("metadata log write failed %s: %v", targetFile, err) time.Sleep(737 * time.Millisecond) } else { break @@ -40,6 +40,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs) } - glog.V(0).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf)) + log.V(3).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf)) } } |
