aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/publish_stream_processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/publish_stream_processor.go')
-rw-r--r--weed/mq/client/publish_stream_processor.go4
1 files changed, 2 insertions, 2 deletions
diff --git a/weed/mq/client/publish_stream_processor.go b/weed/mq/client/publish_stream_processor.go
index f83bcd08b..7aefa6b86 100644
--- a/weed/mq/client/publish_stream_processor.go
+++ b/weed/mq/client/publish_stream_processor.go
@@ -80,12 +80,12 @@ func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMe
}
builder := <-p.builders
- bb := segment.NewMessageBatchBuilder(builder, p.ProducerId, p.ProducerEpoch, 3, 4)
+ bb := segment.NewMessageBatchBuilder(builder)
for _, m := range messages {
bb.AddMessage(p.messagesSequence, m.Ts.UnixNano(), m.Properties, m.Key, m.Content)
p.messagesSequence++
}
- bb.BuildMessageBatch()
+ bb.BuildMessageBatch(p.ProducerId, p.ProducerEpoch, 3, 4)
defer func() {
p.builders <- builder
}()