diff options
| author | chrislu <chris.lu@gmail.com> | 2025-10-24 00:48:24 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-10-24 00:48:24 -0700 |
| commit | 64a4ce93580258b8d5537416dfb5d9a1a8f14ee2 (patch) | |
| tree | b65e88a6193457c63d50f146346b326b6f7c74b7 /weed/util/log_buffer/log_buffer_queryability_test.go | |
| parent | 832df5265f6bcf0c71ee2256cfe9acb8be99ea86 (diff) | |
| download | seaweedfs-64a4ce93580258b8d5537416dfb5d9a1a8f14ee2.tar.xz seaweedfs-64a4ce93580258b8d5537416dfb5d9a1a8f14ee2.zip | |
fix reading
Gap detection and skipping to earliest memory time
Time-based reads that include events at boundary times for first reads (offset ≤ 0)
Aggregated subscriber wake-up via ListenersWaits signaling
Diffstat (limited to 'weed/util/log_buffer/log_buffer_queryability_test.go')
| -rw-r--r-- | weed/util/log_buffer/log_buffer_queryability_test.go | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_buffer_queryability_test.go b/weed/util/log_buffer/log_buffer_queryability_test.go index 6e372d2b3..16dd0f9b0 100644 --- a/weed/util/log_buffer/log_buffer_queryability_test.go +++ b/weed/util/log_buffer/log_buffer_queryability_test.go @@ -236,3 +236,58 @@ func TestSchemaRegistryScenario(t *testing.T) { t.Logf("Schema registry scenario test passed - schema value preserved: %d bytes", len(retrievedEntry.Data)) } + +// TestTimeBasedFirstReadBeforeEarliest ensures starting slightly before earliest memory +// does not force a disk resume and returns in-memory data (regression test) +func TestTimeBasedFirstReadBeforeEarliest(t *testing.T) { + flushed := false + logBuffer := NewLogBuffer("local", 10*time.Minute, + func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + // keep in memory; we just want earliest time populated + _ = buf + }, + func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { + // disk should not be consulted in this regression path + return startPosition, false, nil + }, + func() {}) + + // Seed one entry so earliestTime is set + baseTs := time.Now().Add(-time.Second) + entry := &filer_pb.LogEntry{TsNs: baseTs.UnixNano(), Data: []byte("x"), Key: []byte("k"), Offset: 0} + logBuffer.AddLogEntryToBuffer(entry) + _ = flushed + + // Start read 1ns before earliest memory, with offset sentinel (-2) + startPos := NewMessagePosition(baseTs.Add(-time.Nanosecond).UnixNano(), -2) + buf, _, err := logBuffer.ReadFromBuffer(startPos) + if err != nil { + t.Fatalf("ReadFromBuffer returned err: %v", err) + } + if buf == nil { + t.Fatalf("Expected in-memory data, got nil buffer") + } +} + +// TestEarliestTimeExactRead ensures starting exactly at earliest time returns first entry (no skip) +func TestEarliestTimeExactRead(t *testing.T) { + logBuffer := NewLogBuffer("local", 10*time.Minute, + func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}, + func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { + return startPosition, false, nil + }, + func() {}) + + ts := time.Now() + entry := &filer_pb.LogEntry{TsNs: ts.UnixNano(), Data: []byte("a"), Key: []byte("k"), Offset: 0} + logBuffer.AddLogEntryToBuffer(entry) + + startPos := NewMessagePosition(ts.UnixNano(), -2) + buf, _, err := logBuffer.ReadFromBuffer(startPos) + if err != nil { + t.Fatalf("ReadFromBuffer err: %v", err) + } + if buf == nil || buf.Len() == 0 { + t.Fatalf("Expected data at earliest time, got nil/empty") + } +} |
