aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/cmd
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-04 21:43:30 -0700
committerchrislu <chris.lu@gmail.com>2023-09-04 21:43:30 -0700
commit9e4f98569898985ed285d8bb8a39b4ea5f095a98 (patch)
tree722d4bb8536334ec3ed97810760698faab719999 /weed/mq/client/cmd
parentcb470d44df2fed94ad8fd370b1c281cb126d373b (diff)
downloadseaweedfs-9e4f98569898985ed285d8bb8a39b4ea5f095a98.tar.xz
seaweedfs-9e4f98569898985ed285d8bb8a39b4ea5f095a98.zip
publish, benchmark
Diffstat (limited to 'weed/mq/client/cmd')
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go48
1 files changed, 38 insertions, 10 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index a540143a4..f5a454640 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -1,12 +1,34 @@
package main
import (
+ "flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+ "log"
+ "sync"
+ "time"
)
-func main() {
+var (
+ messageCount = flag.Int("n", 1000, "message count")
+ concurrency = flag.Int("c", 4, "concurrency count")
+)
+
+func doPublish(publisher *pub_client.TopicPublisher, id int) {
+ startTime := time.Now()
+ for i := 0; i < *messageCount / *concurrency; i++ {
+ // Simulate publishing a message
+ key := []byte(fmt.Sprintf("key-%d-%d", id, i))
+ value := []byte(fmt.Sprintf("value-%d-%d", id, i))
+ publisher.Publish(key, value) // Call your publisher function here
+ // println("Published", string(key), string(value))
+ }
+ elapsed := time.Since(startTime)
+ log.Printf("Publisher %d finished in %s", id, elapsed)
+}
+func main() {
+ flag.Parse()
publisher := pub_client.NewTopicPublisher(
"test", "test")
if err := publisher.Connect("localhost:17777"); err != nil {
@@ -14,16 +36,22 @@ func main() {
return
}
- for i := 0; i < 10; i++ {
- if dataErr := publisher.Publish(
- []byte(fmt.Sprintf("key-%d", i)),
- []byte(fmt.Sprintf("value-%d", i)),
- ); dataErr != nil {
- fmt.Println(dataErr)
- return
- }
+ startTime := time.Now()
+
+ // Start multiple publishers
+ var wg sync.WaitGroup
+ for i := 0; i < *concurrency; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+ doPublish(publisher, id)
+ }(i)
}
- fmt.Println("done publishing")
+ // Wait for all publishers to finish
+ wg.Wait()
+ elapsed := time.Since(startTime)
+
+ log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
}