aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/segment/message_serde_test.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-22 01:12:32 -0700
committerchrislu <chris.lu@gmail.com>2022-07-22 01:12:32 -0700
commitf72f6961092c60c0a537d8bba5e242dce1ebb7a2 (patch)
tree7031a7f79cc04f5d5f6868d9d63be17e65932aad /weed/mq/segment/message_serde_test.go
parent35261c805efe56a05ae524b798e0158719cf7a04 (diff)
downloadseaweedfs-f72f6961092c60c0a537d8bba5e242dce1ebb7a2.tar.xz
seaweedfs-f72f6961092c60c0a537d8bba5e242dce1ebb7a2.zip
add message batch
Diffstat (limited to 'weed/mq/segment/message_serde_test.go')
-rw-r--r--weed/mq/segment/message_serde_test.go41
1 files changed, 25 insertions, 16 deletions
diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go
index 7ba0febf0..bf561d2c0 100644
--- a/weed/mq/segment/message_serde_test.go
+++ b/weed/mq/segment/message_serde_test.go
@@ -10,27 +10,36 @@ import (
func TestMessageSerde(t *testing.T) {
b := flatbuffers.NewBuilder(1024)
- prop := make(map[string]string)
- prop["n1"] = "v1"
- prop["n2"] = "v2"
+ prop := make(map[string][]byte)
+ prop["n1"] = []byte("v1")
+ prop["n2"] = []byte("v2")
- CreateMessage(b, 1, 2, 3, 4, 5, 6, prop,
- []byte("the primary key"), []byte("body is here"))
+ bb := NewMessageBatchBuilder(b, 1, 2, 3, 4)
- buf := b.FinishedBytes()
+ bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here"))
+
+ bb.BuildMessageBatch()
+
+ buf := bb.GetBytes()
println("serialized size", len(buf))
- m := message_fbs.GetRootAsMessage(buf, 0)
+ mb := message_fbs.GetRootAsMessageBatch(buf, 0)
+
+ assert.Equal(t, int32(1), mb.ProducerId())
+ assert.Equal(t, int32(2), mb.ProducerEpoch())
+ assert.Equal(t, int32(3), mb.SegmentId())
+ assert.Equal(t, int32(4), mb.Flags())
+ assert.Equal(t, int64(5), mb.SegmentSeqBase())
+ assert.Equal(t, int32(0), mb.SegmentSeqMaxDelta())
+ assert.Equal(t, int64(6), mb.TsMsBase())
+ assert.Equal(t, int32(0), mb.TsMsMaxDelta())
+
+ assert.Equal(t, 1, mb.MessagesLength())
- assert.Equal(t, int32(1), m.ProducerId())
- assert.Equal(t, int64(2), m.ProducerSeq())
- assert.Equal(t, int32(3), m.SegmentId())
- assert.Equal(t, int64(4), m.SegmentSeq())
- assert.Equal(t, int64(5), m.EventTsNs())
- assert.Equal(t, int64(6), m.RecvTsNs())
+ m := &message_fbs.Message{}
+ mb.Messages(m, 0)
- assert.Equal(t, 2, m.PropertiesLength())
nv := &message_fbs.NameValue{}
m.Properties(nv, 0)
assert.Equal(t, "n1", string(nv.Name()))
@@ -41,7 +50,7 @@ func TestMessageSerde(t *testing.T) {
assert.Equal(t, []byte("the primary key"), m.Key())
assert.Equal(t, []byte("body is here"), m.Data())
- m.MutateSegmentSeq(123)
- assert.Equal(t, int64(123), m.SegmentSeq())
+ assert.Equal(t, int32(0), m.SeqDelta())
+ assert.Equal(t, int32(0), m.TsMsDelta())
}