path: root/weed/mq
author     chrislu <chris.lu@gmail.com>  2025-05-22 09:54:31 -0700
committer  chrislu <chris.lu@gmail.com>  2025-05-22 09:54:31 -0700
commit     0d62be44846354c3c37b857028297edd4b8df17b (patch)
tree       c89320a7d58351030f1b740c7267f56bf0206429 /weed/mq
parent     d8c574a5ef1a811f9a0d447097d9edfcc0c1d84c (diff)
download   seaweedfs-origin/changing-to-zap.tar.xz
           seaweedfs-origin/changing-to-zap.zip
Diffstat (limited to 'weed/mq')
-rw-r--r--  weed/mq/agent/agent_grpc_subscribe.go                   |  10
-rw-r--r--  weed/mq/broker/broker_connect.go                        |   8
-rw-r--r--  weed/mq/broker/broker_grpc_assign.go                    |   6
-rw-r--r--  weed/mq/broker/broker_grpc_configure.go                 |  10
-rw-r--r--  weed/mq/broker/broker_grpc_lookup.go                    |   4
-rw-r--r--  weed/mq/broker/broker_grpc_pub.go                       |  20
-rw-r--r--  weed/mq/broker/broker_grpc_pub_balancer.go              |   2
-rw-r--r--  weed/mq/broker/broker_grpc_pub_follow.go                |  22
-rw-r--r--  weed/mq/broker/broker_grpc_sub.go                       |  32
-rw-r--r--  weed/mq/broker/broker_grpc_sub_coordinator.go           |  18
-rw-r--r--  weed/mq/broker/broker_grpc_sub_follow.go                |  12
-rw-r--r--  weed/mq/broker/broker_server.go                         |   6
-rw-r--r--  weed/mq/broker/broker_topic_conf_read_write.go          |   8
-rw-r--r--  weed/mq/broker/broker_topic_partition_read_write.go     |   6
-rw-r--r--  weed/mq/client/pub_client/scheduler.go                  |  53
-rw-r--r--  weed/mq/client/sub_client/connect_to_sub_coordinator.go |  20
-rw-r--r--  weed/mq/client/sub_client/on_each_partition.go          |  14
-rw-r--r--  weed/mq/client/sub_client/subscribe.go                  |  10
-rw-r--r--  weed/mq/logstore/log_to_parquet.go                      |   2
-rw-r--r--  weed/mq/logstore/merged_read.go                         |   6
-rw-r--r--  weed/mq/logstore/read_log_from_disk.go                  |   4
-rw-r--r--  weed/mq/pub_balancer/allocate.go                        |   8
-rw-r--r--  weed/mq/pub_balancer/partition_list_broker.go           |   6
-rw-r--r--  weed/mq/sub_coordinator/consumer_group.go               |  10
-rw-r--r--  weed/mq/sub_coordinator/market.go                       |  24
-rw-r--r--  weed/mq/topic/local_partition.go                        |  20
26 files changed, 174 insertions, 167 deletions
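
Summary of the change (illustrative, not part of the patch): every file below drops the "github.com/seaweedfs/seaweedfs/weed/glog" import in favor of "github.com/seaweedfs/seaweedfs/weed/util/log", keeps Errorf/Error/Warningf as-is, converts the standard-library log.Printf calls in pub_client/scheduler.go to log.Infof, and remaps verbosity so that an old glog.V(v) call becomes log.V(3-v) in these hunks (glog.V(0) -> log.V(3), glog.V(4) -> log.V(-1)). Consistent with the branch name changing-to-zap, util/log presumably wraps zap, but only the glog-compatible surface appears in this diff. A minimal sketch of the resulting call sites, assuming only the V().Infof, Infof, and Errorf methods that these hunks actually exercise:

package example

import (
	"github.com/seaweedfs/seaweedfs/weed/util/log" // replaces "github.com/seaweedfs/seaweedfs/weed/glog"
)

func logMigrationSketch(topic string, err error) {
	// was: glog.V(0).Infof(...) -- routine progress messages move to verbosity 3
	log.V(3).Infof("lookup topic %s conf: %v", topic, err)

	// was: glog.V(2).Infof(...) -- other levels follow the same 3-v remapping
	log.V(1).Infof("received control for topic %s", topic)

	// error reporting keeps the same method names
	log.Errorf("missing init message")

	// was: standard-library log.Printf in pub_client/scheduler.go
	log.Infof("start scheduler thread for topic %s", topic)
}
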
diff --git a/weed/mq/agent/agent_grpc_subscribe.go b/weed/mq/agent/agent_grpc_subscribe.go
index 87baa466c..ac7cc1c6f 100644
--- a/weed/mq/agent/agent_grpc_subscribe.go
+++ b/weed/mq/agent/agent_grpc_subscribe.go
@@ -2,7 +2,7 @@ package agent
import (
"context"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
@@ -31,7 +31,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA
record := &schema_pb.RecordValue{}
err := proto.Unmarshal(m.Data.Value, record)
if err != nil {
- glog.V(0).Infof("unmarshal record value: %v", err)
+ log.V(3).Infof("unmarshal record value: %v", err)
if lastErr == nil {
lastErr = err
}
@@ -42,7 +42,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA
Value: record,
TsNs: m.Data.TsNs,
}); sendErr != nil {
- glog.V(0).Infof("send record: %v", sendErr)
+ log.V(3).Infof("send record: %v", sendErr)
if lastErr == nil {
lastErr = sendErr
}
@@ -53,7 +53,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA
go func() {
subErr := subscriber.Subscribe()
if subErr != nil {
- glog.V(0).Infof("subscriber %s subscribe: %v", subscriber.SubscriberConfig.String(), subErr)
+ log.V(3).Infof("subscriber %s subscribe: %v", subscriber.SubscriberConfig.String(), subErr)
if lastErr == nil {
lastErr = subErr
}
@@ -63,7 +63,7 @@ func (a *MessageQueueAgent) SubscribeRecord(stream mq_agent_pb.SeaweedMessagingA
for {
m, err := stream.Recv()
if err != nil {
- glog.V(0).Infof("subscriber %s receive: %v", subscriber.SubscriberConfig.String(), err)
+ log.V(3).Infof("subscriber %s receive: %v", subscriber.SubscriberConfig.String(), err)
return err
}
if m != nil {
diff --git a/weed/mq/broker/broker_connect.go b/weed/mq/broker/broker_connect.go
index 386d86570..8018cfe57 100644
--- a/weed/mq/broker/broker_connect.go
+++ b/weed/mq/broker/broker_connect.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
@@ -16,7 +16,7 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stop
self := string(b.option.BrokerAddress())
- glog.V(0).Infof("broker %s connects to balancer %s", self, brokerBalancer)
+ log.V(3).Infof("broker %s connects to balancer %s", self, brokerBalancer)
if brokerBalancer == "" {
return fmt.Errorf("no balancer found")
}
@@ -59,7 +59,7 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stop
}
return fmt.Errorf("send stats message to balancer %s: %v", brokerBalancer, err)
}
- // glog.V(3).Infof("sent stats: %+v", stats)
+ // log.V(0).Infof("sent stats: %+v", stats)
time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
}
@@ -82,7 +82,7 @@ func (b *MessageQueueBroker) KeepConnectedToBrokerBalancer(newBrokerBalancerCh c
for {
err := b.BrokerConnectToBalancer(newBrokerBalancer, thisRunStopChan)
if err != nil {
- glog.V(0).Infof("connect to balancer %s: %v", newBrokerBalancer, err)
+ log.V(3).Infof("connect to balancer %s: %v", newBrokerBalancer, err)
time.Sleep(time.Second)
} else {
break
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 991208a72..989cccaef 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -50,7 +50,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
}
}
- glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
+ log.V(3).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
return ret, nil
}
@@ -91,7 +91,7 @@ func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context,
brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
return nil
}); doCreateErr != nil {
- glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
+ log.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
}
}(bpa)
}
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index f827f0b37..ea781807f 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -4,7 +4,7 @@ import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -36,7 +36,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
var readErr, assignErr error
resp, readErr = b.fca.ReadTopicConfFromFiler(t)
if readErr != nil {
- glog.V(0).Infof("read topic %s conf: %v", request.Topic, readErr)
+ log.V(3).Infof("read topic %s conf: %v", request.Topic, readErr)
}
if resp != nil {
@@ -47,13 +47,13 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
}
if readErr == nil && assignErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) {
- glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments)
+ log.V(3).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments)
return
}
if resp != nil && len(resp.BrokerPartitionAssignments) > 0 {
if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil {
- glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr)
+ log.V(2).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr)
}
}
resp = &mq_pb.ConfigureTopicResponse{}
@@ -70,7 +70,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
- glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
+ log.V(3).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
return resp, err
}
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 65a1ffda8..07cecccd6 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -28,7 +28,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
conf := &mq_pb.ConfigureTopicResponse{}
ret.Topic = request.Topic
if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil {
- glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
+ log.V(3).Infof("lookup topic %s conf: %v", request.Topic, err)
} else {
err = b.ensureTopicActiveAssignments(t, conf)
ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index f31dc7eff..64841a1b9 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer"
@@ -46,7 +46,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
initMessage := req.GetInit()
if initMessage == nil {
response.Error = fmt.Sprintf("missing init message")
- glog.Errorf("missing init message")
+ log.Errorf("missing init message")
return stream.Send(response)
}
@@ -55,14 +55,14 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
if getOrGenErr != nil {
response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
- glog.Errorf("topic %v not found: %v", t, getOrGenErr)
+ log.Errorf("topic %v not found: %v", t, getOrGenErr)
return stream.Send(response)
}
// connect to follower brokers
if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
response.Error = followerErr.Error()
- glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
+ log.Errorf("MaybeConnectToFollowers: %v", followerErr)
return stream.Send(response)
}
@@ -88,7 +88,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
AckSequence: acknowledgedSequence,
}
if err := stream.Send(response); err != nil {
- glog.Errorf("Error sending response %v: %v", response, err)
+ log.Errorf("Error sending response %v: %v", response, err)
}
// println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
lastAckTime = time.Now()
@@ -107,7 +107,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
localTopicPartition.Publishers.RemovePublisher(clientName)
if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveLocalPartition(t, p)
- glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
+ log.V(3).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
}
}()
@@ -126,7 +126,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
if err == io.EOF {
break
}
- glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
+ log.V(3).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
break
}
@@ -145,7 +145,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
}
- glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
+ log.V(3).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
return nil
}
@@ -155,11 +155,11 @@ func findClientAddress(ctx context.Context) string {
// fmt.Printf("FromContext %+v\n", ctx)
pr, ok := peer.FromContext(ctx)
if !ok {
- glog.Error("failed to get peer from ctx")
+ log.Error("failed to get peer from ctx")
return ""
}
if pr.Addr == net.Addr(nil) {
- glog.Error("failed to get peer address")
+ log.Error("failed to get peer address")
return ""
}
return pr.Addr.String()
diff --git a/weed/mq/broker/broker_grpc_pub_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go
index 5978d2173..cc48ce604 100644
--- a/weed/mq/broker/broker_grpc_pub_balancer.go
+++ b/weed/mq/broker/broker_grpc_pub_balancer.go
@@ -41,7 +41,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
}
if receivedStats := req.GetStats(); receivedStats != nil {
b.PubBalancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
- // glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats)
+ // log.V(-1).Infof("received from %v: %+v", initMessage.Broker, receivedStats)
}
}
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 291f1ef62..367cd12ef 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -2,7 +2,7 @@ package broker
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
@@ -43,7 +43,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
err = nil
break
}
- glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
+ log.V(3).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
break
}
@@ -58,14 +58,14 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
if err := stream.Send(&mq_pb.PublishFollowMeResponse{
AckTsNs: dataMessage.TsNs,
}); err != nil {
- glog.Errorf("Error sending response %v: %v", dataMessage, err)
+ log.Errorf("Error sending response %v: %v", dataMessage, err)
}
// println("ack", string(dataMessage.Key), dataMessage.TsNs)
} else if closeMessage := req.GetClose(); closeMessage != nil {
- glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
+ log.V(3).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
break
} else if flushMessage := req.GetFlush(); flushMessage != nil {
- glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)
+ log.V(3).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)
lastFlushTsNs = flushMessage.TsNs
@@ -80,7 +80,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
}
} else {
- glog.Errorf("unknown message: %v", req)
+ log.Errorf("unknown message: %v", req)
}
}
@@ -104,7 +104,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
startTime, stopTime := mem.startTime.UTC(), mem.stopTime.UTC()
if stopTime.UnixNano() <= lastFlushTsNs {
- glog.V(0).Infof("dropping remaining data at %v %v", t, p)
+ log.V(3).Infof("dropping remaining data at %v %v", t, p)
continue
}
@@ -114,17 +114,17 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
for {
if err := b.appendToFile(targetFile, mem.buf); err != nil {
- glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
+ log.V(3).Infof("metadata log write failed %s: %v", targetFile, err)
time.Sleep(737 * time.Millisecond)
} else {
break
}
}
- glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
+ log.V(3).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
}
- glog.V(0).Infof("shut down follower for %v %v", t, p)
+ log.V(3).Infof("shut down follower for %v %v", t, p)
return err
}
@@ -140,7 +140,7 @@ func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_qu
startTime: startTime,
stopTime: stopTime,
})
- glog.V(0).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf))
+ log.V(3).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf))
}, nil, func() {
})
return lb
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 9cdbe8325..ba8efb5e4 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -23,7 +23,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
return err
}
if req.GetInit() == nil {
- glog.Errorf("missing init message")
+ log.Errorf("missing init message")
return fmt.Errorf("missing init message")
}
@@ -33,7 +33,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
- glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
+ log.V(3).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
if getOrGenErr != nil {
@@ -41,7 +41,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
- glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
+ log.V(3).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
isConnected := true
sleepIntervalCount := 0
@@ -49,7 +49,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
defer func() {
isConnected = false
localTopicPartition.Subscribers.RemoveSubscriber(clientName)
- glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
+ log.V(3).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveLocalPartition(t, partition)
}
@@ -60,7 +60,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// connect to the follower
var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
- glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker)
+ log.V(3).Infof("follower broker: %v", req.GetInit().FollowerBroker)
if req.GetInit().FollowerBroker != "" {
follower := req.GetInit().FollowerBroker
if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil {
@@ -90,7 +90,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
}
}
- glog.V(0).Infof("follower %s connected", follower)
+ log.V(3).Infof("follower %s connected", follower)
}
go func() {
@@ -107,7 +107,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}})
break
}
- glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
+ log.V(3).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
break
}
if ack.GetAck().Key == nil {
@@ -125,7 +125,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
},
},
}); err != nil {
- glog.Errorf("Error sending ack to follower: %v", err)
+ log.Errorf("Error sending ack to follower: %v", err)
break
}
lastOffset = currentLastOffset
@@ -133,9 +133,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
}
if lastOffset > 0 {
- glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
+ log.V(3).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
- glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
+ log.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
}
}
if subscribeFollowMeStream != nil {
@@ -145,7 +145,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
},
}); err != nil {
if err != io.EOF {
- glog.Errorf("Error sending close to follower: %v", err)
+ log.Errorf("Error sending close to follower: %v", err)
}
}
}
@@ -169,7 +169,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// Client disconnected
return false
}
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err)
return false
default:
// Continue processing the request
@@ -190,7 +190,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// Client disconnected
return false, nil
}
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err)
return false, nil
default:
// Continue processing the request
@@ -207,7 +207,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
TsNs: logEntry.TsNs,
},
}}); err != nil {
- glog.Errorf("Error sending data: %v", err)
+ log.Errorf("Error sending data: %v", err)
return false, err
}
@@ -241,7 +241,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
// try to resume
if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
- glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
+ log.V(3).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
return
}
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index 985b0a47e..6dffa606d 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
@@ -29,13 +29,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
if err != nil {
return status.Errorf(codes.InvalidArgument, "failed to add subscriber: %v", err)
}
- glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
+ log.V(3).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
} else {
return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
}
defer func() {
b.SubCoordinator.RemoveSubscriber(initMessage)
- glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
+ log.V(3).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
}()
ctx := stream.Context()
@@ -45,15 +45,15 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
for {
req, err := stream.Recv()
if err != nil {
- glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
+ log.V(3).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
}
if ackUnAssignment := req.GetAckUnAssignment(); ackUnAssignment != nil {
- glog.V(0).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment)
+ log.V(3).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment)
cg.AckUnAssignment(cgi, ackUnAssignment)
}
if ackAssignment := req.GetAckAssignment(); ackAssignment != nil {
- glog.V(0).Infof("subscriber %s/%s/%s ack assignment %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackAssignment)
+ log.V(3).Infof("subscriber %s/%s/%s ack assignment %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackAssignment)
cg.AckAssignment(cgi, ackAssignment)
}
@@ -80,12 +80,12 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
// Client disconnected
return err
}
- glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
+ log.V(3).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
return err
case message := <-cgi.ResponseChan:
- glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, message)
+ log.V(3).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, message)
if err := stream.Send(message); err != nil {
- glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
+ log.V(3).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
}
}
}
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go
index bed906c30..39344d4a6 100644
--- a/weed/mq/broker/broker_grpc_sub_follow.go
+++ b/weed/mq/broker/broker_grpc_sub_follow.go
@@ -3,7 +3,7 @@ package broker
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -34,7 +34,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
err = nil
break
}
- glog.V(0).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err)
+ log.V(3).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err)
break
}
@@ -43,10 +43,10 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
lastOffset = ackMessage.TsNs
// println("sub follower got offset", lastOffset)
} else if closeMessage := req.GetClose(); closeMessage != nil {
- glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
+ log.V(3).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
return nil
} else {
- glog.Errorf("unknown message: %v", req)
+ log.Errorf("unknown message: %v", req)
}
}
@@ -56,7 +56,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset)
}
- glog.V(0).Infof("shut down follower for %v offset %d", initMessage, lastOffset)
+ log.V(3).Infof("shut down follower for %v offset %d", initMessage, lastOffset)
return err
}
@@ -90,7 +90,7 @@ func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Part
util.Uint64toBytes(offsetBytes, uint64(offset))
return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset)
+ log.V(3).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset)
return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes)
})
}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index d80fa91a4..ef1d6ec75 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/filer_client"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -89,12 +89,12 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
time.Sleep(time.Millisecond * 237)
}
self := option.BrokerAddress()
- glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
+ log.V(3).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
newBrokerBalancerCh := make(chan string, 1)
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) {
- glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner)
+ log.V(3).Infof("broker %s found balanacer %s", self, newLockOwner)
newBrokerBalancerCh <- newLockOwner
})
mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh)
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 222ff16ba..cc1cbce19 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -2,7 +2,7 @@ package broker
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -13,12 +13,12 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio
// get or generate a local partition
conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
if readConfErr != nil {
- glog.Errorf("topic %v not found: %v", t, readConfErr)
+ log.Errorf("topic %v not found: %v", t, readConfErr)
return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
}
localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
if getOrGenError != nil {
- glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
+ log.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
}
return localTopicPartition, nil
@@ -55,7 +55,7 @@ func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *m
// also fix assignee broker if invalid
hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments)
if hasChanges {
- glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
+ log.V(3).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
if err = b.fca.SaveTopicConfToFiler(t, conf); err != nil {
return err
}
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index d6513b2a2..94ae45b73 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -2,7 +2,7 @@ package broker
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"sync/atomic"
@@ -25,7 +25,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
for {
if err := b.appendToFile(targetFile, buf); err != nil {
- glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
+ log.V(3).Infof("metadata log write failed %s: %v", targetFile, err)
time.Sleep(737 * time.Millisecond)
} else {
break
@@ -40,6 +40,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
}
- glog.V(0).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf))
+ log.V(3).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf))
}
}
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index a768fa7f8..529efc693 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -3,19 +3,19 @@ package pub_client
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
- "log"
- "sort"
- "sync"
- "sync/atomic"
- "time"
)
type EachPartitionError struct {
@@ -33,27 +33,26 @@ type EachPartitionPublishJob struct {
}
func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
-
if err := p.doConfigureTopic(); err != nil {
wg.Done()
return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
}
- log.Printf("start scheduler thread for topic %s", p.config.Topic)
+ log.Infof("start scheduler thread for topic %s", p.config.Topic)
generation := 0
var errChan chan EachPartitionError
for {
- glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
+ log.V(3).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
if assignments, err := p.doLookupTopicPartitions(); err == nil {
generation++
- glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
+ log.V(3).Infof("start generation %d with %d assignments", generation, len(assignments))
if errChan == nil {
errChan = make(chan EachPartitionError, len(assignments))
}
p.onEachAssignments(generation, assignments, errChan)
} else {
- glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
+ log.Errorf("lookup topic %s: %v", p.config.Topic, err)
time.Sleep(5 * time.Second)
continue
}
@@ -66,7 +65,7 @@ func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
for {
select {
case eachErr := <-errChan:
- glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
+ log.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
if eachErr.generation < generation {
continue
}
@@ -114,7 +113,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
go func(job *EachPartitionPublishJob) {
defer job.wg.Done()
if err := p.doPublishToPartition(job); err != nil {
- log.Printf("publish to %s partition %v: %v", p.config.Topic, job.Partition, err)
+ log.Infof("publish to %s partition %v: %v", p.config.Topic, job.Partition, err)
errChan <- EachPartitionError{assignment, err, generation}
}
}(job)
@@ -127,8 +126,7 @@ func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.
}
func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
-
- log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
+ log.Infof("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
grpcConnection, err := grpc.NewClient(job.LeaderBroker, grpc.WithTransportCredentials(insecure.NewCredentials()), p.grpcDialOption)
if err != nil {
@@ -159,10 +157,19 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
// process the hello message
resp, err := stream.Recv()
if err != nil {
- return fmt.Errorf("recv init response: %v", err)
+ e, _ := status.FromError(err)
+ if e.Code() == codes.Unknown && e.Message() == "EOF" {
+ log.Infof("publish to %s EOF", publishClient.Broker)
+ return nil
+ }
+ publishClient.Err = err
+ log.Errorf("publish1 to %s error: %v", publishClient.Broker, err)
+ return err
}
if resp.Error != "" {
- return fmt.Errorf("init response error: %v", resp.Error)
+ publishClient.Err = fmt.Errorf("ack error: %v", resp.Error)
+ log.Errorf("publish2 to %s error: %v", publishClient.Broker, resp.Error)
+ return fmt.Errorf("ack error: %v", resp.Error)
}
var publishedTsNs int64
@@ -176,20 +183,20 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Unknown && e.Message() == "EOF" {
- log.Printf("publish to %s EOF", publishClient.Broker)
+ log.Infof("publish to %s EOF", publishClient.Broker)
return
}
publishClient.Err = err
- log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
+ log.Errorf("publish1 to %s error: %v", publishClient.Broker, err)
return
}
if ackResp.Error != "" {
publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
- log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
+ log.Errorf("publish2 to %s error: %v", publishClient.Broker, ackResp.Error)
return
}
if ackResp.AckSequence > 0 {
- log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
+ log.Infof("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
}
if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
return
@@ -222,7 +229,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
}
}
- log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
+ log.Infof("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
return nil
}
@@ -272,7 +279,7 @@ func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerP
&mq_pb.LookupTopicBrokersRequest{
Topic: p.config.Topic.ToPbTopic(),
})
- glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
+ log.V(3).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
if err != nil {
return err
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index e88aaca2f..feccca7a4 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -1,7 +1,7 @@
package sub_client
import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"time"
@@ -29,17 +29,17 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
return nil
})
if err != nil {
- glog.V(0).Infof("broker coordinator on %s: %v", broker, err)
+ log.V(3).Infof("broker coordinator on %s: %v", broker, err)
continue
}
- glog.V(0).Infof("found broker coordinator: %v", brokerLeader)
+ log.V(3).Infof("found broker coordinator: %v", brokerLeader)
// connect to the balancer
pb.WithBrokerGrpcClient(true, brokerLeader, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
stream, err := client.SubscriberToSubCoordinator(sub.ctx)
if err != nil {
- glog.V(0).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s: %v", sub.ContentConfig.Topic, err)
return err
}
waitTime = 1 * time.Second
@@ -56,7 +56,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
},
},
}); err != nil {
- glog.V(0).Infof("subscriber %s send init: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s send init: %v", sub.ContentConfig.Topic, err)
return err
}
@@ -69,9 +69,9 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
default:
}
- glog.V(0).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply)
+ log.V(3).Infof("subscriber instance %s ack %+v", sub.SubscriberConfig.ConsumerGroupInstanceId, reply)
if err := stream.Send(reply); err != nil {
- glog.V(0).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s reply: %v", sub.ContentConfig.Topic, err)
return
}
}
@@ -81,7 +81,7 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
for {
resp, err := stream.Recv()
if err != nil {
- glog.V(0).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err)
+ log.V(3).Infof("subscriber %s receive: %v", sub.ContentConfig.Topic, err)
return err
}
@@ -92,13 +92,13 @@ func (sub *TopicSubscriber) doKeepConnectedToSubCoordinator() {
}
sub.brokerPartitionAssignmentChan <- resp
- glog.V(0).Infof("Received assignment: %+v", resp)
+ log.V(3).Infof("Received assignment: %+v", resp)
}
return nil
})
}
- glog.V(0).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ log.V(3).Infof("subscriber %s/%s waiting for more assignments", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
if waitTime < 10*time.Second {
waitTime += 1 * time.Second
}
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index 14a38cfa8..a931eb71f 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
@@ -52,10 +52,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
},
},
}); err != nil {
- glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
+ log.V(3).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
}
- glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
+ log.V(3).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
@@ -88,7 +88,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}()
for {
- // glog.V(0).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ // log.V(3).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
resp, err := subscribeClient.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
@@ -97,7 +97,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
return fmt.Errorf("subscribe recv: %v", err)
}
if resp.Message == nil {
- glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
+ log.V(3).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
continue
}
@@ -112,7 +112,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
switch m := resp.Message.(type) {
case *mq_pb.SubscribeMessageResponse_Data:
if m.Data.Ctrl != nil {
- glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
+ log.V(1).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
continue
}
if len(m.Data.Key) == 0 {
@@ -121,7 +121,7 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}
onDataMessageFn(m)
case *mq_pb.SubscribeMessageResponse_Ctrl:
- // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
+ // log.V(3).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return io.EOF
}
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index d4dea3852..1d06e0601 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -1,7 +1,7 @@
package sub_client
import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -60,7 +60,7 @@ func (sub *TopicSubscriber) startProcessors() {
<-semaphore
wg.Done()
}()
- glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ log.V(3).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{
AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{
@@ -84,9 +84,9 @@ func (sub *TopicSubscriber) startProcessors() {
err := sub.onEachPartition(assigned, stopChan, onDataMessageFn)
if err != nil {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
+ log.V(3).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
} else {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ log.V(3).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
}
sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignment{
@@ -130,7 +130,7 @@ func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartiti
}
sub.activeProcessorsLock.Unlock()
if foundOverlapping {
- glog.V(0).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
+ log.V(3).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
time.Sleep(1 * time.Second)
}
}
diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
index 30cad8cc1..c16990c5e 100644
--- a/weed/mq/logstore/log_to_parquet.go
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -160,7 +160,7 @@ func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, time
}
logTime, err := time.Parse(topic.TIME_FORMAT, entry.Name)
if err != nil {
- // glog.Warningf("parse log time %s: %v", entry.Name, err)
+ // log.Warningf("parse log time %s: %v", entry.Name, err)
return nil
}
if maxTsNs > 0 && logTime.UnixNano() <= maxTsNs {
diff --git a/weed/mq/logstore/merged_read.go b/weed/mq/logstore/merged_read.go
index 03a47ace4..ea8868a70 100644
--- a/weed/mq/logstore/merged_read.go
+++ b/weed/mq/logstore/merged_read.go
@@ -17,9 +17,9 @@ func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFun
var lastProcessedPosition log_buffer.MessagePosition
return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
if !exhaustedParquet {
- // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
+ // log.V(-1).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
- // glog.V(4).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
+ // log.V(-1).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
if isDone {
isDone = false
}
@@ -34,7 +34,7 @@ func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFun
startPosition = lastProcessedPosition
}
- // glog.V(4).Infof("reading from direct log startPosition: %v\n", startPosition.UTC())
+ // log.V(-1).Infof("reading from direct log startPosition: %v\n", startPosition.UTC())
lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
return
}
diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go
index 71ba58c1f..00e412b40 100644
--- a/weed/mq/logstore/read_log_from_disk.go
+++ b/weed/mq/logstore/read_log_from_disk.go
@@ -3,7 +3,7 @@ package logstore
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -71,7 +71,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
continue
}
if chunk.IsChunkManifest {
- glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
+ log.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
return
}
urlStrings, err = lookupFileIdFn(chunk.FileId)
diff --git a/weed/mq/pub_balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
index 46d423b30..94398d6c7 100644
--- a/weed/mq/pub_balancer/allocate.go
+++ b/weed/mq/pub_balancer/allocate.go
@@ -2,7 +2,7 @@ package pub_balancer
import (
cmap "github.com/orcaman/concurrent-map/v2"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"math/rand"
@@ -30,7 +30,7 @@ func AllocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p
EnsureAssignmentsToActiveBrokers(brokers, 1, assignments)
- glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
+ log.V(3).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
return
}
@@ -78,7 +78,7 @@ func pickBrokersExcluded(brokers []string, count int, excludedLeadBroker string,
// EnsureAssignmentsToActiveBrokers ensures the assignments are assigned to active brokers
func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *BrokerStats], followerCount int, assignments []*mq_pb.BrokerPartitionAssignment) (hasChanges bool) {
- glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v", activeBrokers.Count(), followerCount, assignments)
+ log.V(3).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v", activeBrokers.Count(), followerCount, assignments)
candidates := make([]string, 0, activeBrokers.Count())
for brokerStatsItem := range activeBrokers.IterBuffered() {
@@ -122,6 +122,6 @@ func EnsureAssignmentsToActiveBrokers(activeBrokers cmap.ConcurrentMap[string, *
}
- glog.V(0).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v hasChanges: %v", activeBrokers.Count(), followerCount, assignments, hasChanges)
+ log.V(3).Infof("EnsureAssignmentsToActiveBrokers: activeBrokers: %v, followerCount: %d, assignments: %v hasChanges: %v", activeBrokers.Count(), followerCount, assignments, hasChanges)
return
}
diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go
index 34bdfd286..168f7f83d 100644
--- a/weed/mq/pub_balancer/partition_list_broker.go
+++ b/weed/mq/pub_balancer/partition_list_broker.go
@@ -1,7 +1,7 @@
package pub_balancer
import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
@@ -28,11 +28,11 @@ func (ps *PartitionSlotToBrokerList) AddBroker(partition *schema_pb.Partition, b
for _, partitionSlot := range ps.PartitionSlots {
if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop {
if partitionSlot.AssignedBroker != "" && partitionSlot.AssignedBroker != broker {
- glog.V(0).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker)
+ log.V(3).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker)
partitionSlot.AssignedBroker = broker
}
if partitionSlot.FollowerBroker != "" && partitionSlot.FollowerBroker != follower {
- glog.V(0).Infof("partition %s follower change: %s => %s", partition, partitionSlot.FollowerBroker, follower)
+ log.V(3).Infof("partition %s follower change: %s => %s", partition, partitionSlot.FollowerBroker, follower)
partitionSlot.FollowerBroker = follower
}
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index ba94f34b4..afe400989 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -4,7 +4,7 @@ import (
"fmt"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/seaweedfs/seaweedfs/weed/filer_client"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
@@ -35,7 +35,7 @@ func NewConsumerGroup(t *schema_pb.Topic, reblanceSeconds int32, filerClientAcce
}
cg.Market = NewMarket(partitions, time.Duration(reblanceSeconds)*time.Second)
} else {
- glog.V(0).Infof("fail to read topic conf from filer: %v", err)
+ log.V(3).Infof("fail to read topic conf from filer: %v", err)
return nil
}
@@ -45,7 +45,7 @@ func NewConsumerGroup(t *schema_pb.Topic, reblanceSeconds int32, filerClientAcce
case adjustment := <-cg.Market.AdjustmentChan:
cgi, found := cg.ConsumerGroupInstances.Get(string(adjustment.consumer))
if !found {
- glog.V(0).Infof("consumer group instance %s not found", adjustment.consumer)
+ log.V(3).Infof("consumer group instance %s not found", adjustment.consumer)
continue
}
if adjustment.isAssign {
@@ -63,7 +63,7 @@ func NewConsumerGroup(t *schema_pb.Topic, reblanceSeconds int32, filerClientAcce
},
},
}
- glog.V(0).Infof("send assignment %v to %s", adjustment.partition, adjustment.consumer)
+ log.V(3).Infof("send assignment %v to %s", adjustment.partition, adjustment.consumer)
break
}
}
@@ -76,7 +76,7 @@ func NewConsumerGroup(t *schema_pb.Topic, reblanceSeconds int32, filerClientAcce
},
},
}
- glog.V(0).Infof("send unassignment %v to %s", adjustment.partition, adjustment.consumer)
+ log.V(3).Infof("send unassignment %v to %s", adjustment.partition, adjustment.consumer)
}
case <-cg.stopCh:
return
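
For orientation, the logging calls retargeted in this file live inside the consumer group's rebalance loop, which selects on the market's adjustment channel, looks up the consumer group instance, and forwards an assign or unassign message. A stripped-down sketch of that dispatch shape, with hypothetical stand-in types rather than the seaweedfs ones:

```go
// Simplified re-statement of the dispatch pattern touched above; the types
// here are hypothetical stand-ins, not the seaweedfs ones.
package main

import "fmt"

type adjustment struct {
	consumer  string
	partition string
	isAssign  bool
}

func dispatchLoop(adjustments <-chan adjustment, stop <-chan struct{}, known map[string]chan adjustment) {
	for {
		select {
		case adj := <-adjustments:
			target, found := known[adj.consumer]
			if !found {
				// mirrors the "consumer group instance %s not found" branch,
				// which this diff demotes from glog.V(0) to log.V(3)
				fmt.Printf("consumer %s not found\n", adj.consumer)
				continue
			}
			target <- adj // forward the assign/unassign to the instance
		case <-stop:
			return
		}
	}
}

func main() {
	adjustments := make(chan adjustment, 1)
	stop := make(chan struct{})
	inbox := make(chan adjustment, 1)
	known := map[string]chan adjustment{"consumer-1": inbox}
	adjustments <- adjustment{consumer: "consumer-1", partition: "p0", isAssign: true}
	go dispatchLoop(adjustments, stop, known)
	fmt.Printf("delivered: %+v\n", <-inbox)
	close(stop)
}
```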
diff --git a/weed/mq/sub_coordinator/market.go b/weed/mq/sub_coordinator/market.go
index df07edfd5..7473248f7 100644
--- a/weed/mq/sub_coordinator/market.go
+++ b/weed/mq/sub_coordinator/market.go
@@ -2,7 +2,7 @@ package sub_coordinator
import (
"errors"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"sync"
"time"
@@ -290,7 +290,7 @@ func (m *Market) ConfirmAdjustment(adjustment *Adjustment) {
} else {
m.unassignPartitionSlot(adjustment.partition)
}
- glog.V(1).Infof("ConfirmAdjustment %+v", adjustment)
+ log.V(2).Infof("ConfirmAdjustment %+v", adjustment)
m.Status()
}
@@ -300,12 +300,12 @@ func (m *Market) unassignPartitionSlot(partition topic.Partition) {
partitionSlot, exists := m.partitions[partition]
if !exists {
- glog.V(0).Infof("partition %+v slot is not tracked", partition)
+ log.V(3).Infof("partition %+v slot is not tracked", partition)
return
}
if partitionSlot.AssignedTo == nil {
- glog.V(0).Infof("partition %+v slot is not assigned to any consumer", partition)
+ log.V(3).Infof("partition %+v slot is not assigned to any consumer", partition)
return
}
@@ -319,7 +319,7 @@ func (m *Market) unassignPartitionSlot(partition topic.Partition) {
}
}
- glog.V(0).Infof("partition %+v slot not found in assigned consumer", partition)
+ log.V(3).Infof("partition %+v slot not found in assigned consumer", partition)
}
@@ -329,18 +329,18 @@ func (m *Market) confirmAssignPartition(partition topic.Partition, consumerInsta
partitionSlot, exists := m.partitions[partition]
if !exists {
- glog.V(0).Infof("partition %+v slot is not tracked", partition)
+ log.V(3).Infof("partition %+v slot is not tracked", partition)
return
}
if partitionSlot.AssignedTo != nil {
- glog.V(0).Infof("partition %+v slot is already assigned to %+v", partition, partitionSlot.AssignedTo.InstanceId)
+ log.V(3).Infof("partition %+v slot is already assigned to %+v", partition, partitionSlot.AssignedTo.InstanceId)
return
}
consumerInstance, exists := m.consumerInstances[consumerInstanceId]
if !exists {
- glog.V(0).Infof("consumer %+v is not tracked", consumerInstanceId)
+ log.V(3).Infof("consumer %+v is not tracked", consumerInstanceId)
return
}
@@ -353,15 +353,15 @@ func (m *Market) Status() {
m.mu.Lock()
defer m.mu.Unlock()
- glog.V(1).Infof("Market has %d partitions and %d consumer instances", len(m.partitions), len(m.consumerInstances))
+ log.V(2).Infof("Market has %d partitions and %d consumer instances", len(m.partitions), len(m.consumerInstances))
for partition, slot := range m.partitions {
if slot.AssignedTo == nil {
- glog.V(1).Infof("Partition %+v is not assigned to any consumer", partition)
+ log.V(2).Infof("Partition %+v is not assigned to any consumer", partition)
} else {
- glog.V(1).Infof("Partition %+v is assigned to consumer %+v", partition, slot.AssignedTo.InstanceId)
+ log.V(2).Infof("Partition %+v is assigned to consumer %+v", partition, slot.AssignedTo.InstanceId)
}
}
for _, consumer := range m.consumerInstances {
- glog.V(1).Infof("Consumer %+v has %d partitions", consumer.InstanceId, len(consumer.AssignedPartitions))
+ log.V(2).Infof("Consumer %+v has %d partitions", consumer.InstanceId, len(consumer.AssignedPartitions))
}
}
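
Since the glog-to-log conversion touches dozens of files, a stray glog import is easy to miss. A stand-alone check (not part of this commit) that walks a source tree and lists any Go file still importing the old package might look like this:

```go
// Stand-alone helper, not part of this commit: reports Go files that still
// import the old glog package after a conversion like the one in this diff.
package main

import (
	"fmt"
	"go/parser"
	"go/token"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
)

const oldImport = "github.com/seaweedfs/seaweedfs/weed/glog"

func main() {
	root := "."
	if len(os.Args) > 1 {
		root = os.Args[1]
	}
	fset := token.NewFileSet()
	err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
		if err != nil || d.IsDir() || !strings.HasSuffix(path, ".go") {
			return err
		}
		// parse only the import block; function bodies are irrelevant here
		file, parseErr := parser.ParseFile(fset, path, nil, parser.ImportsOnly)
		if parseErr != nil {
			return parseErr
		}
		for _, imp := range file.Imports {
			if strings.Trim(imp.Path.Value, `"`) == oldImport {
				fmt.Println(path)
			}
		}
		return nil
	})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```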
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index d1433775a..158551747 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -3,7 +3,7 @@ package topic
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
@@ -82,7 +82,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
for {
processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
if readPersistedLogErr != nil {
- glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
+ log.V(3).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
return readPersistedLogErr
}
if isDone {
@@ -104,7 +104,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
continue
}
if readInMemoryLogErr != nil {
- glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
+ log.V(3).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
return readInMemoryLogErr
}
}
@@ -179,10 +179,10 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Canceled {
- glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.Follower)
+ log.V(3).Infof("local partition %v follower %v stopped", p.Partition, p.Follower)
return
}
- glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err)
+ log.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err)
return
}
atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
@@ -206,9 +206,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
},
}); followErr != nil {
- glog.Errorf("Error closing follower stream: %v", followErr)
+ log.Errorf("Error closing follower stream: %v", followErr)
}
- glog.V(4).Infof("closing grpcConnection to follower")
+ log.V(-1).Infof("closing grpcConnection to follower")
p.followerGrpcConnection.Close()
p.publishFolloweMeStream = nil
p.Follower = ""
@@ -217,7 +217,7 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
hasShutdown = true
}
- glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown)
+ log.V(3).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown)
return
}
@@ -225,7 +225,7 @@ func (p *LocalPartition) Shutdown() {
p.closePublishers()
p.closeSubscribers()
p.LogBuffer.ShutdownLogBuffer()
- glog.V(0).Infof("local partition %v shutting down", p.Partition)
+ log.V(3).Infof("local partition %v shutting down", p.Partition)
}
func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
@@ -237,7 +237,7 @@ func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
},
},
}); followErr != nil {
- glog.Errorf("send follower %s flush message: %v", p.Follower, followErr)
+ log.Errorf("send follower %s flush message: %v", p.Follower, followErr)
}
// println("notifying", p.Follower, "flushed at", flushTsNs)
}
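
These hunks keep the existing behavior of treating a Canceled gRPC status on the follower stream as a clean stop and everything else as a receive error; only the logging calls change. As a side note, that split can be expressed as a small helper; the function name below is hypothetical, while the status/codes usage mirrors the context lines above:

```go
// Hypothetical helper illustrating the error split used above: a Canceled
// gRPC status means the follower stream stopped cleanly, anything else is a
// genuine receive error.
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isGracefulStop reports whether err represents a deliberate stream shutdown.
func isGracefulStop(err error) bool {
	if err == nil {
		return false
	}
	if s, ok := status.FromError(err); ok {
		return s.Code() == codes.Canceled
	}
	return false
}

func main() {
	err := status.Error(codes.Canceled, "stream torn down")
	fmt.Println(isGracefulStop(err)) // true
}
```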