aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-08-20 12:46:15 -0700
committerchrislu <chris.lu@gmail.com>2023-08-20 12:46:15 -0700
commit16e3f2d5289ea27f7bcb4f71d3685393529ccd07 (patch)
tree4b4eae085607f55d1c1c6208ba5989dbf3f98e72
parent3e9c32a3f0e20bb1663aa3326c4e1d50f44f66c4 (diff)
downloadseaweedfs-16e3f2d5289ea27f7bcb4f71d3685393529ccd07.tar.xz
seaweedfs-16e3f2d5289ea27f7bcb4f71d3685393529ccd07.zip
fix log buffer test
-rw-r--r--weed/util/log_buffer/log_buffer_test.go48
1 files changed, 33 insertions, 15 deletions
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index 2cc1c89b4..7aad0afab 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -1,8 +1,10 @@
package log_buffer
import (
+ "crypto/rand"
"fmt"
- "math/rand"
+ "io"
+ "sync"
"testing"
"time"
@@ -10,33 +12,49 @@ import (
)
func TestNewLogBufferFirstBuffer(t *testing.T) {
- lb := NewLogBuffer("test", time.Minute, func(startTime, stopTime time.Time, buf []byte) {
-
+ flushInterval := time.Second
+ lb := NewLogBuffer("test", flushInterval, func(startTime, stopTime time.Time, buf []byte) {
+ fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
}, func() {
-
})
startTime := time.Now()
messageSize := 1024
messageCount := 5000
+
+ receivedmessageCount := 0
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool {
+ // stop if no more messages
+ return true
+ }, func(logEntry *filer_pb.LogEntry) error {
+ receivedmessageCount++
+ if receivedmessageCount >= messageCount {
+ println("processed all messages")
+ return io.EOF
+ }
+ return nil
+ })
+
+ fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedmessageCount)
+ fmt.Printf("lastProcessedTime %v isDone %v err: %v\n", lastProcessedTime, isDone, err)
+ if err != nil && err != io.EOF {
+ t.Errorf("unexpected error %v", err)
+ }
+ }()
+
var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ {
rand.Read(buf)
lb.AddToBuffer(nil, buf, 0)
}
-
- receivedmessageCount := 0
- lb.LoopProcessLogData("test", startTime, 0, func() bool {
- // stop if no more messages
- return false
- }, func(logEntry *filer_pb.LogEntry) error {
- receivedmessageCount++
- return nil
- })
+ wg.Wait()
if receivedmessageCount != messageCount {
- fmt.Printf("sent %d received %d\n", messageCount, receivedmessageCount)
+ t.Errorf("expect %d messages, but got %d", messageCount, receivedmessageCount)
}
-
}