diff options
Diffstat (limited to 'weed/mq/client')
| -rw-r--r-- | weed/mq/client/cmd/weed_pub/publisher.go | 10 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/publisher.go | 1 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/scheduler.go | 5 |
3 files changed, 11 insertions, 5 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index 2873ba21f..0950af187 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -16,6 +16,8 @@ var ( concurrency = flag.Int("c", 4, "concurrent publishers") partitionCount = flag.Int("p", 6, "partition count") + clientName = flag.String("client", "c1", "client name") + namespace = flag.String("ns", "test", "namespace") t = flag.String("t", "test", "t") seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers") @@ -25,16 +27,17 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) { startTime := time.Now() for i := 0; i < *messageCount / *concurrency; i++ { // Simulate publishing a message - key := []byte(fmt.Sprintf("key-%d-%d", id, i)) - value := []byte(fmt.Sprintf("value-%d-%d", id, i)) + key := []byte(fmt.Sprintf("key-%s-%d-%d", *clientName, id, i)) + value := []byte(fmt.Sprintf("value-%s-%d-%d", *clientName, id, i)) if err := publisher.Publish(key, value); err != nil { fmt.Println(err) break } + time.Sleep(time.Second) // println("Published", string(key), string(value)) } elapsed := time.Since(startTime) - log.Printf("Publisher %d finished in %s", id, elapsed) + log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed) } func main() { @@ -44,6 +47,7 @@ func main() { CreateTopic: true, CreateTopicPartitionCount: int32(*partitionCount), Brokers: strings.Split(*seedBrokers, ","), + PublisherName: *clientName, } publisher := pub_client.NewTopicPublisher(config) diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go index 9262d6e0c..09984bae3 100644 --- a/weed/mq/client/pub_client/publisher.go +++ b/weed/mq/client/pub_client/publisher.go @@ -17,6 +17,7 @@ type PublisherConfiguration struct { CreateTopic bool CreateTopicPartitionCount int32 Brokers []string + PublisherName string // for debugging } type PublishClient struct { diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go index e91127522..7b4d25869 100644 --- a/weed/mq/client/pub_client/scheduler.go +++ b/weed/mq/client/pub_client/scheduler.go @@ -146,6 +146,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro Partition: job.Partition, AckInterval: 128, FollowerBrokers: job.FollowerBrokers, + PublisherName: p.config.PublisherName, }, }, }); err != nil { @@ -184,9 +185,9 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro return } if ackResp.AckSequence > 0 { - log.Printf("ack %d", ackResp.AckSequence) + log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) } - if atomic.LoadInt64(&publishedTsNs) == ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { + if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { return } } |
