aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-02 08:48:51 -0700
committerchrislu <chris.lu@gmail.com>2024-05-02 08:48:51 -0700
commit0f35b3a4eab7785501b47239f7bf5866df672f6c (patch)
treeaeabdadb711f353cdccc96f2072db2e7fdbdb144
parent928a4e8dff01bab01766ca761f3bc56862a20736 (diff)
downloadseaweedfs-0f35b3a4eab7785501b47239f7bf5866df672f6c.tar.xz
seaweedfs-0f35b3a4eab7785501b47239f7bf5866df672f6c.zip
add example to publish a record
-rw-r--r--weed/mq/client/cmd/weed_pub_record/publisher_record.go126
-rw-r--r--weed/mq/client/pub_client/publish.go9
2 files changed, 134 insertions, 1 deletions
diff --git a/weed/mq/client/cmd/weed_pub_record/publisher_record.go b/weed/mq/client/cmd/weed_pub_record/publisher_record.go
new file mode 100644
index 000000000..8e8e6b21c
--- /dev/null
+++ b/weed/mq/client/cmd/weed_pub_record/publisher_record.go
@@ -0,0 +1,126 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "log"
+ "strings"
+ "sync"
+ "time"
+)
+
+var (
+ messageCount = flag.Int("n", 1000, "message count")
+ concurrency = flag.Int("c", 4, "concurrent publishers")
+ partitionCount = flag.Int("p", 6, "partition count")
+
+ clientName = flag.String("client", "c1", "client name")
+
+ namespace = flag.String("ns", "test", "namespace")
+ t = flag.String("t", "test", "t")
+ seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
+)
+
+func doPublish(publisher *pub_client.TopicPublisher, id int) {
+ startTime := time.Now()
+ for i := 0; i < *messageCount / *concurrency; i++ {
+ // Simulate publishing a message
+ myRecord := genMyRecord(i)
+ if err := publisher.PublishRecord(myRecord.Key, myRecord.ToRecordValue()); err != nil {
+ fmt.Println(err)
+ break
+ }
+ time.Sleep(time.Second)
+ // println("Published", string(key), string(value))
+ }
+ if err := publisher.FinishPublish(); err != nil {
+ fmt.Println(err)
+ }
+ elapsed := time.Since(startTime)
+ log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed)
+}
+
+type MyRecord struct {
+ Key []byte
+ Field1 []byte
+ Field2 string
+ Field3 int
+ Field4 int64
+ Field5 float32
+ Field6 float64
+ Field7 bool
+}
+
+func genMyRecord(id int) *MyRecord {
+ return &MyRecord{
+ Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)),
+ Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)),
+ Field2: fmt.Sprintf("field2-%s-%d", *clientName, id),
+ Field3: id,
+ Field4: int64(id),
+ Field5: float32(id),
+ Field6: float64(id),
+ Field7: id%2 == 0,
+ }
+}
+
+func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue {
+ return schema.RecordBegin().
+ SetBytes("key", r.Key).
+ SetBytes("field1", r.Field1).
+ SetString("field2", r.Field2).
+ SetInt("field3", int32(r.Field3)).
+ SetLong("field4", r.Field4).
+ SetFloat("field5", r.Field5).
+ SetDouble("field6", r.Field6).
+ SetBool("field7", r.Field7).
+ RecordEnd()
+}
+
+func main() {
+ flag.Parse()
+
+ recordType := schema.RecordTypeBegin().
+ WithField("key", schema.TypeBytes).
+ WithField("field1", schema.TypeBytes).
+ WithField("field2", schema.TypeString).
+ WithField("field3", schema.TypeInteger).
+ WithField("field4", schema.TypeLong).
+ WithField("field5", schema.TypeFloat).
+ WithField("field6", schema.TypeDouble).
+ WithField("field7", schema.TypeBoolean).
+ RecordTypeEnd()
+
+ config := &pub_client.PublisherConfiguration{
+ Topic: topic.NewTopic(*namespace, *t),
+ PartitionCount: int32(*partitionCount),
+ Brokers: strings.Split(*seedBrokers, ","),
+ PublisherName: *clientName,
+ RecordType: recordType,
+ }
+ publisher := pub_client.NewTopicPublisher(config)
+
+ startTime := time.Now()
+
+ var wg sync.WaitGroup
+ // Start multiple publishers
+ for i := 0; i < *concurrency; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+ doPublish(publisher, id)
+ }(i)
+ }
+
+ // Wait for all publishers to finish
+ wg.Wait()
+ elapsed := time.Since(startTime)
+ publisher.Shutdown()
+
+ log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
+
+}
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 92a1a1599..0c162d6a0 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -11,6 +11,13 @@ import (
)
func (p *TopicPublisher) Publish(key, value []byte) error {
+ if p.config.RecordType != nil {
+ return fmt.Errorf("record type is set, use PublishRecord instead")
+ }
+ return p.doPublish(key, value)
+}
+
+func (p *TopicPublisher) doPublish(key, value []byte) error {
hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount
if hashKey < 0 {
hashKey = -hashKey
@@ -34,7 +41,7 @@ func (p *TopicPublisher) PublishRecord(key []byte, recordValue *schema_pb.Record
return fmt.Errorf("failed to marshal record value: %v", err)
}
- return p.Publish(key, value)
+ return p.doPublish(key, value)
}
func (p *TopicPublisher) FinishPublish() error {