aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/segment/message_serde.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/segment/message_serde.go')
-rw-r--r--weed/mq/segment/message_serde.go33
1 files changed, 14 insertions, 19 deletions
diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go
index ee55b18a7..bb979a2a8 100644
--- a/weed/mq/segment/message_serde.go
+++ b/weed/mq/segment/message_serde.go
@@ -7,10 +7,6 @@ import (
type MessageBatchBuilder struct {
b *flatbuffers.Builder
- producerId int32
- producerEpoch int32
- segmentId int32
- flags int32
messageOffsets []flatbuffers.UOffsetT
segmentSeqBase int64
segmentSeqLast int64
@@ -18,23 +14,19 @@ type MessageBatchBuilder struct {
tsMsLast int64
}
-func NewMessageBatchBuilder(b *flatbuffers.Builder,
- producerId int32,
- producerEpoch int32,
- segmentId int32,
- flags int32) *MessageBatchBuilder {
+func NewMessageBatchBuilder(b *flatbuffers.Builder) *MessageBatchBuilder {
b.Reset()
return &MessageBatchBuilder{
- b: b,
- producerId: producerId,
- producerEpoch: producerEpoch,
- segmentId: segmentId,
- flags: flags,
+ b: b,
}
}
+func (builder *MessageBatchBuilder) Reset() {
+ builder.b.Reset()
+}
+
func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string]string, key []byte, value []byte) {
if builder.segmentSeqBase == 0 {
builder.segmentSeqBase = segmentSeq
@@ -80,7 +72,10 @@ func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, pro
}
-func (builder *MessageBatchBuilder) BuildMessageBatch() {
+func (builder *MessageBatchBuilder) BuildMessageBatch(producerId int32,
+ producerEpoch int32,
+ segmentId int32,
+ flags int32) {
message_fbs.MessageBatchStartMessagesVector(builder.b, len(builder.messageOffsets))
for i := len(builder.messageOffsets) - 1; i >= 0; i-- {
builder.b.PrependUOffsetT(builder.messageOffsets[i])
@@ -88,10 +83,10 @@ func (builder *MessageBatchBuilder) BuildMessageBatch() {
messagesOffset := builder.b.EndVector(len(builder.messageOffsets))
message_fbs.MessageBatchStart(builder.b)
- message_fbs.MessageBatchAddProducerId(builder.b, builder.producerId)
- message_fbs.MessageBatchAddProducerEpoch(builder.b, builder.producerEpoch)
- message_fbs.MessageBatchAddSegmentId(builder.b, builder.segmentId)
- message_fbs.MessageBatchAddFlags(builder.b, builder.flags)
+ message_fbs.MessageBatchAddProducerId(builder.b, producerId)
+ message_fbs.MessageBatchAddProducerEpoch(builder.b, producerEpoch)
+ message_fbs.MessageBatchAddSegmentId(builder.b, segmentId)
+ message_fbs.MessageBatchAddFlags(builder.b, flags)
message_fbs.MessageBatchAddSegmentSeqBase(builder.b, builder.segmentSeqBase)
message_fbs.MessageBatchAddSegmentSeqMaxDelta(builder.b, int32(builder.segmentSeqLast-builder.segmentSeqBase))
message_fbs.MessageBatchAddTsMsBase(builder.b, builder.tsMsBase)