diff options
Diffstat (limited to 'weed/mq/messages/message_pipeline.go')
| -rw-r--r-- | weed/mq/messages/message_pipeline.go | 14 |
1 files changed, 8 insertions, 6 deletions
diff --git a/weed/mq/messages/message_pipeline.go b/weed/mq/messages/message_pipeline.go index d8e3a85b8..be967b32e 100644 --- a/weed/mq/messages/message_pipeline.go +++ b/weed/mq/messages/message_pipeline.go @@ -25,7 +25,7 @@ type MessagePipeline struct { movedBuffersChan chan MessageBufferReference onMessageFn OnMessageFunc mover MessageBufferMover - moverPool *util.LimitedConcurrentExecutor + moverPool *util.LimitedAsyncExecutor // control pipeline doneChan chan struct{} @@ -47,15 +47,15 @@ func NewMessagePipeline(producerId int32, workerCount int, batchSize int, timeou doneChan: make(chan struct{}), batchSize: batchSize, timeout: timeout, - moverPool: util.NewLimitedConcurrentExecutor(workerCount), + moverPool: util.NewLimitedAsyncExecutor(workerCount), mover: mover, } go t.doLoopUpload() return t } -func (mp *MessagePipeline) OutputChan() chan MessageBufferReference { - return mp.movedBuffersChan +func (mp *MessagePipeline) NextMessageBufferReference() MessageBufferReference { + return mp.moverPool.NextFuture().Await().(MessageBufferReference) } func (mp *MessagePipeline) AddMessage(message *Message) { @@ -105,18 +105,20 @@ func (mp *MessagePipeline) doLoopUpload() { select { case messageBuffer := <-mp.sealedBuffersChan: ticker.Reset(mp.timeout) - mp.moverPool.Execute(func() { + mp.moverPool.Execute(func() any { + var output MessageBufferReference util.RetryForever("message mover", func() error { if messageReference, flushErr := mp.mover.MoveBuffer(messageBuffer); flushErr != nil { return flushErr } else { - mp.movedBuffersChan <- messageReference + output = messageReference } return nil }, func(err error) (shouldContinue bool) { log.Printf("failed: %v", err) return true }) + return output }) case <-ticker.C: if atomic.LoadInt64(&mp.atomicPipelineStatus) == -2 { |
