diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-03-09 23:49:42 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-03-09 23:49:42 -0700 |
| commit | 02773a61074d1a130419318d05d4d0b027cac4b4 (patch) | |
| tree | 590918137bc7edfd23653e377249c45145ec7e54 /other/mq_client_example/example | |
| parent | 14cb8a24c68ce3fd0d3df716295805a8c5c1b8ef (diff) | |
| download | seaweedfs-02773a61074d1a130419318d05d4d0b027cac4b4.tar.xz seaweedfs-02773a61074d1a130419318d05d4d0b027cac4b4.zip | |
Accumulated changes for message queue (#6600)
* rename
* set agent address
* refactor
* add agent sub
* pub messages
* grpc new client
* can publish records via agent
* send init message with session id
* fmt
* check cancelled request while waiting
* use sessionId
* handle possible nil stream
* subscriber process messages
* separate debug port
* use atomic int64
* less logs
* minor
* skip io.EOF
* rename
* remove unused
* use saved offsets
* do not reuse session, since always session id is new after restart
remove last active ts from SessionEntry
* simplify printing
* purge unused
* just proxy the subscription, skipping the session step
* adjust offset types
* subscribe offset type and possible value
* start after the known tsns
* avoid wrongly set startPosition
* move
* remove
* refactor
* typo
* fix
* fix changed path
Diffstat (limited to 'other/mq_client_example/example')
| -rw-r--r-- | other/mq_client_example/example/my_record.go | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/other/mq_client_example/example/my_record.go b/other/mq_client_example/example/my_record.go new file mode 100644 index 000000000..ea6a0e7bd --- /dev/null +++ b/other/mq_client_example/example/my_record.go @@ -0,0 +1,56 @@ +package example + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +type MyRecord struct { + Key []byte + Field1 []byte + Field2 string + Field3 int32 + Field4 int64 + Field5 float32 + Field6 float64 + Field7 bool +} + +func MyRecordType() *schema_pb.RecordType { + return schema.RecordTypeBegin(). + WithField("key", schema.TypeBytes). + WithField("field1", schema.TypeBytes). + WithField("field2", schema.TypeString). + WithField("field3", schema.TypeInt32). + WithField("field4", schema.TypeInt64). + WithField("field5", schema.TypeFloat). + WithField("field6", schema.TypeDouble). + WithField("field7", schema.TypeBoolean). + RecordTypeEnd() +} + +func (r *MyRecord) ToRecordValue() *schema_pb.RecordValue { + return schema.RecordBegin(). + SetBytes("key", r.Key). + SetBytes("field1", r.Field1). + SetString("field2", r.Field2). + SetInt32("field3", r.Field3). + SetInt64("field4", r.Field4). + SetFloat("field5", r.Field5). + SetDouble("field6", r.Field6). + SetBool("field7", r.Field7). + RecordEnd() +} + +func FromRecordValue(recordValue *schema_pb.RecordValue) *MyRecord { + return &MyRecord{ + Key: recordValue.Fields["key"].GetBytesValue(), + Field1: recordValue.Fields["field1"].GetBytesValue(), + Field2: recordValue.Fields["field2"].GetStringValue(), + Field3: recordValue.Fields["field3"].GetInt32Value(), + Field4: recordValue.Fields["field4"].GetInt64Value(), + Field5: recordValue.Fields["field5"].GetFloatValue(), + Field6: recordValue.Fields["field6"].GetDoubleValue(), + Field7: recordValue.Fields["field7"].GetBoolValue(), + } +} |
