aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_read_stateless_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_read_stateless_test.go')
-rw-r--r--weed/util/log_buffer/log_read_stateless_test.go36
1 files changed, 27 insertions, 9 deletions
diff --git a/weed/util/log_buffer/log_read_stateless_test.go b/weed/util/log_buffer/log_read_stateless_test.go
index 948a929ba..6c9206eb4 100644
--- a/weed/util/log_buffer/log_read_stateless_test.go
+++ b/weed/util/log_buffer/log_read_stateless_test.go
@@ -45,7 +45,9 @@ func TestReadMessagesAtOffset_SingleMessage(t *testing.T) {
Data: []byte("value1"),
Offset: 0,
}
- lb.AddLogEntryToBuffer(entry)
+ if err := lb.AddLogEntryToBuffer(entry); err != nil {
+ t.Fatalf("Failed to add log entry to buffer: %v", err)
+ }
// Read from offset 0
messages, nextOffset, _, endOfPartition, err := lb.ReadMessagesAtOffset(0, 10, 1024)
@@ -82,7 +84,9 @@ func TestReadMessagesAtOffset_MultipleMessages(t *testing.T) {
Data: []byte("value"),
Offset: int64(i),
}
- lb.AddLogEntryToBuffer(entry)
+ if err := lb.AddLogEntryToBuffer(entry); err != nil {
+ t.Fatalf("Failed to add log entry to buffer: %v", err)
+ }
}
// Read from offset 0, max 3 messages
@@ -118,7 +122,9 @@ func TestReadMessagesAtOffset_StartFromMiddle(t *testing.T) {
Data: []byte("value"),
Offset: int64(i),
}
- lb.AddLogEntryToBuffer(entry)
+ if err := lb.AddLogEntryToBuffer(entry); err != nil {
+ t.Fatalf("Failed to add log entry to buffer: %v", err)
+ }
}
// Read from offset 5
@@ -155,7 +161,9 @@ func TestReadMessagesAtOffset_MaxBytesLimit(t *testing.T) {
Data: make([]byte, 100), // 100 bytes
Offset: int64(i),
}
- lb.AddLogEntryToBuffer(entry)
+ if err := lb.AddLogEntryToBuffer(entry); err != nil {
+ t.Fatalf("Failed to add log entry to buffer: %v", err)
+ }
}
// Request with max 250 bytes (should get ~2 messages)
@@ -186,7 +194,9 @@ func TestReadMessagesAtOffset_ConcurrentReads(t *testing.T) {
Data: []byte("value"),
Offset: int64(i),
}
- lb.AddLogEntryToBuffer(entry)
+ if err := lb.AddLogEntryToBuffer(entry); err != nil {
+ t.Fatalf("Failed to add log entry to buffer: %v", err)
+ }
}
// Start 10 concurrent readers at different offsets
@@ -238,7 +248,9 @@ func TestReadMessagesAtOffset_FutureOffset(t *testing.T) {
Data: []byte("value"),
Offset: int64(i),
}
- lb.AddLogEntryToBuffer(entry)
+ if err := lb.AddLogEntryToBuffer(entry); err != nil {
+ t.Fatalf("Failed to add log entry to buffer: %v", err)
+ }
}
// Try to read from offset 10 (future)
@@ -269,7 +281,9 @@ func TestWaitForDataWithTimeout_DataAvailable(t *testing.T) {
Data: []byte("value"),
Offset: 0,
}
- lb.AddLogEntryToBuffer(entry)
+ if err := lb.AddLogEntryToBuffer(entry); err != nil {
+ t.Fatalf("Failed to add log entry to buffer: %v", err)
+ }
// Wait for data at offset 0 (should return immediately)
dataAvailable := lb.WaitForDataWithTimeout(0, 100)
@@ -321,7 +335,9 @@ func TestWaitForDataWithTimeout_DataArrives(t *testing.T) {
Data: []byte("value"),
Offset: 0,
}
- lb.AddLogEntryToBuffer(entry)
+ if err := lb.AddLogEntryToBuffer(entry); err != nil {
+ t.Fatalf("Failed to add log entry to buffer: %v", err)
+ }
// Wait for result
<-done
@@ -349,7 +365,9 @@ func TestGetHighWaterMark(t *testing.T) {
Data: []byte("value"),
Offset: int64(i),
}
- lb.AddLogEntryToBuffer(entry)
+ if err := lb.AddLogEntryToBuffer(entry); err != nil {
+ t.Fatalf("Failed to add log entry to buffer: %v", err)
+ }
}
// HWM should be 5 (next offset to write, not last written offset)