diff options
| author | chrislu <chris.lu@gmail.com> | 2024-03-31 00:19:16 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-31 00:19:16 -0700 |
| commit | c9df613b6bfb7e22e4e596151300aefc56ac909f (patch) | |
| tree | 52d6da7e9fd898eed8bbea513603673e363bacb9 /weed/mq | |
| parent | ca4f89a6f60873849cecc760e9b9bdd5ae33d5f4 (diff) | |
| download | seaweedfs-c9df613b6bfb7e22e4e596151300aefc56ac909f.tar.xz seaweedfs-c9df613b6bfb7e22e4e596151300aefc56ac909f.zip | |
add publisher name for debugging
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 8 | ||||
| -rw-r--r-- | weed/mq/client/cmd/weed_pub/publisher.go | 10 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/publisher.go | 1 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 5 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition.go | 9 |
5 files changed, 23 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 5019f4f72..60cc1c727 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -79,7 +79,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } go func() { defer func() { - println("stop sending ack to publisher") + println("stop sending ack to publisher", initMessage.PublisherName) }() lastAckTime := time.Now() @@ -93,7 +93,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis if err := stream.Send(response); err != nil { glog.Errorf("Error sending response %v: %v", response, err) } - println("sent ack", acknowledgedSequence) + println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName) lastAckTime = time.Now() } else { time.Sleep(1 * time.Second) @@ -130,7 +130,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis if err == io.EOF { break } - glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err) + glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err) break } @@ -146,7 +146,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis } } - glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition) + glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName) return nil } diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index 2873ba21f..0950af187 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -16,6 +16,8 @@ var ( concurrency = flag.Int("c", 4, "concurrent publishers") partitionCount = flag.Int("p", 6, "partition count") + clientName = flag.String("client", "c1", "client name") + namespace = flag.String("ns", "test", "namespace") t = flag.String("t", "test", "t") seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") @@ -25,16 +27,17 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) { startTime := time.Now() for i := 0; i < *messageCount / *concurrency; i++ { // Simulate publishing a message - key := []byte(fmt.Sprintf("key-%d-%d", id, i)) - value := []byte(fmt.Sprintf("value-%d-%d", id, i)) + key := []byte(fmt.Sprintf("key-%s-%d-%d", *clientName, id, i)) + value := []byte(fmt.Sprintf("value-%s-%d-%d", *clientName, id, i)) if err := publisher.Publish(key, value); err != nil { fmt.Println(err) break } + time.Sleep(time.Second) // println("Published", string(key), string(value)) } elapsed := time.Since(startTime) - log.Printf("Publisher %d finished in %s", id, elapsed) + log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed) } func main() { @@ -44,6 +47,7 @@ func main() { CreateTopic: true, CreateTopicPartitionCount: int32(*partitionCount), Brokers: strings.Split(*seedBrokers, ","), + PublisherName: *clientName, } publisher := pub_client.NewTopicPublisher(config) diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index 9262d6e0c..09984bae3 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -17,6 +17,7 @@ type PublisherConfiguration struct { CreateTopic bool CreateTopicPartitionCount int32 Brokers []string + PublisherName string // for debugging } type PublishClient struct { diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index e91127522..7b4d25869 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -146,6 +146,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro Partition: job.Partition, AckInterval: 128, FollowerBrokers: job.FollowerBrokers, + PublisherName: p.config.PublisherName, }, }, }); err != nil { @@ -184,9 +185,9 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return } if ackResp.AckSequence > 0 { - log.Printf("ack %d", ackResp.AckSequence) + log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) } - if atomic.LoadInt64(&publishedTsNs) == ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { + if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { return } } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index bff0ec17a..0873d6bd7 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -8,6 +8,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "sync" "sync/atomic" "time" @@ -171,7 +173,12 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa for { ack, err := p.followerStream.Recv() if err != nil { - glog.Errorf("Error receiving follower ack: %v", err) + e, _ := status.FromError(err) + if e.Code() == codes.Canceled { + glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.follower) + return + } + glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.follower, err) return } atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs) |
