aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker')
-rw-r--r--weed/mq/broker/broker_connect.go8
-rw-r--r--weed/mq/broker/broker_grpc_assign.go6
-rw-r--r--weed/mq/broker/broker_grpc_configure.go10
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go4
-rw-r--r--weed/mq/broker/broker_grpc_pub.go20
-rw-r--r--weed/mq/broker/broker_grpc_pub_balancer.go2
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go22
-rw-r--r--weed/mq/broker/broker_grpc_sub.go32
-rw-r--r--weed/mq/broker/broker_grpc_sub_coordinator.go18
-rw-r--r--weed/mq/broker/broker_grpc_sub_follow.go12
-rw-r--r--weed/mq/broker/broker_server.go6
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go8
-rw-r--r--weed/mq/broker/broker_topic_partition_read_write.go6
13 files changed, 77 insertions, 77 deletions
diff --git a/weed/mq/broker/broker_connect.go b/weed/mq/broker/broker_connect.go
index 386d86570..8018cfe57 100644
--- a/weed/mq/broker/broker_connect.go
+++ b/weed/mq/broker/broker_connect.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
@@ -16,7 +16,7 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stop
self := string(b.option.BrokerAddress())
- glog.V(0).Infof("broker %s connects to balancer %s", self, brokerBalancer)
+ log.V(3).Infof("broker %s connects to balancer %s", self, brokerBalancer)
if brokerBalancer == "" {
return fmt.Errorf("no balancer found")
}
@@ -59,7 +59,7 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stop
}
return fmt.Errorf("send stats message to balancer %s: %v", brokerBalancer, err)
}
- // glog.V(3).Infof("sent stats: %+v", stats)
+ // log.V(0).Infof("sent stats: %+v", stats)
time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
}
@@ -82,7 +82,7 @@ func (b *MessageQueueBroker) KeepConnectedToBrokerBalancer(newBrokerBalancerCh c
for {
err := b.BrokerConnectToBalancer(newBrokerBalancer, thisRunStopChan)
if err != nil {
- glog.V(0).Infof("connect to balancer %s: %v", newBrokerBalancer, err)
+ log.V(3).Infof("connect to balancer %s: %v", newBrokerBalancer, err)
time.Sleep(time.Second)
} else {
break
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 991208a72..989cccaef 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -50,7 +50,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
}
}
- glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
+ log.V(3).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
return ret, nil
}
@@ -91,7 +91,7 @@ func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context,
brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
return nil
}); doCreateErr != nil {
- glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
+ log.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
}
}(bpa)
}
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index f827f0b37..ea781807f 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -4,7 +4,7 @@ import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -36,7 +36,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
var readErr, assignErr error
resp, readErr = b.fca.ReadTopicConfFromFiler(t)
if readErr != nil {
- glog.V(0).Infof("read topic %s conf: %v", request.Topic, readErr)
+ log.V(3).Infof("read topic %s conf: %v", request.Topic, readErr)
}
if resp != nil {
@@ -47,13 +47,13 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
}
if readErr == nil && assignErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) {
- glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments)
+ log.V(3).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments)
return
}
if resp != nil && len(resp.BrokerPartitionAssignments) > 0 {
if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil {
- glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr)
+ log.V(2).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr)
}
}
resp = &mq_pb.ConfigureTopicResponse{}
@@ -70,7 +70,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
- glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
+ log.V(3).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
return resp, err
}
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 65a1ffda8..07cecccd6 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -28,7 +28,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
conf := &mq_pb.ConfigureTopicResponse{}
ret.Topic = request.Topic
if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil {
- glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
+ log.V(3).Infof("lookup topic %s conf: %v", request.Topic, err)
} else {
err = b.ensureTopicActiveAssignments(t, conf)
ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index f31dc7eff..64841a1b9 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer"
@@ -46,7 +46,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
initMessage := req.GetInit()
if initMessage == nil {
response.Error = fmt.Sprintf("missing init message")
- glog.Errorf("missing init message")
+ log.Errorf("missing init message")
return stream.Send(response)
}
@@ -55,14 +55,14 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
if getOrGenErr != nil {
response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
- glog.Errorf("topic %v not found: %v", t, getOrGenErr)
+ log.Errorf("topic %v not found: %v", t, getOrGenErr)
return stream.Send(response)
}
// connect to follower brokers
if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
response.Error = followerErr.Error()
- glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
+ log.Errorf("MaybeConnectToFollowers: %v", followerErr)
return stream.Send(response)
}
@@ -88,7 +88,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
AckSequence: acknowledgedSequence,
}
if err := stream.Send(response); err != nil {
- glog.Errorf("Error sending response %v: %v", response, err)
+ log.Errorf("Error sending response %v: %v", response, err)
}
// println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
lastAckTime = time.Now()
@@ -107,7 +107,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
localTopicPartition.Publishers.RemovePublisher(clientName)
if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveLocalPartition(t, p)
- glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
+ log.V(3).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
}
}()
@@ -126,7 +126,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
if err == io.EOF {
break
}
- glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
+ log.V(3).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
break
}
@@ -145,7 +145,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
}
- glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
+ log.V(3).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
return nil
}
@@ -155,11 +155,11 @@ func findClientAddress(ctx context.Context) string {
// fmt.Printf("FromContext %+v\n", ctx)
pr, ok := peer.FromContext(ctx)
if !ok {
- glog.Error("failed to get peer from ctx")
+ log.Error("failed to get peer from ctx")
return ""
}
if pr.Addr == net.Addr(nil) {
- glog.Error("failed to get peer address")
+ log.Error("failed to get peer address")
return ""
}
return pr.Addr.String()
diff --git a/weed/mq/broker/broker_grpc_pub_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go
index 5978d2173..cc48ce604 100644
--- a/weed/mq/broker/broker_grpc_pub_balancer.go
+++ b/weed/mq/broker/broker_grpc_pub_balancer.go
@@ -41,7 +41,7 @@ func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessagin
}
if receivedStats := req.GetStats(); receivedStats != nil {
b.PubBalancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
- // glog.V(4).Infof("received from %v: %+v", initMessage.Broker, receivedStats)
+ // log.V(-1).Infof("received from %v: %+v", initMessage.Broker, receivedStats)
}
}
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 291f1ef62..367cd12ef 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -2,7 +2,7 @@ package broker
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
@@ -43,7 +43,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
err = nil
break
}
- glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
+ log.V(3).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
break
}
@@ -58,14 +58,14 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
if err := stream.Send(&mq_pb.PublishFollowMeResponse{
AckTsNs: dataMessage.TsNs,
}); err != nil {
- glog.Errorf("Error sending response %v: %v", dataMessage, err)
+ log.Errorf("Error sending response %v: %v", dataMessage, err)
}
// println("ack", string(dataMessage.Key), dataMessage.TsNs)
} else if closeMessage := req.GetClose(); closeMessage != nil {
- glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
+ log.V(3).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
break
} else if flushMessage := req.GetFlush(); flushMessage != nil {
- glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)
+ log.V(3).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)
lastFlushTsNs = flushMessage.TsNs
@@ -80,7 +80,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
}
} else {
- glog.Errorf("unknown message: %v", req)
+ log.Errorf("unknown message: %v", req)
}
}
@@ -104,7 +104,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
startTime, stopTime := mem.startTime.UTC(), mem.stopTime.UTC()
if stopTime.UnixNano() <= lastFlushTsNs {
- glog.V(0).Infof("dropping remaining data at %v %v", t, p)
+ log.V(3).Infof("dropping remaining data at %v %v", t, p)
continue
}
@@ -114,17 +114,17 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
for {
if err := b.appendToFile(targetFile, mem.buf); err != nil {
- glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
+ log.V(3).Infof("metadata log write failed %s: %v", targetFile, err)
time.Sleep(737 * time.Millisecond)
} else {
break
}
}
- glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
+ log.V(3).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
}
- glog.V(0).Infof("shut down follower for %v %v", t, p)
+ log.V(3).Infof("shut down follower for %v %v", t, p)
return err
}
@@ -140,7 +140,7 @@ func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_qu
startTime: startTime,
stopTime: stopTime,
})
- glog.V(0).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf))
+ log.V(3).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf))
}, nil, func() {
})
return lb
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 9cdbe8325..ba8efb5e4 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -23,7 +23,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
return err
}
if req.GetInit() == nil {
- glog.Errorf("missing init message")
+ log.Errorf("missing init message")
return fmt.Errorf("missing init message")
}
@@ -33,7 +33,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
- glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
+ log.V(3).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
if getOrGenErr != nil {
@@ -41,7 +41,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
- glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
+ log.V(3).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
isConnected := true
sleepIntervalCount := 0
@@ -49,7 +49,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
defer func() {
isConnected = false
localTopicPartition.Subscribers.RemoveSubscriber(clientName)
- glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
+ log.V(3).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
if localTopicPartition.MaybeShutdownLocalPartition() {
b.localTopicManager.RemoveLocalPartition(t, partition)
}
@@ -60,7 +60,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// connect to the follower
var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
- glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker)
+ log.V(3).Infof("follower broker: %v", req.GetInit().FollowerBroker)
if req.GetInit().FollowerBroker != "" {
follower := req.GetInit().FollowerBroker
if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil {
@@ -90,7 +90,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
}
}
- glog.V(0).Infof("follower %s connected", follower)
+ log.V(3).Infof("follower %s connected", follower)
}
go func() {
@@ -107,7 +107,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}})
break
}
- glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
+ log.V(3).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
break
}
if ack.GetAck().Key == nil {
@@ -125,7 +125,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
},
},
}); err != nil {
- glog.Errorf("Error sending ack to follower: %v", err)
+ log.Errorf("Error sending ack to follower: %v", err)
break
}
lastOffset = currentLastOffset
@@ -133,9 +133,9 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
}
if lastOffset > 0 {
- glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
+ log.V(3).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
- glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
+ log.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
}
}
if subscribeFollowMeStream != nil {
@@ -145,7 +145,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
},
}); err != nil {
if err != io.EOF {
- glog.Errorf("Error sending close to follower: %v", err)
+ log.Errorf("Error sending close to follower: %v", err)
}
}
}
@@ -169,7 +169,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// Client disconnected
return false
}
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err)
return false
default:
// Continue processing the request
@@ -190,7 +190,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// Client disconnected
return false, nil
}
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ log.V(3).Infof("Subscriber %s disconnected: %v", clientName, err)
return false, nil
default:
// Continue processing the request
@@ -207,7 +207,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
TsNs: logEntry.TsNs,
},
}}); err != nil {
- glog.Errorf("Error sending data: %v", err)
+ log.Errorf("Error sending data: %v", err)
return false, err
}
@@ -241,7 +241,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
// try to resume
if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
- glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
+ log.V(3).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
return
}
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index 985b0a47e..6dffa606d 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
@@ -29,13 +29,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
if err != nil {
return status.Errorf(codes.InvalidArgument, "failed to add subscriber: %v", err)
}
- glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
+ log.V(3).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
} else {
return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
}
defer func() {
b.SubCoordinator.RemoveSubscriber(initMessage)
- glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
+ log.V(3).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
}()
ctx := stream.Context()
@@ -45,15 +45,15 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
for {
req, err := stream.Recv()
if err != nil {
- glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
+ log.V(3).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
}
if ackUnAssignment := req.GetAckUnAssignment(); ackUnAssignment != nil {
- glog.V(0).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment)
+ log.V(3).Infof("subscriber %s/%s/%s ack close of %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackUnAssignment)
cg.AckUnAssignment(cgi, ackUnAssignment)
}
if ackAssignment := req.GetAckAssignment(); ackAssignment != nil {
- glog.V(0).Infof("subscriber %s/%s/%s ack assignment %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackAssignment)
+ log.V(3).Infof("subscriber %s/%s/%s ack assignment %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, ackAssignment)
cg.AckAssignment(cgi, ackAssignment)
}
@@ -80,12 +80,12 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
// Client disconnected
return err
}
- glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
+ log.V(3).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
return err
case message := <-cgi.ResponseChan:
- glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, message)
+ log.V(3).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, message)
if err := stream.Send(message); err != nil {
- glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
+ log.V(3).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
}
}
}
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go
index bed906c30..39344d4a6 100644
--- a/weed/mq/broker/broker_grpc_sub_follow.go
+++ b/weed/mq/broker/broker_grpc_sub_follow.go
@@ -3,7 +3,7 @@ package broker
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -34,7 +34,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
err = nil
break
}
- glog.V(0).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err)
+ log.V(3).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err)
break
}
@@ -43,10 +43,10 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
lastOffset = ackMessage.TsNs
// println("sub follower got offset", lastOffset)
} else if closeMessage := req.GetClose(); closeMessage != nil {
- glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
+ log.V(3).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
return nil
} else {
- glog.Errorf("unknown message: %v", req)
+ log.Errorf("unknown message: %v", req)
}
}
@@ -56,7 +56,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset)
}
- glog.V(0).Infof("shut down follower for %v offset %d", initMessage, lastOffset)
+ log.V(3).Infof("shut down follower for %v offset %d", initMessage, lastOffset)
return err
}
@@ -90,7 +90,7 @@ func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Part
util.Uint64toBytes(offsetBytes, uint64(offset))
return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset)
+ log.V(3).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset)
return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes)
})
}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index d80fa91a4..ef1d6ec75 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -3,7 +3,7 @@ package broker
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/filer_client"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -89,12 +89,12 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
time.Sleep(time.Millisecond * 237)
}
self := option.BrokerAddress()
- glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
+ log.V(3).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
newBrokerBalancerCh := make(chan string, 1)
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) {
- glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner)
+ log.V(3).Infof("broker %s found balanacer %s", self, newLockOwner)
newBrokerBalancerCh <- newLockOwner
})
mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh)
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 222ff16ba..cc1cbce19 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -2,7 +2,7 @@ package broker
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -13,12 +13,12 @@ func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partitio
// get or generate a local partition
conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
if readConfErr != nil {
- glog.Errorf("topic %v not found: %v", t, readConfErr)
+ log.Errorf("topic %v not found: %v", t, readConfErr)
return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
}
localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
if getOrGenError != nil {
- glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
+ log.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
}
return localTopicPartition, nil
@@ -55,7 +55,7 @@ func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *m
// also fix assignee broker if invalid
hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments)
if hasChanges {
- glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
+ log.V(3).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
if err = b.fca.SaveTopicConfToFiler(t, conf); err != nil {
return err
}
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index d6513b2a2..94ae45b73 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -2,7 +2,7 @@ package broker
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"sync/atomic"
@@ -25,7 +25,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
for {
if err := b.appendToFile(targetFile, buf); err != nil {
- glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
+ log.V(3).Infof("metadata log write failed %s: %v", targetFile, err)
time.Sleep(737 * time.Millisecond)
} else {
break
@@ -40,6 +40,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) l
localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
}
- glog.V(0).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf))
+ log.V(3).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf))
}
}