aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-30 09:49:08 -0700
committerchrislu <chris.lu@gmail.com>2024-05-30 09:49:08 -0700
commitd1f0c404023a6ad5b4b9b80b85ef41948fc0c48e (patch)
tree619f0b2f3dff1ab53342e93ffacb8949a1c89314
parentdf9d889489af7659b14a54a0642f1b625c20f8e4 (diff)
downloadseaweedfs-d1f0c404023a6ad5b4b9b80b85ef41948fc0c48e.tar.xz
seaweedfs-d1f0c404023a6ad5b4b9b80b85ef41948fc0c48e.zip
remove per-message debug logs
-rw-r--r--weed/mq/broker/broker_grpc_sub.go4
-rw-r--r--weed/mq/broker/broker_grpc_sub_follow.go2
-rw-r--r--weed/mq/sub_coordinator/inflight_message_tracker.go5
3 files changed, 5 insertions, 6 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index d1131892b..286812a9b 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -112,7 +112,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
currentLastOffset := imt.GetOldestAckedTimestamp()
- fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
+ // fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Ack{
@@ -125,7 +125,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
break
}
lastOffset = currentLastOffset
- fmt.Printf("%+v forwarding ack %d\n", partition, lastOffset)
+ // fmt.Printf("%+v forwarding ack %d\n", partition, lastOffset)
}
}
if lastOffset > 0 {
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go
index cfea6f7c7..f7f4ac7e9 100644
--- a/weed/mq/broker/broker_grpc_sub_follow.go
+++ b/weed/mq/broker/broker_grpc_sub_follow.go
@@ -42,7 +42,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
// Process the received message
if ackMessage := req.GetAck(); ackMessage != nil {
lastOffset = ackMessage.TsNs
- println("sub follower got offset", lastOffset)
+ // println("sub follower got offset", lastOffset)
} else if closeMessage := req.GetClose(); closeMessage != nil {
glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
return nil
diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go
index f8effef95..f1c46e06b 100644
--- a/weed/mq/sub_coordinator/inflight_message_tracker.go
+++ b/weed/mq/sub_coordinator/inflight_message_tracker.go
@@ -1,7 +1,6 @@
package sub_coordinator
import (
- "fmt"
"sort"
"sync"
)
@@ -22,7 +21,7 @@ func NewInflightMessageTracker(capacity int) *InflightMessageTracker {
// EnflightMessage tracks the message with the key and timestamp.
// These messages are sent to the consumer group instances and waiting for ack.
func (imt *InflightMessageTracker) EnflightMessage(key []byte, tsNs int64) {
- fmt.Printf("EnflightMessage(%s,%d)\n", string(key), tsNs)
+ // fmt.Printf("EnflightMessage(%s,%d)\n", string(key), tsNs)
imt.mu.Lock()
defer imt.mu.Unlock()
imt.messages[string(key)] = tsNs
@@ -53,7 +52,7 @@ func (imt *InflightMessageTracker) IsMessageAcknowledged(key []byte, tsNs int64)
// AcknowledgeMessage acknowledges the message with the key and timestamp.
func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool {
- fmt.Printf("AcknowledgeMessage(%s,%d)\n", string(key), tsNs)
+ // fmt.Printf("AcknowledgeMessage(%s,%d)\n", string(key), tsNs)
imt.mu.Lock()
defer imt.mu.Unlock()
timestamp, exists := imt.messages[string(key)]