aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_buffer_flush_gap_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_buffer_flush_gap_test.go')
-rw-r--r--weed/util/log_buffer/log_buffer_flush_gap_test.go43
1 files changed, 30 insertions, 13 deletions
diff --git a/weed/util/log_buffer/log_buffer_flush_gap_test.go b/weed/util/log_buffer/log_buffer_flush_gap_test.go
index bc40ea6df..dc010f1b8 100644
--- a/weed/util/log_buffer/log_buffer_flush_gap_test.go
+++ b/weed/util/log_buffer/log_buffer_flush_gap_test.go
@@ -69,11 +69,13 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) {
t.Logf("Sending %d messages...", messageCount)
for i := 0; i < messageCount; i++ {
- logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
Key: []byte(fmt.Sprintf("key-%d", i)),
Value: []byte(fmt.Sprintf("message-%d", i)),
TsNs: time.Now().UnixNano(),
- })
+ }); err != nil {
+ t.Fatalf("Failed to add buffer: %v", err)
+ }
}
// Force flush multiple times to simulate real workload
@@ -82,11 +84,13 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) {
// Add more messages after flush
for i := messageCount; i < messageCount+50; i++ {
- logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
Key: []byte(fmt.Sprintf("key-%d", i)),
Value: []byte(fmt.Sprintf("message-%d", i)),
TsNs: time.Now().UnixNano(),
- })
+ }); err != nil {
+ t.Fatalf("Failed to add buffer: %v", err)
+ }
}
// Force another flush
@@ -209,11 +213,13 @@ func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) {
// Send 20 messages
for i := 0; i < 20; i++ {
offset := int64(batch*20 + i)
- logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
Key: []byte(fmt.Sprintf("key-%d", offset)),
Value: []byte(fmt.Sprintf("message-%d", offset)),
TsNs: time.Now().UnixNano(),
- })
+ }); err != nil {
+ t.Fatalf("Failed to add buffer: %v", err)
+ }
}
// Check state before flush
@@ -285,11 +291,14 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
go func() {
defer wg.Done()
for i := 0; i < 200; i++ {
- logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
Key: []byte(fmt.Sprintf("key-%d", i)),
Value: []byte(fmt.Sprintf("message-%d", i)),
TsNs: time.Now().UnixNano(),
- })
+ }); err != nil {
+ t.Errorf("Failed to add buffer: %v", err)
+ return
+ }
if i%50 == 0 {
time.Sleep(10 * time.Millisecond)
}
@@ -389,7 +398,9 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
TsNs: time.Now().UnixNano(),
Offset: nextKafkaOffset, // Explicit Kafka offset
}
- logBuffer.AddLogEntryToBuffer(logEntry)
+ if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil {
+ t.Fatalf("Failed to add log entry: %v", err)
+ }
nextKafkaOffset++
}
@@ -422,7 +433,9 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
TsNs: time.Now().UnixNano(),
Offset: nextKafkaOffset,
}
- logBuffer.AddLogEntryToBuffer(logEntry)
+ if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil {
+ t.Fatalf("Failed to add log entry: %v", err)
+ }
nextKafkaOffset++
}
@@ -546,7 +559,9 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
TsNs: time.Now().UnixNano(),
Offset: i,
}
- logBuffer.AddLogEntryToBuffer(logEntry)
+ if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil {
+ t.Fatalf("Failed to add log entry: %v", err)
+ }
}
// Flush (moves data to disk)
@@ -616,11 +631,13 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {
// Add 10 messages
for i := 0; i < 10; i++ {
- logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
Key: []byte(fmt.Sprintf("round-%d-msg-%d", round, i)),
Value: []byte(fmt.Sprintf("data-%d-%d", round, i)),
TsNs: time.Now().UnixNano(),
- })
+ }); err != nil {
+ t.Fatalf("Failed to add buffer: %v", err)
+ }
}
// Check state after adding