diff options
Diffstat (limited to 'weed/mq/segment/message_serde.go')
| -rw-r--r-- | weed/mq/segment/message_serde.go | 33 |
1 files changed, 14 insertions, 19 deletions
diff --git a/weed/mq/segment/message_serde.go b/weed/mq/segment/message_serde.go index ee55b18a7..bb979a2a8 100644 --- a/weed/mq/segment/message_serde.go +++ b/weed/mq/segment/message_serde.go @@ -7,10 +7,6 @@ import ( type MessageBatchBuilder struct { b *flatbuffers.Builder - producerId int32 - producerEpoch int32 - segmentId int32 - flags int32 messageOffsets []flatbuffers.UOffsetT segmentSeqBase int64 segmentSeqLast int64 @@ -18,23 +14,19 @@ type MessageBatchBuilder struct { tsMsLast int64 } -func NewMessageBatchBuilder(b *flatbuffers.Builder, - producerId int32, - producerEpoch int32, - segmentId int32, - flags int32) *MessageBatchBuilder { +func NewMessageBatchBuilder(b *flatbuffers.Builder) *MessageBatchBuilder { b.Reset() return &MessageBatchBuilder{ - b: b, - producerId: producerId, - producerEpoch: producerEpoch, - segmentId: segmentId, - flags: flags, + b: b, } } +func (builder *MessageBatchBuilder) Reset() { + builder.b.Reset() +} + func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string]string, key []byte, value []byte) { if builder.segmentSeqBase == 0 { builder.segmentSeqBase = segmentSeq @@ -80,7 +72,10 @@ func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, pro } -func (builder *MessageBatchBuilder) BuildMessageBatch() { +func (builder *MessageBatchBuilder) BuildMessageBatch(producerId int32, + producerEpoch int32, + segmentId int32, + flags int32) { message_fbs.MessageBatchStartMessagesVector(builder.b, len(builder.messageOffsets)) for i := len(builder.messageOffsets) - 1; i >= 0; i-- { builder.b.PrependUOffsetT(builder.messageOffsets[i]) @@ -88,10 +83,10 @@ func (builder *MessageBatchBuilder) BuildMessageBatch() { messagesOffset := builder.b.EndVector(len(builder.messageOffsets)) message_fbs.MessageBatchStart(builder.b) - message_fbs.MessageBatchAddProducerId(builder.b, builder.producerId) - message_fbs.MessageBatchAddProducerEpoch(builder.b, builder.producerEpoch) - message_fbs.MessageBatchAddSegmentId(builder.b, builder.segmentId) - message_fbs.MessageBatchAddFlags(builder.b, builder.flags) + message_fbs.MessageBatchAddProducerId(builder.b, producerId) + message_fbs.MessageBatchAddProducerEpoch(builder.b, producerEpoch) + message_fbs.MessageBatchAddSegmentId(builder.b, segmentId) + message_fbs.MessageBatchAddFlags(builder.b, flags) message_fbs.MessageBatchAddSegmentSeqBase(builder.b, builder.segmentSeqBase) message_fbs.MessageBatchAddSegmentSeqMaxDelta(builder.b, int32(builder.segmentSeqLast-builder.segmentSeqBase)) message_fbs.MessageBatchAddTsMsBase(builder.b, builder.tsMsBase) |
