diff options
Diffstat (limited to 'other/mq_client_example')
3 files changed, 213 insertions, 0 deletions
diff --git a/other/mq_client_example/agent_pub_record/agent_pub_record.go b/other/mq_client_example/agent_pub_record/agent_pub_record.go new file mode 100644 index 000000000..48e78f530 --- /dev/null +++ b/other/mq_client_example/agent_pub_record/agent_pub_record.go @@ -0,0 +1,95 @@ +package main + +import ( + "flag" + "fmt" + "github.com/seaweedfs/seaweedfs/other/mq_client_example/example" + "github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "log" + "sync" + "sync/atomic" + "time" +) + +var ( + messageCount = flag.Int("n", 1000, "message count") + messageDelay = flag.Duration("d", time.Second, "delay between messages") + concurrency = flag.Int("c", 4, "concurrent publishers") + partitionCount = flag.Int("p", 6, "partition count") + + clientName = flag.String("client", "c1", "client name") + + namespace = flag.String("ns", "test", "namespace") + t = flag.String("t", "test", "t") + agent = flag.String("agent", "localhost:16777", "mq agent address") + + counter int32 +) + +func genMyRecord(id int32) *example.MyRecord { + return &example.MyRecord{ + Key: []byte(fmt.Sprintf("key-%s-%d", *clientName, id)), + Field1: []byte(fmt.Sprintf("field1-%s-%d", *clientName, id)), + Field2: fmt.Sprintf("field2-%s-%d", *clientName, id), + Field3: id, + Field4: int64(id), + Field5: float32(id), + Field6: float64(id), + Field7: id%2 == 0, + } +} + +func doPublish(publisher *agent_client.PublishSession, id int) { + startTime := time.Now() + for { + i := atomic.AddInt32(&counter, 1) + if i > int32(*messageCount) { + break + } + // Simulate publishing a message + myRecord := genMyRecord(int32(i)) + if err := publisher.PublishMessageRecord(myRecord.Key, myRecord.ToRecordValue()); err != nil { + fmt.Println(err) + break + } + if *messageDelay > 0 { + time.Sleep(*messageDelay) + fmt.Printf("sent %+v\n", string(myRecord.Key)) + } + } + elapsed := time.Since(startTime) + log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed) +} + +func main() { + flag.Parse() + + recordType := example.MyRecordType() + + session, err := agent_client.NewPublishSession(*agent, schema.NewSchema(*namespace, *t, recordType), *partitionCount, *clientName) + if err != nil { + log.Printf("failed to create session: %v", err) + return + } + defer session.CloseSession() + + startTime := time.Now() + + var wg sync.WaitGroup + // Start multiple publishers + for i := 0; i < *concurrency; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + doPublish(session, id) + }(i) + } + + // Wait for all publishers to finish + wg.Wait() + elapsed := time.Since(startTime) + + log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) + +} diff --git a/other/mq_client_example/agent_sub_record/agent_sub_record.go b/other/mq_client_example/agent_sub_record/agent_sub_record.go new file mode 100644 index 000000000..21c74a46d --- /dev/null +++ b/other/mq_client_example/agent_sub_record/agent_sub_record.go @@ -0,0 +1,62 @@ +package main + +import ( + "flag" + "fmt" + "github.com/seaweedfs/seaweedfs/other/mq_client_example/example" + "github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "log" + "time" +) + +var ( + namespace = flag.String("ns", "test", "namespace") + t = flag.String("topic", "test", "topic") + agent = flag.String("agent", "localhost:16777", "mq agent address") + maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count") + slidingWindowSize = flag.Int("slidingWindowSize", 1, "per partition concurrency") + timeAgo = flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + + clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id") +) + +func main() { + flag.Parse() + + // determine the start of the messages + var startTsNs int64 + startType := schema_pb.OffsetType_RESUME_OR_EARLIEST + if *timeAgo > 0 { + startTsNs = time.Now().Add(-*timeAgo).UnixNano() + startType = schema_pb.OffsetType_EXACT_TS_NS + } + + session, err := agent_client.NewSubscribeSession(*agent, &agent_client.SubscribeOption{ + ConsumerGroup: "test", + ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId), + Topic: topic.NewTopic(*namespace, *t), + OffsetType: startType, + OffsetTsNs: startTsNs, + Filter: "", + MaxSubscribedPartitions: int32(*maxPartitionCount), + SlidingWindowSize: int32(*slidingWindowSize), + }) + if err != nil { + log.Printf("new subscribe session: %v", err) + return + } + defer session.CloseSession() + + counter := 0 + session.SubscribeMessageRecord(func(key []byte, recordValue *schema_pb.RecordValue) { + counter++ + record := example.FromRecordValue(recordValue) + fmt.Printf("%d %s %v\n", counter, string(key), record.Field2) + }, func() { + log.Printf("done received %d messages", counter) + }) + +} diff --git a/other/mq_client_example/example/my_record.go b/other/mq_client_example/example/my_record.go new file mode 100644 index 000000000..ea6a0e7bd --- /dev/null +++ b/other/mq_client_example/example/my_record.go @@ -0,0 +1,56 @@ +package example + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +type MyRecord struct { + Key []byte + Field1 []byte + Field2 string + Field3 int32 + Field4 int64 + Field5 float32 + Field6 float64 + Field7 bool +} + +func MyRecordType() *schema_pb.RecordType { + return schema.RecordTypeBegin(). + WithField("key", schema.TypeBytes). + WithField("field1", schema.TypeBytes). + WithField("field2", schema.TypeString). + WithField("field3", schema.TypeInt32). + WithField("field4", schema.TypeInt64). + WithField("field5", schema.TypeFloat). + WithField("field6", schema.TypeDouble). + WithField("field7", schema.TypeBoolean). + RecordTypeEnd() +} + +func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue { + return schema.RecordBegin(). + SetBytes("key", r.Key). + SetBytes("field1", r.Field1). + SetString("field2", r.Field2). + SetInt32("field3", r.Field3). + SetInt64("field4", r.Field4). + SetFloat("field5", r.Field5). + SetDouble("field6", r.Field6). + SetBool("field7", r.Field7). + RecordEnd() +} + +func FromRecordValue(recordValue *schema_pb.RecordValue) *MyRecord { + return &MyRecord{ + Key: recordValue.Fields["key"].GetBytesValue(), + Field1: recordValue.Fields["field1"].GetBytesValue(), + Field2: recordValue.Fields["field2"].GetStringValue(), + Field3: recordValue.Fields["field3"].GetInt32Value(), + Field4: recordValue.Fields["field4"].GetInt64Value(), + Field5: recordValue.Fields["field5"].GetFloatValue(), + Field6: recordValue.Fields["field6"].GetDoubleValue(), + Field7: recordValue.Fields["field7"].GetBoolValue(), + } +} |
