aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/ml/prefetch_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mount/ml/prefetch_test.go')
-rw-r--r--weed/mount/ml/prefetch_test.go333
1 files changed, 333 insertions, 0 deletions
diff --git a/weed/mount/ml/prefetch_test.go b/weed/mount/ml/prefetch_test.go
new file mode 100644
index 000000000..e72ee700c
--- /dev/null
+++ b/weed/mount/ml/prefetch_test.go
@@ -0,0 +1,333 @@
+package ml
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestPrefetchManager_Basic(t *testing.T) {
+ pm := NewPrefetchManager(2, 10, 5*time.Second)
+ defer pm.Shutdown()
+
+ // Test basic prefetch request
+ ctx := context.Background()
+ var callbackData []byte
+ var callbackErr error
+ var callbackCalled int32
+
+ callback := func(data []byte, err error) {
+ atomic.StoreInt32(&callbackCalled, 1)
+ callbackData = data
+ callbackErr = err
+ }
+
+ success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
+ if !success {
+ t.Error("Expected prefetch request to succeed")
+ }
+
+ // Wait for callback to be called
+ time.Sleep(100 * time.Millisecond)
+
+ if atomic.LoadInt32(&callbackCalled) != 1 {
+ t.Error("Expected callback to be called")
+ }
+
+ if callbackErr != nil {
+ t.Errorf("Expected no error, got: %v", callbackErr)
+ }
+
+ if len(callbackData) != 1024 {
+ t.Errorf("Expected data length 1024, got: %d", len(callbackData))
+ }
+}
+
+func TestPrefetchManager_DuplicateRequests(t *testing.T) {
+ pm := NewPrefetchManager(2, 10, 5*time.Second)
+ defer pm.Shutdown()
+
+ ctx := context.Background()
+ var callbackCount int32
+
+ callback := func(data []byte, err error) {
+ atomic.AddInt32(&callbackCount, 1)
+ }
+
+ // Send the same request multiple times
+ success1 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
+ success2 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
+ success3 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
+
+ if !success1 {
+ t.Error("Expected first prefetch request to succeed")
+ }
+
+ if success2 || success3 {
+ t.Error("Expected duplicate requests to be rejected")
+ }
+
+ // Wait for processing
+ time.Sleep(100 * time.Millisecond)
+
+ // Should have only one callback
+ if atomic.LoadInt32(&callbackCount) != 1 {
+ t.Errorf("Expected 1 callback, got: %d", atomic.LoadInt32(&callbackCount))
+ }
+
+ // Check metrics
+ metrics := pm.GetMetrics()
+ if metrics.TotalRequests != 3 {
+ t.Errorf("Expected 3 total requests, got: %d", metrics.TotalRequests)
+ }
+
+ if metrics.DuplicateReqs != 2 {
+ t.Errorf("Expected 2 duplicate requests, got: %d", metrics.DuplicateReqs)
+ }
+}
+
+func TestPrefetchManager_WorkerPool(t *testing.T) {
+ pm := NewPrefetchManager(3, 20, 5*time.Second)
+ defer pm.Shutdown()
+
+ ctx := context.Background()
+ var completedCount int32
+
+ callback := func(data []byte, err error) {
+ atomic.AddInt32(&completedCount, 1)
+ }
+
+ // Send multiple requests
+ requestCount := 10
+ for i := 0; i < requestCount; i++ {
+ fileId := "file" + string(rune('0'+i))
+ success := pm.Prefetch(ctx, fileId, 0, 0, 1024, 1, callback)
+ if !success {
+ t.Errorf("Expected prefetch request %d to succeed", i)
+ }
+ }
+
+ // Wait for all to complete
+ time.Sleep(200 * time.Millisecond)
+
+ completed := atomic.LoadInt32(&completedCount)
+ if completed != int32(requestCount) {
+ t.Errorf("Expected %d completed requests, got: %d", requestCount, completed)
+ }
+
+ metrics := pm.GetMetrics()
+ if metrics.SuccessfulFetch != int64(requestCount) {
+ t.Errorf("Expected %d successful fetches, got: %d", requestCount, metrics.SuccessfulFetch)
+ }
+}
+
+func TestPrefetchManager_Cancel(t *testing.T) {
+ pm := NewPrefetchManager(1, 5, 5*time.Second) // Single worker to ensure ordering
+ defer pm.Shutdown()
+
+ ctx := context.Background()
+ var callbackCalled int32
+
+ callback := func(data []byte, err error) {
+ atomic.StoreInt32(&callbackCalled, 1)
+ }
+
+ // Queue a request
+ success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
+ if !success {
+ t.Error("Expected prefetch request to succeed")
+ }
+
+ // Cancel it immediately
+ cancelled := pm.Cancel("file1", 0)
+ if !cancelled {
+ t.Error("Expected cancel to succeed")
+ }
+
+ // Wait a bit
+ time.Sleep(50 * time.Millisecond)
+
+ // Callback might still be called since cancellation is asynchronous
+ // Main thing is that the job was marked as cancelled
+}
+
+func TestPrefetchManager_QueueFull(t *testing.T) {
+ pm := NewPrefetchManager(1, 2, 5*time.Second) // Small queue
+ defer pm.Shutdown()
+
+ ctx := context.Background()
+ callback := func(data []byte, err error) {}
+
+ // Fill the queue
+ success1 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
+ success2 := pm.Prefetch(ctx, "file2", 0, 0, 1024, 1, callback)
+ success3 := pm.Prefetch(ctx, "file3", 0, 0, 1024, 1, callback) // This should fail
+
+ if !success1 || !success2 {
+ t.Error("Expected first two requests to succeed")
+ }
+
+ if success3 {
+ t.Error("Expected third request to fail due to full queue")
+ }
+}
+
+func TestPrefetchManager_Timeout(t *testing.T) {
+ pm := NewPrefetchManager(1, 5, 50*time.Millisecond) // Very short timeout
+ defer pm.Shutdown()
+
+ ctx := context.Background()
+ var timeoutCount int32
+
+ callback := func(data []byte, err error) {
+ if err == context.DeadlineExceeded {
+ atomic.AddInt32(&timeoutCount, 1)
+ }
+ }
+
+ // This implementation doesn't actually timeout since fetchChunk is fast
+ // But the structure is there for when we integrate with real chunk fetching
+ success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
+ if !success {
+ t.Error("Expected prefetch request to succeed")
+ }
+
+ time.Sleep(200 * time.Millisecond)
+}
+
+func TestPrefetchManager_ConcurrentAccess(t *testing.T) {
+ pm := NewPrefetchManager(4, 50, 5*time.Second)
+ defer pm.Shutdown()
+
+ ctx := context.Background()
+ var completedCount int32
+
+ callback := func(data []byte, err error) {
+ atomic.AddInt32(&completedCount, 1)
+ }
+
+ // Test concurrent access from multiple goroutines
+ var wg sync.WaitGroup
+ goroutineCount := 10
+ requestsPerGoroutine := 5
+
+ for i := 0; i < goroutineCount; i++ {
+ wg.Add(1)
+ go func(goroutineID int) {
+ defer wg.Done()
+
+ for j := 0; j < requestsPerGoroutine; j++ {
+ fileId := "file" + string(rune('0'+goroutineID)) + "_" + string(rune('0'+j))
+ pm.Prefetch(ctx, fileId, 0, 0, 1024, 1, callback)
+ }
+ }(i)
+ }
+
+ wg.Wait()
+
+ // Wait for all requests to complete
+ time.Sleep(500 * time.Millisecond)
+
+ expectedTotal := goroutineCount * requestsPerGoroutine
+ completed := atomic.LoadInt32(&completedCount)
+
+ if completed != int32(expectedTotal) {
+ t.Errorf("Expected %d completed requests, got: %d", expectedTotal, completed)
+ }
+}
+
+func TestPrefetchManager_Metrics(t *testing.T) {
+ pm := NewPrefetchManager(2, 10, 5*time.Second)
+ defer pm.Shutdown()
+
+ ctx := context.Background()
+ callback := func(data []byte, err error) {}
+
+ // Make some requests
+ pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
+ pm.Prefetch(ctx, "file2", 0, 0, 1024, 1, callback)
+ pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) // Duplicate
+
+ time.Sleep(100 * time.Millisecond)
+
+ metrics := pm.GetMetrics()
+
+ if metrics.TotalRequests != 3 {
+ t.Errorf("Expected 3 total requests, got: %d", metrics.TotalRequests)
+ }
+
+ if metrics.DuplicateReqs != 1 {
+ t.Errorf("Expected 1 duplicate request, got: %d", metrics.DuplicateReqs)
+ }
+
+ if metrics.Workers != 2 {
+ t.Errorf("Expected 2 workers, got: %d", metrics.Workers)
+ }
+
+ // Should have some successful fetches
+ if metrics.SuccessfulFetch == 0 {
+ t.Error("Expected some successful fetches")
+ }
+}
+
+func TestPrefetchManager_Shutdown(t *testing.T) {
+ pm := NewPrefetchManager(2, 10, 5*time.Second)
+
+ ctx := context.Background()
+ callback := func(data []byte, err error) {}
+
+ // Make a request
+ pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback)
+
+ // Shutdown should complete without hanging
+ done := make(chan struct{})
+ go func() {
+ pm.Shutdown()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ // Success
+ case <-time.After(5 * time.Second):
+ t.Error("Shutdown took too long")
+ }
+}
+
+// Benchmark tests
+
+func BenchmarkPrefetchManager_SingleWorker(b *testing.B) {
+ pm := NewPrefetchManager(1, 1000, 30*time.Second)
+ defer pm.Shutdown()
+
+ ctx := context.Background()
+ callback := func(data []byte, err error) {}
+
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ fileId := "file" + string(rune(i%100)) // Reuse file IDs to test deduplication
+ pm.Prefetch(ctx, fileId, uint32(i), 0, 1024, 1, callback)
+ }
+}
+
+func BenchmarkPrefetchManager_MultipleWorkers(b *testing.B) {
+ pm := NewPrefetchManager(8, 1000, 30*time.Second)
+ defer pm.Shutdown()
+
+ ctx := context.Background()
+ callback := func(data []byte, err error) {}
+
+ b.ResetTimer()
+
+ b.RunParallel(func(pb *testing.PB) {
+ i := 0
+ for pb.Next() {
+ fileId := "file" + string(rune(i%1000))
+ pm.Prefetch(ctx, fileId, uint32(i), 0, 1024, 1, callback)
+ i++
+ }
+ })
+}