diff options
| author | chrislu <chris.lu@gmail.com> | 2022-09-25 17:57:26 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-09-25 17:57:26 -0700 |
| commit | 24317a97636546db80faf66c1653eab1e1fc2b2a (patch) | |
| tree | f67fe074353b5c94d0af3dd1198a6d211aeae3b0 /weed/mq/messages/message_pipeline.go | |
| parent | 3da8d7309a2b46861475fb685eb21c7179bf5c44 (diff) | |
| download | seaweedfs-24317a97636546db80faf66c1653eab1e1fc2b2a.tar.xz seaweedfs-24317a97636546db80faf66c1653eab1e1fc2b2a.zip | |
async wait
Diffstat (limited to 'weed/mq/messages/message_pipeline.go')
| -rw-r--r-- | weed/mq/messages/message_pipeline.go | 14 |
1 files changed, 8 insertions, 6 deletions
diff --git a/weed/mq/messages/message_pipeline.go b/weed/mq/messages/message_pipeline.go index d8e3a85b8..be967b32e 100644 --- a/weed/mq/messages/message_pipeline.go +++ b/weed/mq/messages/message_pipeline.go @@ -25,7 +25,7 @@ type MessagePipeline struct { movedBuffersChan chan MessageBufferReference onMessageFn OnMessageFunc mover MessageBufferMover - moverPool *util.LimitedConcurrentExecutor + moverPool *util.LimitedAsyncExecutor // control pipeline doneChan chan struct{} @@ -47,15 +47,15 @@ func NewMessagePipeline(producerId int32, workerCount int, batchSize int, timeou doneChan: make(chan struct{}), batchSize: batchSize, timeout: timeout, - moverPool: util.NewLimitedConcurrentExecutor(workerCount), + moverPool: util.NewLimitedAsyncExecutor(workerCount), mover: mover, } go t.doLoopUpload() return t } -func (mp *MessagePipeline) OutputChan() chan MessageBufferReference { - return mp.movedBuffersChan +func (mp *MessagePipeline) NextMessageBufferReference() MessageBufferReference { + return mp.moverPool.NextFuture().Await().(MessageBufferReference) } func (mp *MessagePipeline) AddMessage(message *Message) { @@ -105,18 +105,20 @@ func (mp *MessagePipeline) doLoopUpload() { select { case messageBuffer := <-mp.sealedBuffersChan: ticker.Reset(mp.timeout) - mp.moverPool.Execute(func() { + mp.moverPool.Execute(func() any { + var output MessageBufferReference util.RetryForever("message mover", func() error { if messageReference, flushErr := mp.mover.MoveBuffer(messageBuffer); flushErr != nil { return flushErr } else { - mp.movedBuffersChan <- messageReference + output = messageReference } return nil }, func(err error) (shouldContinue bool) { log.Printf("failed: %v", err) return true }) + return output }) case <-ticker.C: if atomic.LoadInt64(&mp.atomicPipelineStatus) == -2 { |
