diff options
Diffstat (limited to 'weed/mount/ml/prefetch_test.go')
| -rw-r--r-- | weed/mount/ml/prefetch_test.go | 333 |
1 files changed, 333 insertions, 0 deletions
diff --git a/weed/mount/ml/prefetch_test.go b/weed/mount/ml/prefetch_test.go new file mode 100644 index 000000000..e72ee700c --- /dev/null +++ b/weed/mount/ml/prefetch_test.go @@ -0,0 +1,333 @@ +package ml + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestPrefetchManager_Basic(t *testing.T) { + pm := NewPrefetchManager(2, 10, 5*time.Second) + defer pm.Shutdown() + + // Test basic prefetch request + ctx := context.Background() + var callbackData []byte + var callbackErr error + var callbackCalled int32 + + callback := func(data []byte, err error) { + atomic.StoreInt32(&callbackCalled, 1) + callbackData = data + callbackErr = err + } + + success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) + if !success { + t.Error("Expected prefetch request to succeed") + } + + // Wait for callback to be called + time.Sleep(100 * time.Millisecond) + + if atomic.LoadInt32(&callbackCalled) != 1 { + t.Error("Expected callback to be called") + } + + if callbackErr != nil { + t.Errorf("Expected no error, got: %v", callbackErr) + } + + if len(callbackData) != 1024 { + t.Errorf("Expected data length 1024, got: %d", len(callbackData)) + } +} + +func TestPrefetchManager_DuplicateRequests(t *testing.T) { + pm := NewPrefetchManager(2, 10, 5*time.Second) + defer pm.Shutdown() + + ctx := context.Background() + var callbackCount int32 + + callback := func(data []byte, err error) { + atomic.AddInt32(&callbackCount, 1) + } + + // Send the same request multiple times + success1 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) + success2 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) + success3 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) + + if !success1 { + t.Error("Expected first prefetch request to succeed") + } + + if success2 || success3 { + t.Error("Expected duplicate requests to be rejected") + } + + // Wait for processing + time.Sleep(100 * time.Millisecond) + + // Should have only one callback + if atomic.LoadInt32(&callbackCount) != 1 { + t.Errorf("Expected 1 callback, got: %d", atomic.LoadInt32(&callbackCount)) + } + + // Check metrics + metrics := pm.GetMetrics() + if metrics.TotalRequests != 3 { + t.Errorf("Expected 3 total requests, got: %d", metrics.TotalRequests) + } + + if metrics.DuplicateReqs != 2 { + t.Errorf("Expected 2 duplicate requests, got: %d", metrics.DuplicateReqs) + } +} + +func TestPrefetchManager_WorkerPool(t *testing.T) { + pm := NewPrefetchManager(3, 20, 5*time.Second) + defer pm.Shutdown() + + ctx := context.Background() + var completedCount int32 + + callback := func(data []byte, err error) { + atomic.AddInt32(&completedCount, 1) + } + + // Send multiple requests + requestCount := 10 + for i := 0; i < requestCount; i++ { + fileId := "file" + string(rune('0'+i)) + success := pm.Prefetch(ctx, fileId, 0, 0, 1024, 1, callback) + if !success { + t.Errorf("Expected prefetch request %d to succeed", i) + } + } + + // Wait for all to complete + time.Sleep(200 * time.Millisecond) + + completed := atomic.LoadInt32(&completedCount) + if completed != int32(requestCount) { + t.Errorf("Expected %d completed requests, got: %d", requestCount, completed) + } + + metrics := pm.GetMetrics() + if metrics.SuccessfulFetch != int64(requestCount) { + t.Errorf("Expected %d successful fetches, got: %d", requestCount, metrics.SuccessfulFetch) + } +} + +func TestPrefetchManager_Cancel(t *testing.T) { + pm := NewPrefetchManager(1, 5, 5*time.Second) // Single worker to ensure ordering + defer pm.Shutdown() + + ctx := context.Background() + var callbackCalled int32 + + callback := func(data []byte, err error) { + atomic.StoreInt32(&callbackCalled, 1) + } + + // Queue a request + success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) + if !success { + t.Error("Expected prefetch request to succeed") + } + + // Cancel it immediately + cancelled := pm.Cancel("file1", 0) + if !cancelled { + t.Error("Expected cancel to succeed") + } + + // Wait a bit + time.Sleep(50 * time.Millisecond) + + // Callback might still be called since cancellation is asynchronous + // Main thing is that the job was marked as cancelled +} + +func TestPrefetchManager_QueueFull(t *testing.T) { + pm := NewPrefetchManager(1, 2, 5*time.Second) // Small queue + defer pm.Shutdown() + + ctx := context.Background() + callback := func(data []byte, err error) {} + + // Fill the queue + success1 := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) + success2 := pm.Prefetch(ctx, "file2", 0, 0, 1024, 1, callback) + success3 := pm.Prefetch(ctx, "file3", 0, 0, 1024, 1, callback) // This should fail + + if !success1 || !success2 { + t.Error("Expected first two requests to succeed") + } + + if success3 { + t.Error("Expected third request to fail due to full queue") + } +} + +func TestPrefetchManager_Timeout(t *testing.T) { + pm := NewPrefetchManager(1, 5, 50*time.Millisecond) // Very short timeout + defer pm.Shutdown() + + ctx := context.Background() + var timeoutCount int32 + + callback := func(data []byte, err error) { + if err == context.DeadlineExceeded { + atomic.AddInt32(&timeoutCount, 1) + } + } + + // This implementation doesn't actually timeout since fetchChunk is fast + // But the structure is there for when we integrate with real chunk fetching + success := pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) + if !success { + t.Error("Expected prefetch request to succeed") + } + + time.Sleep(200 * time.Millisecond) +} + +func TestPrefetchManager_ConcurrentAccess(t *testing.T) { + pm := NewPrefetchManager(4, 50, 5*time.Second) + defer pm.Shutdown() + + ctx := context.Background() + var completedCount int32 + + callback := func(data []byte, err error) { + atomic.AddInt32(&completedCount, 1) + } + + // Test concurrent access from multiple goroutines + var wg sync.WaitGroup + goroutineCount := 10 + requestsPerGoroutine := 5 + + for i := 0; i < goroutineCount; i++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + + for j := 0; j < requestsPerGoroutine; j++ { + fileId := "file" + string(rune('0'+goroutineID)) + "_" + string(rune('0'+j)) + pm.Prefetch(ctx, fileId, 0, 0, 1024, 1, callback) + } + }(i) + } + + wg.Wait() + + // Wait for all requests to complete + time.Sleep(500 * time.Millisecond) + + expectedTotal := goroutineCount * requestsPerGoroutine + completed := atomic.LoadInt32(&completedCount) + + if completed != int32(expectedTotal) { + t.Errorf("Expected %d completed requests, got: %d", expectedTotal, completed) + } +} + +func TestPrefetchManager_Metrics(t *testing.T) { + pm := NewPrefetchManager(2, 10, 5*time.Second) + defer pm.Shutdown() + + ctx := context.Background() + callback := func(data []byte, err error) {} + + // Make some requests + pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) + pm.Prefetch(ctx, "file2", 0, 0, 1024, 1, callback) + pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) // Duplicate + + time.Sleep(100 * time.Millisecond) + + metrics := pm.GetMetrics() + + if metrics.TotalRequests != 3 { + t.Errorf("Expected 3 total requests, got: %d", metrics.TotalRequests) + } + + if metrics.DuplicateReqs != 1 { + t.Errorf("Expected 1 duplicate request, got: %d", metrics.DuplicateReqs) + } + + if metrics.Workers != 2 { + t.Errorf("Expected 2 workers, got: %d", metrics.Workers) + } + + // Should have some successful fetches + if metrics.SuccessfulFetch == 0 { + t.Error("Expected some successful fetches") + } +} + +func TestPrefetchManager_Shutdown(t *testing.T) { + pm := NewPrefetchManager(2, 10, 5*time.Second) + + ctx := context.Background() + callback := func(data []byte, err error) {} + + // Make a request + pm.Prefetch(ctx, "file1", 0, 0, 1024, 1, callback) + + // Shutdown should complete without hanging + done := make(chan struct{}) + go func() { + pm.Shutdown() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(5 * time.Second): + t.Error("Shutdown took too long") + } +} + +// Benchmark tests + +func BenchmarkPrefetchManager_SingleWorker(b *testing.B) { + pm := NewPrefetchManager(1, 1000, 30*time.Second) + defer pm.Shutdown() + + ctx := context.Background() + callback := func(data []byte, err error) {} + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + fileId := "file" + string(rune(i%100)) // Reuse file IDs to test deduplication + pm.Prefetch(ctx, fileId, uint32(i), 0, 1024, 1, callback) + } +} + +func BenchmarkPrefetchManager_MultipleWorkers(b *testing.B) { + pm := NewPrefetchManager(8, 1000, 30*time.Second) + defer pm.Shutdown() + + ctx := context.Background() + callback := func(data []byte, err error) {} + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + fileId := "file" + string(rune(i%1000)) + pm.Prefetch(ctx, fileId, uint32(i), 0, 1024, 1, callback) + i++ + } + }) +} |
