aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client/publish.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/pub_client/publish.go')
-rw-r--r--weed/mq/client/pub_client/publish.go17
1 files changed, 17 insertions, 0 deletions
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 1c5891049..fbb07b042 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "time"
)
func (p *TopicPublisher) Publish(key, value []byte) error {
@@ -20,5 +21,21 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
return inputBuffer.Enqueue(&mq_pb.DataMessage{
Key: key,
Value: value,
+ TsNs: time.Now().UnixNano(),
})
}
+
+func (p *TopicPublisher) FinishPublish() error {
+ if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
+ for _, inputBuffer := range inputBuffers {
+ inputBuffer.Enqueue(&mq_pb.DataMessage{
+ TsNs: time.Now().UnixNano(),
+ Ctrl: &mq_pb.ControlMessage{
+ IsClose: true,
+ },
+ })
+ }
+ }
+
+ return nil
+}