aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/segment/message_serde_test.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-22 01:12:32 -0700
committerchrislu <chris.lu@gmail.com>2022-07-28 23:24:38 -0700
commit06cd491abc82a6b799c47ae600a4bb81d24091ce (patch)
treece2fe18c31c54513413535c1c491cea1eca16afd /weed/mq/segment/message_serde_test.go
parent7576c244c47113b452681967b741ece5225767e3 (diff)
downloadseaweedfs-06cd491abc82a6b799c47ae600a4bb81d24091ce.tar.xz
seaweedfs-06cd491abc82a6b799c47ae600a4bb81d24091ce.zip
add message batch
Diffstat (limited to 'weed/mq/segment/message_serde_test.go')
-rw-r--r--weed/mq/segment/message_serde_test.go41
1 files changed, 25 insertions, 16 deletions
diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go
index 7ba0febf0..bf561d2c0 100644
--- a/weed/mq/segment/message_serde_test.go
+++ b/weed/mq/segment/message_serde_test.go
@@ -10,27 +10,36 @@ import (
func TestMessageSerde(t *testing.T) {
b := flatbuffers.NewBuilder(1024)
- prop := make(map[string]string)
- prop["n1"] = "v1"
- prop["n2"] = "v2"
+ prop := make(map[string][]byte)
+ prop["n1"] = []byte("v1")
+ prop["n2"] = []byte("v2")
- CreateMessage(b, 1, 2, 3, 4, 5, 6, prop,
- []byte("the primary key"), []byte("body is here"))
+ bb := NewMessageBatchBuilder(b, 1, 2, 3, 4)
- buf := b.FinishedBytes()
+ bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here"))
+
+ bb.BuildMessageBatch()
+
+ buf := bb.GetBytes()
println("serialized size", len(buf))
- m := message_fbs.GetRootAsMessage(buf, 0)
+ mb := message_fbs.GetRootAsMessageBatch(buf, 0)
+
+ assert.Equal(t, int32(1), mb.ProducerId())
+ assert.Equal(t, int32(2), mb.ProducerEpoch())
+ assert.Equal(t, int32(3), mb.SegmentId())
+ assert.Equal(t, int32(4), mb.Flags())
+ assert.Equal(t, int64(5), mb.SegmentSeqBase())
+ assert.Equal(t, int32(0), mb.SegmentSeqMaxDelta())
+ assert.Equal(t, int64(6), mb.TsMsBase())
+ assert.Equal(t, int32(0), mb.TsMsMaxDelta())
+
+ assert.Equal(t, 1, mb.MessagesLength())
- assert.Equal(t, int32(1), m.ProducerId())
- assert.Equal(t, int64(2), m.ProducerSeq())
- assert.Equal(t, int32(3), m.SegmentId())
- assert.Equal(t, int64(4), m.SegmentSeq())
- assert.Equal(t, int64(5), m.EventTsNs())
- assert.Equal(t, int64(6), m.RecvTsNs())
+ m := &message_fbs.Message{}
+ mb.Messages(m, 0)
- assert.Equal(t, 2, m.PropertiesLength())
nv := &message_fbs.NameValue{}
m.Properties(nv, 0)
assert.Equal(t, "n1", string(nv.Name()))
@@ -41,7 +50,7 @@ func TestMessageSerde(t *testing.T) {
assert.Equal(t, []byte("the primary key"), m.Key())
assert.Equal(t, []byte("body is here"), m.Data())
- m.MutateSegmentSeq(123)
- assert.Equal(t, int64(123), m.SegmentSeq())
+ assert.Equal(t, int32(0), m.SeqDelta())
+ assert.Equal(t, int32(0), m.TsMsDelta())
}