Diffstat (limited to 'weed/util/log_buffer/log_buffer_flush_gap_test.go')
-rw-r--r--   weed/util/log_buffer/log_buffer_flush_gap_test.go   43
1 file changed, 30 insertions, 13 deletions
diff --git a/weed/util/log_buffer/log_buffer_flush_gap_test.go b/weed/util/log_buffer/log_buffer_flush_gap_test.go
index bc40ea6df..dc010f1b8 100644
--- a/weed/util/log_buffer/log_buffer_flush_gap_test.go
+++ b/weed/util/log_buffer/log_buffer_flush_gap_test.go
@@ -69,11 +69,13 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) {
 
 	t.Logf("Sending %d messages...", messageCount)
 	for i := 0; i < messageCount; i++ {
-		logBuffer.AddToBuffer(&mq_pb.DataMessage{
+		if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
 			Key:   []byte(fmt.Sprintf("key-%d", i)),
 			Value: []byte(fmt.Sprintf("message-%d", i)),
 			TsNs:  time.Now().UnixNano(),
-		})
+		}); err != nil {
+			t.Fatalf("Failed to add buffer: %v", err)
+		}
 	}
 
 	// Force flush multiple times to simulate real workload
@@ -82,11 +84,13 @@ func TestFlushOffsetGap_ReproduceDataLoss(t *testing.T) {
 
 	// Add more messages after flush
 	for i := messageCount; i < messageCount+50; i++ {
-		logBuffer.AddToBuffer(&mq_pb.DataMessage{
+		if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
 			Key:   []byte(fmt.Sprintf("key-%d", i)),
 			Value: []byte(fmt.Sprintf("message-%d", i)),
 			TsNs:  time.Now().UnixNano(),
-		})
+		}); err != nil {
+			t.Fatalf("Failed to add buffer: %v", err)
+		}
 	}
 
 	// Force another flush
@@ -209,11 +213,13 @@ func TestFlushOffsetGap_CheckPrevBuffers(t *testing.T) {
 		// Send 20 messages
 		for i := 0; i < 20; i++ {
 			offset := int64(batch*20 + i)
-			logBuffer.AddToBuffer(&mq_pb.DataMessage{
+			if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
 				Key:   []byte(fmt.Sprintf("key-%d", offset)),
 				Value: []byte(fmt.Sprintf("message-%d", offset)),
 				TsNs:  time.Now().UnixNano(),
-			})
+			}); err != nil {
+				t.Fatalf("Failed to add buffer: %v", err)
+			}
 		}
 
 		// Check state before flush
@@ -285,11 +291,14 @@ func TestFlushOffsetGap_ConcurrentWriteAndFlush(t *testing.T) {
 	go func() {
 		defer wg.Done()
 		for i := 0; i < 200; i++ {
-			logBuffer.AddToBuffer(&mq_pb.DataMessage{
+			if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
 				Key:   []byte(fmt.Sprintf("key-%d", i)),
 				Value: []byte(fmt.Sprintf("message-%d", i)),
 				TsNs:  time.Now().UnixNano(),
-			})
+			}); err != nil {
+				t.Errorf("Failed to add buffer: %v", err)
+				return
+			}
 			if i%50 == 0 {
 				time.Sleep(10 * time.Millisecond)
 			}
@@ -389,7 +398,9 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 			TsNs:   time.Now().UnixNano(),
 			Offset: nextKafkaOffset, // Explicit Kafka offset
 		}
-		logBuffer.AddLogEntryToBuffer(logEntry)
+		if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil {
+			t.Fatalf("Failed to add log entry: %v", err)
+		}
 		nextKafkaOffset++
 	}
 
@@ -422,7 +433,9 @@ func TestFlushOffsetGap_ProductionScenario(t *testing.T) {
 			TsNs:   time.Now().UnixNano(),
 			Offset: nextKafkaOffset,
 		}
-		logBuffer.AddLogEntryToBuffer(logEntry)
+		if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil {
+			t.Fatalf("Failed to add log entry: %v", err)
+		}
 		nextKafkaOffset++
 	}
 
@@ -546,7 +559,9 @@ func TestFlushOffsetGap_ConcurrentReadDuringFlush(t *testing.T) {
 			TsNs:   time.Now().UnixNano(),
 			Offset: i,
 		}
-		logBuffer.AddLogEntryToBuffer(logEntry)
+		if err := logBuffer.AddLogEntryToBuffer(logEntry); err != nil {
+			t.Fatalf("Failed to add log entry: %v", err)
+		}
 	}
 
 	// Flush (moves data to disk)
@@ -616,11 +631,13 @@ func TestFlushOffsetGap_ForceFlushAdvancesBuffer(t *testing.T) {
 
 		// Add 10 messages
 		for i := 0; i < 10; i++ {
-			logBuffer.AddToBuffer(&mq_pb.DataMessage{
+			if err := logBuffer.AddToBuffer(&mq_pb.DataMessage{
 				Key:   []byte(fmt.Sprintf("round-%d-msg-%d", round, i)),
 				Value: []byte(fmt.Sprintf("data-%d-%d", round, i)),
 				TsNs:  time.Now().UnixNano(),
-			})
+			}); err != nil {
+				t.Fatalf("Failed to add buffer: %v", err)
+			}
 		}
 
 		// Check state after adding
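The patch applies a single pattern throughout: check the error now returned by AddToBuffer/AddLogEntryToBuffer and fail the test instead of silently dropping it. Below is a minimal sketch of that pattern, assuming those methods return an error as the diff implies; the mustAdd helper is hypothetical and not part of the SeaweedFS test file. Note that the concurrent-write test uses t.Errorf plus return rather than t.Fatalf, since Fatalf may only be called from the goroutine running the test function.

package log_buffer_test

import "testing"

// mustAdd is a hypothetical helper illustrating the error-handling pattern
// added by this patch: an add that fails aborts the test immediately.
// Call it only from the goroutine running the test function, because
// t.Fatalf stops the calling goroutine via runtime.Goexit.
func mustAdd(t *testing.T, add func() error) {
	t.Helper()
	if err := add(); err != nil {
		t.Fatalf("Failed to add to buffer: %v", err)
	}
}

// Inside background goroutines (as in TestFlushOffsetGap_ConcurrentWriteAndFlush),
// report the failure and leave the goroutine instead:
//
//	if err := logBuffer.AddToBuffer(msg); err != nil {
//		t.Errorf("Failed to add buffer: %v", err)
//		return
//	}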
