aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/segment
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/segment')
-rw-r--r--weed/mq/segment/message_serde.go37
-rw-r--r--weed/mq/segment/message_serde_test.go10
2 files changed, 21 insertions, 26 deletions
diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go
index 66a76c57d..bb979a2a8 100644
--- a/weed/mq/segment/message_serde.go
+++ b/weed/mq/segment/message_serde.go
@@ -7,10 +7,6 @@ import (
type MessageBatchBuilder struct {
b *flatbuffers.Builder
- producerId int32
- producerEpoch int32
- segmentId int32
- flags int32
messageOffsets []flatbuffers.UOffsetT
segmentSeqBase int64
segmentSeqLast int64
@@ -18,24 +14,20 @@ type MessageBatchBuilder struct {
tsMsLast int64
}
-func NewMessageBatchBuilder(b *flatbuffers.Builder,
- producerId int32,
- producerEpoch int32,
- segmentId int32,
- flags int32) *MessageBatchBuilder {
+func NewMessageBatchBuilder(b *flatbuffers.Builder) *MessageBatchBuilder {
b.Reset()
return &MessageBatchBuilder{
- b: b,
- producerId: producerId,
- producerEpoch: producerEpoch,
- segmentId: segmentId,
- flags: flags,
+ b: b,
}
}
-func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string][]byte, key []byte, value []byte) {
+func (builder *MessageBatchBuilder) Reset() {
+ builder.b.Reset()
+}
+
+func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string]string, key []byte, value []byte) {
if builder.segmentSeqBase == 0 {
builder.segmentSeqBase = segmentSeq
}
@@ -48,7 +40,7 @@ func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, pro
var names, values, pairs []flatbuffers.UOffsetT
for k, v := range properties {
names = append(names, builder.b.CreateString(k))
- values = append(values, builder.b.CreateByteVector(v))
+ values = append(values, builder.b.CreateString(v))
}
for i, _ := range names {
message_fbs.NameValueStart(builder.b)
@@ -80,7 +72,10 @@ func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, pro
}
-func (builder *MessageBatchBuilder) BuildMessageBatch() {
+func (builder *MessageBatchBuilder) BuildMessageBatch(producerId int32,
+ producerEpoch int32,
+ segmentId int32,
+ flags int32) {
message_fbs.MessageBatchStartMessagesVector(builder.b, len(builder.messageOffsets))
for i := len(builder.messageOffsets) - 1; i >= 0; i-- {
builder.b.PrependUOffsetT(builder.messageOffsets[i])
@@ -88,10 +83,10 @@ func (builder *MessageBatchBuilder) BuildMessageBatch() {
messagesOffset := builder.b.EndVector(len(builder.messageOffsets))
message_fbs.MessageBatchStart(builder.b)
- message_fbs.MessageBatchAddProducerId(builder.b, builder.producerId)
- message_fbs.MessageBatchAddProducerEpoch(builder.b, builder.producerEpoch)
- message_fbs.MessageBatchAddSegmentId(builder.b, builder.segmentId)
- message_fbs.MessageBatchAddFlags(builder.b, builder.flags)
+ message_fbs.MessageBatchAddProducerId(builder.b, producerId)
+ message_fbs.MessageBatchAddProducerEpoch(builder.b, producerEpoch)
+ message_fbs.MessageBatchAddSegmentId(builder.b, segmentId)
+ message_fbs.MessageBatchAddFlags(builder.b, flags)
message_fbs.MessageBatchAddSegmentSeqBase(builder.b, builder.segmentSeqBase)
message_fbs.MessageBatchAddSegmentSeqMaxDelta(builder.b, int32(builder.segmentSeqLast-builder.segmentSeqBase))
message_fbs.MessageBatchAddTsMsBase(builder.b, builder.tsMsBase)
diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go
index c65bffb84..8849b393b 100644
--- a/weed/mq/segment/message_serde_test.go
+++ b/weed/mq/segment/message_serde_test.go
@@ -10,15 +10,15 @@ import (
func TestMessageSerde(t *testing.T) {
b := flatbuffers.NewBuilder(1024)
- prop := make(map[string][]byte)
- prop["n1"] = []byte("v1")
- prop["n2"] = []byte("v2")
+ prop := make(map[string]string)
+ prop["n1"] = "v1"
+ prop["n2"] = "v2"
- bb := NewMessageBatchBuilder(b, 1, 2, 3, 4)
+ bb := NewMessageBatchBuilder(b)
bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here"))
- bb.BuildMessageBatch()
+ bb.BuildMessageBatch(1, 2, 3, 4)
buf := bb.GetBytes()