aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/publish_stream_processor.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-09-25 11:19:05 -0700
committerchrislu <chris.lu@gmail.com>2022-09-25 11:19:05 -0700
commit600d2f92a44b8fb76653ef0547bce5a406e82eb8 (patch)
tree7e45cf7c4e38540728e1049ccf21fbaec3675fa7 /weed/mq/client/publish_stream_processor.go
parent15b6c7a6f0adad7d0638b770c082d73f8a612c89 (diff)
downloadseaweedfs-600d2f92a44b8fb76653ef0547bce5a406e82eb8.tar.xz
seaweedfs-600d2f92a44b8fb76653ef0547bce5a406e82eb8.zip
add message pipeline
Diffstat (limited to 'weed/mq/client/publish_stream_processor.go')
-rw-r--r--weed/mq/client/publish_stream_processor.go4
1 files changed, 2 insertions, 2 deletions
diff --git a/weed/mq/client/publish_stream_processor.go b/weed/mq/client/publish_stream_processor.go
index f83bcd08b..7aefa6b86 100644
--- a/weed/mq/client/publish_stream_processor.go
+++ b/weed/mq/client/publish_stream_processor.go
@@ -80,12 +80,12 @@ func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMe
}
builder := <-p.builders
- bb := segment.NewMessageBatchBuilder(builder, p.ProducerId, p.ProducerEpoch, 3, 4)
+ bb := segment.NewMessageBatchBuilder(builder)
for _, m := range messages {
bb.AddMessage(p.messagesSequence, m.Ts.UnixNano(), m.Properties, m.Key, m.Content)
p.messagesSequence++
}
- bb.BuildMessageBatch()
+ bb.BuildMessageBatch(p.ProducerId, p.ProducerEpoch, 3, 4)
defer func() {
p.builders <- builder
}()