aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-04-02 15:31:31 -0700
committerchrislu <chris.lu@gmail.com>2024-04-02 15:31:31 -0700
commitf37c0d0d7a59d433a48a0102d7d76471ab034f40 (patch)
tree062a9968dd323c1dd6a26f49fe6f8f0a8fa8c2c6
parentd935f70e3cf8a03e5d1b118ca70fd66470567435 (diff)
downloadseaweedfs-f37c0d0d7a59d433a48a0102d7d76471ab034f40.tar.xz
seaweedfs-f37c0d0d7a59d433a48a0102d7d76471ab034f40.zip
comment out println
-rw-r--r--weed/mq/broker/broker_grpc_pub.go4
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go7
-rw-r--r--weed/mq/topic/local_partition.go8
3 files changed, 8 insertions, 11 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index fb0e3a11f..a217489de 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -79,7 +79,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
go func() {
defer func() {
- println("stop sending ack to publisher", initMessage.PublisherName)
+ // println("stop sending ack to publisher", initMessage.PublisherName)
}()
lastAckTime := time.Now()
@@ -93,7 +93,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
if err := stream.Send(response); err != nil {
glog.Errorf("Error sending response %v: %v", response, err)
}
- println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
+ // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
lastAckTime = time.Now()
} else {
time.Sleep(1 * time.Second)
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 358b310bf..57cbbd2d2 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -58,10 +58,9 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
if err := stream.Send(&mq_pb.PublishFollowMeResponse{
AckTsNs: dataMessage.TsNs,
}); err != nil {
- // TODO save un-acked messages to disk
glog.Errorf("Error sending response %v: %v", dataMessage, err)
}
- println("ack", string(dataMessage.Key), dataMessage.TsNs)
+ // println("ack", string(dataMessage.Key), dataMessage.TsNs)
} else if closeMessage := req.GetClose(); closeMessage != nil {
glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
break
@@ -74,7 +73,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
for mem, found := inMemoryBuffers.PeekHead(); found; mem, found = inMemoryBuffers.PeekHead() {
if mem.stopTime.UnixNano() <= flushMessage.TsNs {
inMemoryBuffers.Dequeue()
- println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf))
+ // println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf))
} else {
break
}
@@ -117,8 +116,6 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
- // TODO append block with more metadata
-
for {
if err := b.appendToFile(targetFile, mem.buf); err != nil {
glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 6e429e5df..157fa2792 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -56,7 +56,7 @@ func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
// maybe send to the follower
if p.followerStream != nil {
- println("recv", string(message.Key), message.TsNs)
+ // println("recv", string(message.Key), message.TsNs)
if followErr := p.followerStream.Send(&mq_pb.PublishFollowMeRequest{
Message: &mq_pb.PublishFollowMeRequest_Data{
Data: message,
@@ -166,7 +166,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
// start receiving ack from follower
go func() {
defer func() {
- println("stop receiving ack from follower")
+ // println("stop receiving ack from follower")
}()
for {
@@ -181,7 +181,7 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
return
}
atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
- println("recv ack", ack.AckTsNs)
+ // println("recv ack", ack.AckTsNs)
}
}()
return nil
@@ -234,6 +234,6 @@ func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
}); followErr != nil {
glog.Errorf("send follower %s flush message: %v", p.follower, followErr)
}
- println("notifying", p.follower, "flushed at", flushTsNs)
+ // println("notifying", p.follower, "flushed at", flushTsNs)
}
}