diff options
Diffstat (limited to 'weed/mq/client/pub_client/publish.go')
| -rw-r--r-- | weed/mq/client/pub_client/publish.go | 17 |
1 files changed, 17 insertions, 0 deletions
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go index 1c5891049..fbb07b042 100644 --- a/weed/mq/client/pub_client/publish.go +++ b/weed/mq/client/pub_client/publish.go @@ -5,6 +5,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util" + "time" ) func (p *TopicPublisher) Publish(key, value []byte) error { @@ -20,5 +21,21 @@ func (p *TopicPublisher) Publish(key, value []byte) error { return inputBuffer.Enqueue(&mq_pb.DataMessage{ Key: key, Value: value, + TsNs: time.Now().UnixNano(), }) } + +func (p *TopicPublisher) FinishPublish() error { + if inputBuffers, found := p.partition2Buffer.AllIntersections(0, pub_balancer.MaxPartitionCount); found { + for _, inputBuffer := range inputBuffers { + inputBuffer.Enqueue(&mq_pb.DataMessage{ + TsNs: time.Now().UnixNano(), + Ctrl: &mq_pb.ControlMessage{ + IsClose: true, + }, + }) + } + } + + return nil +} |
