diff options
| author | chrislu <chris.lu@gmail.com> | 2023-09-04 21:43:30 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-09-04 21:43:30 -0700 |
| commit | 9e4f98569898985ed285d8bb8a39b4ea5f095a98 (patch) | |
| tree | 722d4bb8536334ec3ed97810760698faab719999 /weed/mq/client/cmd | |
| parent | cb470d44df2fed94ad8fd370b1c281cb126d373b (diff) | |
| download | seaweedfs-9e4f98569898985ed285d8bb8a39b4ea5f095a98.tar.xz seaweedfs-9e4f98569898985ed285d8bb8a39b4ea5f095a98.zip | |
publish, benchmark
Diffstat (limited to 'weed/mq/client/cmd')
| -rw-r--r-- | weed/mq/client/cmd/weed_pub/publisher.go | 48 |
1 files changed, 38 insertions, 10 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index a540143a4..f5a454640 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -1,12 +1,34 @@ package main import ( + "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" + "log" + "sync" + "time" ) -func main() { +var ( + messageCount = flag.Int("n", 1000, "message count") + concurrency = flag.Int("c", 4, "concurrency count") +) + +func doPublish(publisher *pub_client.TopicPublisher, id int) { + startTime := time.Now() + for i := 0; i < *messageCount / *concurrency; i++ { + // Simulate publishing a message + key := []byte(fmt.Sprintf("key-%d-%d", id, i)) + value := []byte(fmt.Sprintf("value-%d-%d", id, i)) + publisher.Publish(key, value) // Call your publisher function here + // println("Published", string(key), string(value)) + } + elapsed := time.Since(startTime) + log.Printf("Publisher %d finished in %s", id, elapsed) +} +func main() { + flag.Parse() publisher := pub_client.NewTopicPublisher( "test", "test") if err := publisher.Connect("localhost:17777"); err != nil { @@ -14,16 +36,22 @@ func main() { return } - for i := 0; i < 10; i++ { - if dataErr := publisher.Publish( - []byte(fmt.Sprintf("key-%d", i)), - []byte(fmt.Sprintf("value-%d", i)), - ); dataErr != nil { - fmt.Println(dataErr) - return - } + startTime := time.Now() + + // Start multiple publishers + var wg sync.WaitGroup + for i := 0; i < *concurrency; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + doPublish(publisher, id) + }(i) } - fmt.Println("done publishing") + // Wait for all publishers to finish + wg.Wait() + elapsed := time.Since(startTime) + + log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) } |
