aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/messages/message_pipeline.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-09-25 17:57:26 -0700
committerchrislu <chris.lu@gmail.com>2022-09-25 17:57:26 -0700
commit24317a97636546db80faf66c1653eab1e1fc2b2a (patch)
treef67fe074353b5c94d0af3dd1198a6d211aeae3b0 /weed/mq/messages/message_pipeline.go
parent3da8d7309a2b46861475fb685eb21c7179bf5c44 (diff)
downloadseaweedfs-24317a97636546db80faf66c1653eab1e1fc2b2a.tar.xz
seaweedfs-24317a97636546db80faf66c1653eab1e1fc2b2a.zip
async wait
Diffstat (limited to 'weed/mq/messages/message_pipeline.go')
-rw-r--r--weed/mq/messages/message_pipeline.go14
1 files changed, 8 insertions, 6 deletions
diff --git a/weed/mq/messages/message_pipeline.go b/weed/mq/messages/message_pipeline.go
index d8e3a85b8..be967b32e 100644
--- a/weed/mq/messages/message_pipeline.go
+++ b/weed/mq/messages/message_pipeline.go
@@ -25,7 +25,7 @@ type MessagePipeline struct {
movedBuffersChan chan MessageBufferReference
onMessageFn OnMessageFunc
mover MessageBufferMover
- moverPool *util.LimitedConcurrentExecutor
+ moverPool *util.LimitedAsyncExecutor
// control pipeline
doneChan chan struct{}
@@ -47,15 +47,15 @@ func NewMessagePipeline(producerId int32, workerCount int, batchSize int, timeou
doneChan: make(chan struct{}),
batchSize: batchSize,
timeout: timeout,
- moverPool: util.NewLimitedConcurrentExecutor(workerCount),
+ moverPool: util.NewLimitedAsyncExecutor(workerCount),
mover: mover,
}
go t.doLoopUpload()
return t
}
-func (mp *MessagePipeline) OutputChan() chan MessageBufferReference {
- return mp.movedBuffersChan
+func (mp *MessagePipeline) NextMessageBufferReference() MessageBufferReference {
+ return mp.moverPool.NextFuture().Await().(MessageBufferReference)
}
func (mp *MessagePipeline) AddMessage(message *Message) {
@@ -105,18 +105,20 @@ func (mp *MessagePipeline) doLoopUpload() {
select {
case messageBuffer := <-mp.sealedBuffersChan:
ticker.Reset(mp.timeout)
- mp.moverPool.Execute(func() {
+ mp.moverPool.Execute(func() any {
+ var output MessageBufferReference
util.RetryForever("message mover", func() error {
if messageReference, flushErr := mp.mover.MoveBuffer(messageBuffer); flushErr != nil {
return flushErr
} else {
- mp.movedBuffersChan <- messageReference
+ output = messageReference
}
return nil
}, func(err error) (shouldContinue bool) {
log.Printf("failed: %v", err)
return true
})
+ return output
})
case <-ticker.C:
if atomic.LoadInt64(&mp.atomicPipelineStatus) == -2 {