diff options
Diffstat (limited to 'weed/mq/segment')
| -rw-r--r-- | weed/mq/segment/message_serde.go | 113 | ||||
| -rw-r--r-- | weed/mq/segment/message_serde_test.go | 41 |
2 files changed, 112 insertions, 42 deletions
diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go index b69d78cab..8efa9fe21 100644 --- a/weed/mq/segment/message_serde.go +++ b/weed/mq/segment/message_serde.go @@ -5,44 +5,105 @@ import ( flatbuffers "github.com/google/flatbuffers/go" ) -func CreateMessage(b *flatbuffers.Builder, producerId int32, producerSeq int64, segmentId int32, segmentSeq int64, - eventTsNs int64, recvTsNs int64, properties map[string]string, key []byte, value []byte) { +type MessageBatchBuilder struct { + b *flatbuffers.Builder + producerId int32 + producerEpoch int32 + segmentId int32 + flags int32 + messageOffsets []flatbuffers.UOffsetT + segmentSeqBase int64 + segmentSeqLast int64 + tsMsBase int64 + tsMsLast int64 +} + +func NewMessageBatchBuilder(b *flatbuffers.Builder, + producerId int32, + producerEpoch int32, + segmentId int32, + flags int32) *MessageBatchBuilder { + b.Reset() + return &MessageBatchBuilder{ + b: b, + producerId: producerId, + producerEpoch: producerEpoch, + segmentId: segmentId, + flags: flags, + } +} + +func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string][]byte, key []byte, value []byte) { + if builder.segmentSeqBase == 0 { + builder.segmentSeqBase = segmentSeq + } + builder.segmentSeqLast = segmentSeq + if builder.tsMsBase == 0 { + builder.tsMsBase = tsMs + } + builder.tsMsLast = tsMs + var names, values, pairs []flatbuffers.UOffsetT for k, v := range properties { - names = append(names, b.CreateString(k)) - values = append(values, b.CreateString(v)) + names = append(names, builder.b.CreateString(k)) + values = append(values, builder.b.CreateByteVector(v)) } - for i, _ := range names { - message_fbs.NameValueStart(b) - message_fbs.NameValueAddName(b, names[i]) - message_fbs.NameValueAddValue(b, values[i]) - pair := message_fbs.NameValueEnd(b) + message_fbs.NameValueStart(builder.b) + message_fbs.NameValueAddName(builder.b, names[i]) + message_fbs.NameValueAddValue(builder.b, values[i]) + pair := message_fbs.NameValueEnd(builder.b) pairs = append(pairs, pair) } - message_fbs.MessageStartPropertiesVector(b, len(properties)) + + message_fbs.MessageStartPropertiesVector(builder.b, len(properties)) for i := len(pairs) - 1; i >= 0; i-- { - b.PrependUOffsetT(pairs[i]) + builder.b.PrependUOffsetT(pairs[i]) } - prop := b.EndVector(len(properties)) + propOffset := builder.b.EndVector(len(properties)) - k := b.CreateByteVector(key) - v := b.CreateByteVector(value) + keyOffset := builder.b.CreateByteVector(key) + valueOffset := builder.b.CreateByteVector(value) - message_fbs.MessageStart(b) - message_fbs.MessageAddProducerId(b, producerId) - message_fbs.MessageAddProducerSeq(b, producerSeq) - message_fbs.MessageAddSegmentId(b, segmentId) - message_fbs.MessageAddSegmentSeq(b, segmentSeq) - message_fbs.MessageAddEventTsNs(b, eventTsNs) - message_fbs.MessageAddRecvTsNs(b, recvTsNs) + message_fbs.MessageStart(builder.b) + message_fbs.MessageAddSeqDelta(builder.b, int32(segmentSeq-builder.segmentSeqBase)) + message_fbs.MessageAddTsMsDelta(builder.b, int32(tsMs-builder.tsMsBase)) - message_fbs.MessageAddProperties(b, prop) - message_fbs.MessageAddKey(b, k) - message_fbs.MessageAddData(b, v) - message := message_fbs.MessageEnd(b) + message_fbs.MessageAddProperties(builder.b, propOffset) + message_fbs.MessageAddKey(builder.b, keyOffset) + message_fbs.MessageAddData(builder.b, valueOffset) + messageOffset := message_fbs.MessageEnd(builder.b) + + builder.messageOffsets = append(builder.messageOffsets, messageOffset) + +} + +func (builder *MessageBatchBuilder) BuildMessageBatch() { + message_fbs.MessageBatchStartMessagesVector(builder.b, len(builder.messageOffsets)) + for i := len(builder.messageOffsets) - 1; i >= 0; i-- { + builder.b.PrependUOffsetT(builder.messageOffsets[i]) + } + messagesOffset := builder.b.EndVector(len(builder.messageOffsets)) + + message_fbs.MessageBatchStart(builder.b) + message_fbs.MessageBatchAddProducerId(builder.b, builder.producerId) + message_fbs.MessageBatchAddProducerEpoch(builder.b, builder.producerEpoch) + message_fbs.MessageBatchAddSegmentId(builder.b, builder.segmentId) + message_fbs.MessageBatchAddFlags(builder.b, builder.flags) + message_fbs.MessageBatchAddSegmentSeqBase(builder.b, builder.segmentSeqBase) + message_fbs.MessageBatchAddSegmentSeqMaxDelta(builder.b, int32(builder.segmentSeqLast-builder.segmentSeqBase)) + message_fbs.MessageBatchAddTsMsBase(builder.b, builder.tsMsBase) + message_fbs.MessageBatchAddTsMsMaxDelta(builder.b, int32(builder.tsMsLast-builder.tsMsBase)) + + message_fbs.MessageBatchAddMessages(builder.b, messagesOffset) + + messageBatch := message_fbs.MessageBatchEnd(builder.b) + + builder.b.Finish(messageBatch) +} - b.Finish(message) +func (builder *MessageBatchBuilder) GetBytes() []byte { + return builder.b.FinishedBytes() } diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go index 7ba0febf0..bf561d2c0 100644 --- a/weed/mq/segment/message_serde_test.go +++ b/weed/mq/segment/message_serde_test.go @@ -10,27 +10,36 @@ import ( func TestMessageSerde(t *testing.T) { b := flatbuffers.NewBuilder(1024) - prop := make(map[string]string) - prop["n1"] = "v1" - prop["n2"] = "v2" + prop := make(map[string][]byte) + prop["n1"] = []byte("v1") + prop["n2"] = []byte("v2") - CreateMessage(b, 1, 2, 3, 4, 5, 6, prop, - []byte("the primary key"), []byte("body is here")) + bb := NewMessageBatchBuilder(b, 1, 2, 3, 4) - buf := b.FinishedBytes() + bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here")) + + bb.BuildMessageBatch() + + buf := bb.GetBytes() println("serialized size", len(buf)) - m := message_fbs.GetRootAsMessage(buf, 0) + mb := message_fbs.GetRootAsMessageBatch(buf, 0) + + assert.Equal(t, int32(1), mb.ProducerId()) + assert.Equal(t, int32(2), mb.ProducerEpoch()) + assert.Equal(t, int32(3), mb.SegmentId()) + assert.Equal(t, int32(4), mb.Flags()) + assert.Equal(t, int64(5), mb.SegmentSeqBase()) + assert.Equal(t, int32(0), mb.SegmentSeqMaxDelta()) + assert.Equal(t, int64(6), mb.TsMsBase()) + assert.Equal(t, int32(0), mb.TsMsMaxDelta()) + + assert.Equal(t, 1, mb.MessagesLength()) - assert.Equal(t, int32(1), m.ProducerId()) - assert.Equal(t, int64(2), m.ProducerSeq()) - assert.Equal(t, int32(3), m.SegmentId()) - assert.Equal(t, int64(4), m.SegmentSeq()) - assert.Equal(t, int64(5), m.EventTsNs()) - assert.Equal(t, int64(6), m.RecvTsNs()) + m := &message_fbs.Message{} + mb.Messages(m, 0) - assert.Equal(t, 2, m.PropertiesLength()) nv := &message_fbs.NameValue{} m.Properties(nv, 0) assert.Equal(t, "n1", string(nv.Name())) @@ -41,7 +50,7 @@ func TestMessageSerde(t *testing.T) { assert.Equal(t, []byte("the primary key"), m.Key()) assert.Equal(t, []byte("body is here"), m.Data()) - m.MutateSegmentSeq(123) - assert.Equal(t, int64(123), m.SegmentSeq()) + assert.Equal(t, int32(0), m.SeqDelta()) + assert.Equal(t, int32(0), m.TsMsDelta()) } |
