aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-09-25 17:57:26 -0700
committerchrislu <chris.lu@gmail.com>2022-09-25 17:57:26 -0700
commit24317a97636546db80faf66c1653eab1e1fc2b2a (patch)
treef67fe074353b5c94d0af3dd1198a6d211aeae3b0
parent3da8d7309a2b46861475fb685eb21c7179bf5c44 (diff)
downloadseaweedfs-24317a97636546db80faf66c1653eab1e1fc2b2a.tar.xz
seaweedfs-24317a97636546db80faf66c1653eab1e1fc2b2a.zip
async wait
-rw-r--r--weed/mq/messages/message_pipeline.go14
-rw-r--r--weed/mq/messages/message_pipeline_test.go30
2 files changed, 35 insertions, 9 deletions
diff --git a/weed/mq/messages/message_pipeline.go b/weed/mq/messages/message_pipeline.go
index d8e3a85b8..be967b32e 100644
--- a/weed/mq/messages/message_pipeline.go
+++ b/weed/mq/messages/message_pipeline.go
@@ -25,7 +25,7 @@ type MessagePipeline struct {
movedBuffersChan chan MessageBufferReference
onMessageFn OnMessageFunc
mover MessageBufferMover
- moverPool *util.LimitedConcurrentExecutor
+ moverPool *util.LimitedAsyncExecutor
// control pipeline
doneChan chan struct{}
@@ -47,15 +47,15 @@ func NewMessagePipeline(producerId int32, workerCount int, batchSize int, timeou
doneChan: make(chan struct{}),
batchSize: batchSize,
timeout: timeout,
- moverPool: util.NewLimitedConcurrentExecutor(workerCount),
+ moverPool: util.NewLimitedAsyncExecutor(workerCount),
mover: mover,
}
go t.doLoopUpload()
return t
}
-func (mp *MessagePipeline) OutputChan() chan MessageBufferReference {
- return mp.movedBuffersChan
+func (mp *MessagePipeline) NextMessageBufferReference() MessageBufferReference {
+ return mp.moverPool.NextFuture().Await().(MessageBufferReference)
}
func (mp *MessagePipeline) AddMessage(message *Message) {
@@ -105,18 +105,20 @@ func (mp *MessagePipeline) doLoopUpload() {
select {
case messageBuffer := <-mp.sealedBuffersChan:
ticker.Reset(mp.timeout)
- mp.moverPool.Execute(func() {
+ mp.moverPool.Execute(func() any {
+ var output MessageBufferReference
util.RetryForever("message mover", func() error {
if messageReference, flushErr := mp.mover.MoveBuffer(messageBuffer); flushErr != nil {
return flushErr
} else {
- mp.movedBuffersChan <- messageReference
+ output = messageReference
}
return nil
}, func(err error) (shouldContinue bool) {
log.Printf("failed: %v", err)
return true
})
+ return output
})
case <-ticker.C:
if atomic.LoadInt64(&mp.atomicPipelineStatus) == -2 {
diff --git a/weed/mq/messages/message_pipeline_test.go b/weed/mq/messages/message_pipeline_test.go
index aaa25e0fd..e7ecfa8f6 100644
--- a/weed/mq/messages/message_pipeline_test.go
+++ b/weed/mq/messages/message_pipeline_test.go
@@ -5,11 +5,11 @@ import (
"time"
)
-func TestAddMessage(t *testing.T) {
+func TestAddMessage1(t *testing.T) {
mp := NewMessagePipeline(0, 3, 10, time.Second, &EmptyMover{})
go func() {
- outChan := mp.OutputChan()
- for mr := range outChan {
+ for i := 0; i < 100; i++ {
+ mr := mp.NextMessageBufferReference()
println(mr.sequence, mr.fileId)
}
}()
@@ -26,4 +26,28 @@ func TestAddMessage(t *testing.T) {
mp.ShutdownStart()
mp.ShutdownWait()
+
+}
+
+func TestAddMessage2(t *testing.T) {
+ mp := NewMessagePipeline(0, 3, 10, time.Second, &EmptyMover{})
+
+ for i := 0; i < 100; i++ {
+ message := &Message{
+ Key: []byte("key"),
+ Content: []byte("data"),
+ Properties: nil,
+ Ts: time.Now(),
+ }
+ mp.AddMessage(message)
+ }
+
+ mp.ShutdownStart()
+ mp.ShutdownWait()
+
+ for i := 0; i < 100; i++ {
+ mr := mp.NextMessageBufferReference()
+ println(mr.sequence, mr.fileId)
+ }
+
}