aboutsummaryrefslogtreecommitdiff
path: root/other/mq_client_example
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-03-09 23:49:42 -0700
committerGitHub <noreply@github.com>2025-03-09 23:49:42 -0700
commit02773a61074d1a130419318d05d4d0b027cac4b4 (patch)
tree590918137bc7edfd23653e377249c45145ec7e54 /other/mq_client_example
parent14cb8a24c68ce3fd0d3df716295805a8c5c1b8ef (diff)
downloadseaweedfs-02773a61074d1a130419318d05d4d0b027cac4b4.tar.xz
seaweedfs-02773a61074d1a130419318d05d4d0b027cac4b4.zip
Accumulated changes for message queue (#6600)
* rename * set agent address * refactor * add agent sub * pub messages * grpc new client * can publish records via agent * send init message with session id * fmt * check cancelled request while waiting * use sessionId * handle possible nil stream * subscriber process messages * separate debug port * use atomic int64 * less logs * minor * skip io.EOF * rename * remove unused * use saved offsets * do not reuse session, since always session id is new after restart remove last active ts from SessionEntry * simplify printing * purge unused * just proxy the subscription, skipping the session step * adjust offset types * subscribe offset type and possible value * start after the known tsns * avoid wrongly set startPosition * move * remove * refactor * typo * fix * fix changed path
Diffstat (limited to 'other/mq_client_example')
-rw-r--r--other/mq_client_example/agent_pub_record/agent_pub_record.go95
-rw-r--r--other/mq_client_example/agent_sub_record/agent_sub_record.go62
-rw-r--r--other/mq_client_example/example/my_record.go56
3 files changed, 213 insertions, 0 deletions
diff --git a/other/mq_client_example/agent_pub_record/agent_pub_record.go b/other/mq_client_example/agent_pub_record/agent_pub_record.go
new file mode 100644
index 000000000..48e78f530
--- /dev/null
+++ b/other/mq_client_example/agent_pub_record/agent_pub_record.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/other/mq_client_example/example"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "log"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+var (
+ messageCount = flag.Int("n", 1000, "message count")
+ messageDelay = flag.Duration("d", time.Second, "delay between messages")
+ concurrency = flag.Int("c", 4, "concurrent publishers")
+ partitionCount = flag.Int("p", 6, "partition count")
+
+ clientName = flag.String("client", "c1", "client name")
+
+ namespace = flag.String("ns", "test", "namespace")
+ t = flag.String("t", "test", "t")
+ agent = flag.String("agent", "localhost:16777", "mq agent address")
+
+ counter int32
+)
+
+func genMyRecord(id int32) *example.MyRecord {
+ return &example.MyRecord{
+ Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)),
+ Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)),
+ Field2: fmt.Sprintf("field2-%s-%d", *clientName, id),
+ Field3: id,
+ Field4: int64(id),
+ Field5: float32(id),
+ Field6: float64(id),
+ Field7: id%2 == 0,
+ }
+}
+
+func doPublish(publisher *agent_client.PublishSession, id int) {
+ startTime := time.Now()
+ for {
+ i := atomic.AddInt32(&counter, 1)
+ if i > int32(*messageCount) {
+ break
+ }
+ // Simulate publishing a message
+ myRecord := genMyRecord(int32(i))
+ if err := publisher.PublishMessageRecord(myRecord.Key, myRecord.ToRecordValue()); err != nil {
+ fmt.Println(err)
+ break
+ }
+ if *messageDelay > 0 {
+ time.Sleep(*messageDelay)
+ fmt.Printf("sent %+v\n", string(myRecord.Key))
+ }
+ }
+ elapsed := time.Since(startTime)
+ log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed)
+}
+
+func main() {
+ flag.Parse()
+
+ recordType := example.MyRecordType()
+
+ session, err := agent_client.NewPublishSession(*agent, schema.NewSchema(*namespace, *t, recordType), *partitionCount, *clientName)
+ if err != nil {
+ log.Printf("failed to create session: %v", err)
+ return
+ }
+ defer session.CloseSession()
+
+ startTime := time.Now()
+
+ var wg sync.WaitGroup
+ // Start multiple publishers
+ for i := 0; i < *concurrency; i++ {
+ wg.Add(1)
+ go func(id int) {
+ defer wg.Done()
+ doPublish(session, id)
+ }(i)
+ }
+
+ // Wait for all publishers to finish
+ wg.Wait()
+ elapsed := time.Since(startTime)
+
+ log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
+
+}
diff --git a/other/mq_client_example/agent_sub_record/agent_sub_record.go b/other/mq_client_example/agent_sub_record/agent_sub_record.go
new file mode 100644
index 000000000..21c74a46d
--- /dev/null
+++ b/other/mq_client_example/agent_sub_record/agent_sub_record.go
@@ -0,0 +1,62 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/other/mq_client_example/example"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "log"
+ "time"
+)
+
+var (
+ namespace = flag.String("ns", "test", "namespace")
+ t = flag.String("topic", "test", "topic")
+ agent = flag.String("agent", "localhost:16777", "mq agent address")
+ maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
+ slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency")
+ timeAgo = flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+
+ clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
+)
+
+func main() {
+ flag.Parse()
+
+ // determine the start of the messages
+ var startTsNs int64
+ startType := schema_pb.OffsetType_RESUME_OR_EARLIEST
+ if *timeAgo > 0 {
+ startTsNs = time.Now().Add(-*timeAgo).UnixNano()
+ startType = schema_pb.OffsetType_EXACT_TS_NS
+ }
+
+ session, err := agent_client.NewSubscribeSession(*agent, &agent_client.SubscribeOption{
+ ConsumerGroup: "test",
+ ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId),
+ Topic: topic.NewTopic(*namespace, *t),
+ OffsetType: startType,
+ OffsetTsNs: startTsNs,
+ Filter: "",
+ MaxSubscribedPartitions: int32(*maxPartitionCount),
+ SlidingWindowSize: int32(*slidingWindowSize),
+ })
+ if err != nil {
+ log.Printf("new subscribe session: %v", err)
+ return
+ }
+ defer session.CloseSession()
+
+ counter := 0
+ session.SubscribeMessageRecord(func(key []byte, recordValue *schema_pb.RecordValue) {
+ counter++
+ record := example.FromRecordValue(recordValue)
+ fmt.Printf("%d %s %v\n", counter, string(key), record.Field2)
+ }, func() {
+ log.Printf("done received %d messages", counter)
+ })
+
+}
diff --git a/other/mq_client_example/example/my_record.go b/other/mq_client_example/example/my_record.go
new file mode 100644
index 000000000..ea6a0e7bd
--- /dev/null
+++ b/other/mq_client_example/example/my_record.go
@@ -0,0 +1,56 @@
+package example
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+type MyRecord struct {
+ Key []byte
+ Field1 []byte
+ Field2 string
+ Field3 int32
+ Field4 int64
+ Field5 float32
+ Field6 float64
+ Field7 bool
+}
+
+func MyRecordType() *schema_pb.RecordType {
+ return schema.RecordTypeBegin().
+ WithField("key", schema.TypeBytes).
+ WithField("field1", schema.TypeBytes).
+ WithField("field2", schema.TypeString).
+ WithField("field3", schema.TypeInt32).
+ WithField("field4", schema.TypeInt64).
+ WithField("field5", schema.TypeFloat).
+ WithField("field6", schema.TypeDouble).
+ WithField("field7", schema.TypeBoolean).
+ RecordTypeEnd()
+}
+
+func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue {
+ return schema.RecordBegin().
+ SetBytes("key", r.Key).
+ SetBytes("field1", r.Field1).
+ SetString("field2", r.Field2).
+ SetInt32("field3", r.Field3).
+ SetInt64("field4", r.Field4).
+ SetFloat("field5", r.Field5).
+ SetDouble("field6", r.Field6).
+ SetBool("field7", r.Field7).
+ RecordEnd()
+}
+
+func FromRecordValue(recordValue *schema_pb.RecordValue) *MyRecord {
+ return &MyRecord{
+ Key: recordValue.Fields["key"].GetBytesValue(),
+ Field1: recordValue.Fields["field1"].GetBytesValue(),
+ Field2: recordValue.Fields["field2"].GetStringValue(),
+ Field3: recordValue.Fields["field3"].GetInt32Value(),
+ Field4: recordValue.Fields["field4"].GetInt64Value(),
+ Field5: recordValue.Fields["field5"].GetFloatValue(),
+ Field6: recordValue.Fields["field6"].GetDoubleValue(),
+ Field7: recordValue.Fields["field7"].GetBoolValue(),
+ }
+}