From f72f6961092c60c0a537d8bba5e242dce1ebb7a2 Mon Sep 17 00:00:00 2001 From: chrislu Date: Fri, 22 Jul 2022 01:12:32 -0700 Subject: add message batch --- weed/mq/segment/message_serde_test.go | 41 +++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 16 deletions(-) (limited to 'weed/mq/segment/message_serde_test.go') diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go index 7ba0febf0..bf561d2c0 100644 --- a/weed/mq/segment/message_serde_test.go +++ b/weed/mq/segment/message_serde_test.go @@ -10,27 +10,36 @@ import ( func TestMessageSerde(t *testing.T) { b := flatbuffers.NewBuilder(1024) - prop := make(map[string]string) - prop["n1"] = "v1" - prop["n2"] = "v2" + prop := make(map[string][]byte) + prop["n1"] = []byte("v1") + prop["n2"] = []byte("v2") - CreateMessage(b, 1, 2, 3, 4, 5, 6, prop, - []byte("the primary key"), []byte("body is here")) + bb := NewMessageBatchBuilder(b, 1, 2, 3, 4) - buf := b.FinishedBytes() + bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here")) + + bb.BuildMessageBatch() + + buf := bb.GetBytes() println("serialized size", len(buf)) - m := message_fbs.GetRootAsMessage(buf, 0) + mb := message_fbs.GetRootAsMessageBatch(buf, 0) + + assert.Equal(t, int32(1), mb.ProducerId()) + assert.Equal(t, int32(2), mb.ProducerEpoch()) + assert.Equal(t, int32(3), mb.SegmentId()) + assert.Equal(t, int32(4), mb.Flags()) + assert.Equal(t, int64(5), mb.SegmentSeqBase()) + assert.Equal(t, int32(0), mb.SegmentSeqMaxDelta()) + assert.Equal(t, int64(6), mb.TsMsBase()) + assert.Equal(t, int32(0), mb.TsMsMaxDelta()) + + assert.Equal(t, 1, mb.MessagesLength()) - assert.Equal(t, int32(1), m.ProducerId()) - assert.Equal(t, int64(2), m.ProducerSeq()) - assert.Equal(t, int32(3), m.SegmentId()) - assert.Equal(t, int64(4), m.SegmentSeq()) - assert.Equal(t, int64(5), m.EventTsNs()) - assert.Equal(t, int64(6), m.RecvTsNs()) + m := &message_fbs.Message{} + mb.Messages(m, 0) - assert.Equal(t, 2, m.PropertiesLength()) nv := &message_fbs.NameValue{} m.Properties(nv, 0) assert.Equal(t, "n1", string(nv.Name())) @@ -41,7 +50,7 @@ func TestMessageSerde(t *testing.T) { assert.Equal(t, []byte("the primary key"), m.Key()) assert.Equal(t, []byte("body is here"), m.Data()) - m.MutateSegmentSeq(123) - assert.Equal(t, int64(123), m.SegmentSeq()) + assert.Equal(t, int32(0), m.SeqDelta()) + assert.Equal(t, int32(0), m.TsMsDelta()) } -- cgit v1.2.3