author    chrislu <chris.lu@gmail.com>    2025-08-30 15:32:00 -0700
committer chrislu <chris.lu@gmail.com>    2025-08-30 15:32:00 -0700
commit    63b94321ec015ca6565364fc3b97f9a849f7e0ee (patch)
tree      c8f7a9b6bcf8ddd05aa7aa56cd95dc044b7d17a5
parent    e7f5fff9891f73d8231f9fdca2c7b455552a5a9e (diff)
fmt
-rw-r--r--  weed/mount/ml/cache_policy.go            124
-rw-r--r--  weed/mount/ml/cache_policy_test.go       254
-rw-r--r--  weed/mount/ml/fuse_integration.go        116
-rw-r--r--  weed/mount/ml/open_file_cache.go         256
-rw-r--r--  weed/mount/ml/open_file_cache_test.go    184
-rw-r--r--  weed/mount/ml_integration.go              38
6 files changed, 485 insertions, 487 deletions
diff --git a/weed/mount/ml/cache_policy.go b/weed/mount/ml/cache_policy.go
index 256b36dc9..44650a44d 100644
--- a/weed/mount/ml/cache_policy.go
+++ b/weed/mount/ml/cache_policy.go
@@ -17,36 +17,36 @@ type CacheEntry struct {
Pattern AccessPattern // Detected access pattern
FileType MLFileType // Type of ML file
IsHot bool // Whether this is a hot chunk
-
+
// ML-specific metadata
- IsTrainingData bool // Whether this is training data
- IsModel bool // Whether this is a model file
- PredictedReuse float64 // Predicted reuse probability (0.0-1.0)
- EpochRelevance float64 // Relevance for current training epoch
+ IsTrainingData bool // Whether this is training data
+ IsModel bool // Whether this is a model file
+ PredictedReuse float64 // Predicted reuse probability (0.0-1.0)
+ EpochRelevance float64 // Relevance for current training epoch
}
// MLCachePolicy implements ML-aware cache eviction policy
type MLCachePolicy struct {
// Weights for different factors (sum should be 1.0)
accessFrequencyWeight float64 // Weight for access frequency
- recencyWeight float64 // Weight for access recency
+ recencyWeight float64 // Weight for access recency
sizeWeight float64 // Weight for item size
mlWeight float64 // Weight for ML-specific factors
-
+
// ML-specific parameters
- trainingDataBoost float64 // Boost factor for training data
- modelFileBoost float64 // Boost factor for model files
- sequentialBoost float64 // Boost factor for sequential access
- epochRelevanceBoost float64 // Boost factor for epoch-relevant data
-
+ trainingDataBoost float64 // Boost factor for training data
+ modelFileBoost float64 // Boost factor for model files
+ sequentialBoost float64 // Boost factor for sequential access
+ epochRelevanceBoost float64 // Boost factor for epoch-relevant data
+
// Time-based parameters
- hotThreshold time.Duration // Threshold for considering item "hot"
- coldThreshold time.Duration // Threshold for considering item "cold"
-
+ hotThreshold time.Duration // Threshold for considering item "hot"
+ coldThreshold time.Duration // Threshold for considering item "cold"
+
// Size-based parameters
- largeFileThreshold uint64 // Threshold for large files
- smallFilePreference float64 // Preference for keeping small files
-
+ largeFileThreshold uint64 // Threshold for large files
+ smallFilePreference float64 // Preference for keeping small files
+
// Statistics
totalEvictions int64
mlFileEvictions int64
@@ -60,19 +60,19 @@ func NewMLCachePolicy() *MLCachePolicy {
// Balanced weights
accessFrequencyWeight: 0.3,
recencyWeight: 0.3,
- sizeWeight: 0.2,
- mlWeight: 0.2,
-
+ sizeWeight: 0.2,
+ mlWeight: 0.2,
+
// ML-specific boosts
trainingDataBoost: 1.5, // 50% boost for training data
modelFileBoost: 2.0, // 100% boost for model files
sequentialBoost: 1.3, // 30% boost for sequential access
epochRelevanceBoost: 1.4, // 40% boost for epoch-relevant data
-
+
// Time thresholds
hotThreshold: 1 * time.Minute,
coldThreshold: 10 * time.Minute,
-
+
// Size parameters
largeFileThreshold: 10 * 1024 * 1024, // 10MB
smallFilePreference: 1.2, // 20% preference for small files
@@ -84,32 +84,32 @@ func NewMLCachePolicy() *MLCachePolicy {
func (policy *MLCachePolicy) CalculateEvictionScore(entry *CacheEntry) float64 {
now := time.Now()
timeSinceAccess := now.Sub(entry.LastAccess)
-
+
// Base factors
accessFrequencyScore := policy.calculateAccessFrequencyScore(entry)
recencyScore := policy.calculateRecencyScore(timeSinceAccess)
sizeScore := policy.calculateSizeScore(entry.Size)
mlScore := policy.calculateMLScore(entry)
-
+
// Weighted combination
totalScore := policy.accessFrequencyWeight*accessFrequencyScore +
policy.recencyWeight*recencyScore +
policy.sizeWeight*sizeScore +
policy.mlWeight*mlScore
-
- glog.V(4).Infof("Eviction score for inode=%d: total=%.3f (freq=%.3f, recency=%.3f, size=%.3f, ml=%.3f)",
+
+ glog.V(4).Infof("Eviction score for inode=%d: total=%.3f (freq=%.3f, recency=%.3f, size=%.3f, ml=%.3f)",
entry.Inode, totalScore, accessFrequencyScore, recencyScore, sizeScore, mlScore)
-
+
return totalScore
}
// ShouldEvict determines if a cache entry should be evicted
func (policy *MLCachePolicy) ShouldEvict(entry *CacheEntry) bool {
score := policy.CalculateEvictionScore(entry)
-
+
// Different thresholds based on ML file type
threshold := 0.3 // Default threshold
-
+
switch entry.FileType {
case MLFileModel:
threshold = 0.1 // Very low threshold - keep models cached longer
@@ -126,9 +126,9 @@ func (policy *MLCachePolicy) ShouldEvict(entry *CacheEntry) bool {
default:
threshold = 0.3 // Default for unknown files
}
-
+
shouldEvict := score < threshold
-
+
if shouldEvict {
policy.totalEvictions++
if entry.IsTrainingData {
@@ -140,11 +140,11 @@ func (policy *MLCachePolicy) ShouldEvict(entry *CacheEntry) bool {
if entry.FileType != MLFileUnknown {
policy.mlFileEvictions++
}
-
- glog.V(4).Infof("Evicting: inode=%d, score=%.3f < threshold=%.3f, type=%v",
+
+ glog.V(4).Infof("Evicting: inode=%d, score=%.3f < threshold=%.3f, type=%v",
entry.Inode, score, threshold, entry.FileType)
}
-
+
return shouldEvict
}
@@ -153,10 +153,10 @@ func (policy *MLCachePolicy) calculateAccessFrequencyScore(entry *CacheEntry) fl
if entry.AccessCount == 0 {
return 0.0
}
-
+
// Logarithmic scaling for access count
base := math.Log(float64(entry.AccessCount) + 1)
-
+
// Apply ML-specific boosts
boost := 1.0
if entry.IsTrainingData {
@@ -171,7 +171,7 @@ func (policy *MLCachePolicy) calculateAccessFrequencyScore(entry *CacheEntry) fl
if entry.EpochRelevance > 0.5 {
boost *= policy.epochRelevanceBoost
}
-
+
return base * boost
}
@@ -180,11 +180,11 @@ func (policy *MLCachePolicy) calculateRecencyScore(timeSinceAccess time.Duration
if timeSinceAccess <= policy.hotThreshold {
return 1.0 // Very recent access
}
-
+
if timeSinceAccess >= policy.coldThreshold {
return 0.1 // Very old access
}
-
+
// Linear decay between hot and cold thresholds
ratio := float64(timeSinceAccess-policy.hotThreshold) / float64(policy.coldThreshold-policy.hotThreshold)
return 1.0 - ratio*0.9 // Decay from 1.0 to 0.1
@@ -196,7 +196,7 @@ func (policy *MLCachePolicy) calculateSizeScore(size uint64) float64 {
// Prefer keeping smaller files (higher score)
return policy.smallFilePreference
}
-
+
// Larger files get lower score (more likely to be evicted)
// But not too low since they might be important model files
ratio := float64(size) / float64(policy.largeFileThreshold)
@@ -206,7 +206,7 @@ func (policy *MLCachePolicy) calculateSizeScore(size uint64) float64 {
// calculateMLScore calculates ML-specific factors
func (policy *MLCachePolicy) calculateMLScore(entry *CacheEntry) float64 {
score := 0.5 // Base score for non-ML files
-
+
// File type bonuses
switch entry.FileType {
case MLFileModel:
@@ -222,7 +222,7 @@ func (policy *MLCachePolicy) calculateMLScore(entry *CacheEntry) float64 {
default:
score = 0.5 // Default for unknown files
}
-
+
// Access pattern bonuses
switch entry.Pattern {
case SequentialAccess:
@@ -234,22 +234,22 @@ func (policy *MLCachePolicy) calculateMLScore(entry *CacheEntry) float64 {
case BatchAccess:
score *= 1.1 // Small boost for batch access
}
-
+
// Predicted reuse bonus
if entry.PredictedReuse > 0.7 {
score *= 1.2 // Boost for high predicted reuse
}
-
+
// Epoch relevance bonus
if entry.EpochRelevance > 0.5 {
score *= (1.0 + entry.EpochRelevance*0.3) // Up to 30% boost for epoch relevance
}
-
+
// Hot chunk bonus
if entry.IsHot {
score *= 1.1
}
-
+
return score
}
@@ -260,27 +260,27 @@ func (policy *MLCachePolicy) GetEvictionMetrics() MLCachePolicyMetrics {
MLFileEvictions: policy.mlFileEvictions,
TrainingDataEvictions: policy.trainingDataEvictions,
ModelFileEvictions: policy.modelFileEvictions,
-
+
// Configuration
AccessFrequencyWeight: policy.accessFrequencyWeight,
RecencyWeight: policy.recencyWeight,
- SizeWeight: policy.sizeWeight,
- MLWeight: policy.mlWeight,
+ SizeWeight: policy.sizeWeight,
+ MLWeight: policy.mlWeight,
}
}
// MLCachePolicyMetrics holds metrics for the ML cache policy
type MLCachePolicyMetrics struct {
- TotalEvictions int64 `json:"total_evictions"`
- MLFileEvictions int64 `json:"ml_file_evictions"`
- TrainingDataEvictions int64 `json:"training_data_evictions"`
- ModelFileEvictions int64 `json:"model_file_evictions"`
-
+ TotalEvictions int64 `json:"total_evictions"`
+ MLFileEvictions int64 `json:"ml_file_evictions"`
+ TrainingDataEvictions int64 `json:"training_data_evictions"`
+ ModelFileEvictions int64 `json:"model_file_evictions"`
+
// Configuration weights
AccessFrequencyWeight float64 `json:"access_frequency_weight"`
RecencyWeight float64 `json:"recency_weight"`
- SizeWeight float64 `json:"size_weight"`
- MLWeight float64 `json:"ml_weight"`
+ SizeWeight float64 `json:"size_weight"`
+ MLWeight float64 `json:"ml_weight"`
}
// SetWeights updates the eviction policy weights
@@ -290,14 +290,14 @@ func (policy *MLCachePolicy) SetWeights(frequency, recency, size, ml float64) {
glog.Warningf("Invalid weights provided, using defaults")
return
}
-
+
// Normalize weights to sum to 1.0
policy.accessFrequencyWeight = frequency / total
policy.recencyWeight = recency / total
policy.sizeWeight = size / total
policy.mlWeight = ml / total
-
- glog.V(2).Infof("Updated eviction policy weights: freq=%.2f, recency=%.2f, size=%.2f, ml=%.2f",
+
+ glog.V(2).Infof("Updated eviction policy weights: freq=%.2f, recency=%.2f, size=%.2f, ml=%.2f",
policy.accessFrequencyWeight, policy.recencyWeight, policy.sizeWeight, policy.mlWeight)
}
@@ -307,7 +307,7 @@ func (policy *MLCachePolicy) SetMLBoosts(trainingData, model, sequential, epochR
policy.modelFileBoost = model
policy.sequentialBoost = sequential
policy.epochRelevanceBoost = epochRelevance
-
- glog.V(2).Infof("Updated ML boost factors: training=%.2f, model=%.2f, sequential=%.2f, epoch=%.2f",
+
+ glog.V(2).Infof("Updated ML boost factors: training=%.2f, model=%.2f, sequential=%.2f, epoch=%.2f",
trainingData, model, sequential, epochRelevance)
}
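
For orientation, the policy API reformatted above can be exercised roughly as in the sketch below. This is a minimal, hypothetical usage, not code from the commit: the entry values and the 0.4/0.3/0.1/0.2 weights are made up for illustration, and a time import plus the ml package types from this diff are assumed.

    // Score a synthetic cache entry with the policy defined in cache_policy.go.
    policy := NewMLCachePolicy()
    policy.SetWeights(0.4, 0.3, 0.1, 0.2) // normalized internally to sum to 1.0

    entry := &CacheEntry{
        Inode:          42,                               // hypothetical inode
        Size:           4 * 1024 * 1024,                  // below the 10MB largeFileThreshold
        LastAccess:     time.Now().Add(-2 * time.Minute), // between hot (1m) and cold (10m)
        AccessCount:    12,
        Pattern:        SequentialAccess,
        FileType:       MLFileDataset,
        IsTrainingData: true, // eligible for the 1.5x trainingDataBoost
        EpochRelevance: 0.8,  // above 0.5, so the 1.4x epochRelevanceBoost applies
    }

    score := policy.CalculateEvictionScore(entry) // weighted sum of freq/recency/size/ml scores
    evict := policy.ShouldEvict(entry)            // true if score falls below the per-file-type threshold
    _ = score
    _ = evict

Note that ShouldEvict also bumps the counters reported by GetEvictionMetrics, which is why the tests below treat it as stateful.
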
diff --git a/weed/mount/ml/cache_policy_test.go b/weed/mount/ml/cache_policy_test.go
index 29df5b859..00ccff218 100644
--- a/weed/mount/ml/cache_policy_test.go
+++ b/weed/mount/ml/cache_policy_test.go
@@ -7,7 +7,7 @@ import (
func TestMLCachePolicy_Basic(t *testing.T) {
policy := NewMLCachePolicy()
-
+
// Test basic eviction score calculation
entry := &CacheEntry{
Inode: 1,
@@ -19,19 +19,19 @@ func TestMLCachePolicy_Basic(t *testing.T) {
FileType: MLFileUnknown,
IsHot: false,
}
-
+
score := policy.CalculateEvictionScore(entry)
if score <= 0 {
t.Error("Eviction score should be positive")
}
-
+
shouldEvict := policy.ShouldEvict(entry)
t.Logf("Basic entry eviction: score=%.3f, shouldEvict=%v", score, shouldEvict)
}
func TestMLCachePolicy_ModelFileBoost(t *testing.T) {
policy := NewMLCachePolicy()
-
+
// Create two identical entries, one is a model file
baseEntry := &CacheEntry{
Inode: 1,
@@ -43,7 +43,7 @@ func TestMLCachePolicy_ModelFileBoost(t *testing.T) {
FileType: MLFileUnknown,
IsModel: false,
}
-
+
modelEntry := &CacheEntry{
Inode: 2,
Size: 10 * 1024 * 1024, // 10MB
@@ -54,60 +54,60 @@ func TestMLCachePolicy_ModelFileBoost(t *testing.T) {
FileType: MLFileModel,
IsModel: true,
}
-
+
baseScore := policy.CalculateEvictionScore(baseEntry)
modelScore := policy.CalculateEvictionScore(modelEntry)
-
+
if modelScore <= baseScore {
- t.Errorf("Model file should have higher score than regular file: model=%.3f, base=%.3f",
+ t.Errorf("Model file should have higher score than regular file: model=%.3f, base=%.3f",
modelScore, baseScore)
}
-
+
// Model files should be less likely to be evicted
baseShouldEvict := policy.ShouldEvict(baseEntry)
modelShouldEvict := policy.ShouldEvict(modelEntry)
-
+
if modelShouldEvict && !baseShouldEvict {
t.Error("Model file should not be evicted if regular file is not evicted")
}
-
- t.Logf("Model vs Base eviction: model=%.3f (evict=%v), base=%.3f (evict=%v)",
+
+ t.Logf("Model vs Base eviction: model=%.3f (evict=%v), base=%.3f (evict=%v)",
modelScore, modelShouldEvict, baseScore, baseShouldEvict)
}
func TestMLCachePolicy_TrainingDataBoost(t *testing.T) {
policy := NewMLCachePolicy()
-
+
regularEntry := &CacheEntry{
- Inode: 1,
- Size: 1024,
- LastAccess: time.Now().Add(-2 * time.Minute),
- AccessCount: 10,
- FileType: MLFileUnknown,
+ Inode: 1,
+ Size: 1024,
+ LastAccess: time.Now().Add(-2 * time.Minute),
+ AccessCount: 10,
+ FileType: MLFileUnknown,
IsTrainingData: false,
}
-
+
trainingEntry := &CacheEntry{
- Inode: 2,
- Size: 1024,
- LastAccess: time.Now().Add(-2 * time.Minute),
- AccessCount: 10,
- FileType: MLFileDataset,
+ Inode: 2,
+ Size: 1024,
+ LastAccess: time.Now().Add(-2 * time.Minute),
+ AccessCount: 10,
+ FileType: MLFileDataset,
IsTrainingData: true,
}
-
+
regularScore := policy.CalculateEvictionScore(regularEntry)
trainingScore := policy.CalculateEvictionScore(trainingEntry)
-
+
if trainingScore <= regularScore {
- t.Errorf("Training data should have higher score: training=%.3f, regular=%.3f",
+ t.Errorf("Training data should have higher score: training=%.3f, regular=%.3f",
trainingScore, regularScore)
}
}
func TestMLCachePolicy_AccessPatternBoost(t *testing.T) {
policy := NewMLCachePolicy()
-
+
randomEntry := &CacheEntry{
Inode: 1,
Size: 1024,
@@ -116,7 +116,7 @@ func TestMLCachePolicy_AccessPatternBoost(t *testing.T) {
Pattern: RandomAccess,
FileType: MLFileDataset,
}
-
+
sequentialEntry := &CacheEntry{
Inode: 2,
Size: 1024,
@@ -125,7 +125,7 @@ func TestMLCachePolicy_AccessPatternBoost(t *testing.T) {
Pattern: SequentialAccess,
FileType: MLFileDataset,
}
-
+
modelAccessEntry := &CacheEntry{
Inode: 3,
Size: 1024,
@@ -134,28 +134,28 @@ func TestMLCachePolicy_AccessPatternBoost(t *testing.T) {
Pattern: ModelAccess,
FileType: MLFileModel,
}
-
+
randomScore := policy.CalculateEvictionScore(randomEntry)
sequentialScore := policy.CalculateEvictionScore(sequentialEntry)
modelScore := policy.CalculateEvictionScore(modelAccessEntry)
-
+
if sequentialScore <= randomScore {
- t.Errorf("Sequential access should have higher score than random: seq=%.3f, random=%.3f",
+ t.Errorf("Sequential access should have higher score than random: seq=%.3f, random=%.3f",
sequentialScore, randomScore)
}
-
+
if modelScore <= sequentialScore {
- t.Errorf("Model access should have highest score: model=%.3f, seq=%.3f",
+ t.Errorf("Model access should have highest score: model=%.3f, seq=%.3f",
modelScore, sequentialScore)
}
-
- t.Logf("Pattern comparison: random=%.3f, sequential=%.3f, model=%.3f",
+
+ t.Logf("Pattern comparison: random=%.3f, sequential=%.3f, model=%.3f",
randomScore, sequentialScore, modelScore)
}
func TestMLCachePolicy_SizePreference(t *testing.T) {
policy := NewMLCachePolicy()
-
+
smallEntry := &CacheEntry{
Inode: 1,
Size: 1024, // 1KB
@@ -163,7 +163,7 @@ func TestMLCachePolicy_SizePreference(t *testing.T) {
AccessCount: 3,
FileType: MLFileUnknown,
}
-
+
largeEntry := &CacheEntry{
Inode: 2,
Size: 50 * 1024 * 1024, // 50MB
@@ -171,52 +171,52 @@ func TestMLCachePolicy_SizePreference(t *testing.T) {
AccessCount: 3,
FileType: MLFileUnknown,
}
-
+
smallScore := policy.CalculateEvictionScore(smallEntry)
largeScore := policy.CalculateEvictionScore(largeEntry)
-
+
if smallScore <= largeScore {
- t.Errorf("Small files should have higher score than large files: small=%.3f, large=%.3f",
+ t.Errorf("Small files should have higher score than large files: small=%.3f, large=%.3f",
smallScore, largeScore)
}
}
func TestMLCachePolicy_RecencyDecay(t *testing.T) {
policy := NewMLCachePolicy()
-
+
// Create entries with different access times
recentEntry := &CacheEntry{
- Inode: 1,
-
+ Inode: 1,
+
Size: 1024,
LastAccess: time.Now(),
AccessCount: 5,
FileType: MLFileUnknown,
}
-
+
oldEntry := &CacheEntry{
- Inode: 2,
-
+ Inode: 2,
+
Size: 1024,
LastAccess: time.Now().Add(-20 * time.Minute),
AccessCount: 5,
FileType: MLFileUnknown,
}
-
+
recentScore := policy.CalculateEvictionScore(recentEntry)
oldScore := policy.CalculateEvictionScore(oldEntry)
-
+
if recentScore <= oldScore {
- t.Errorf("Recent access should have higher score: recent=%.3f, old=%.3f",
+ t.Errorf("Recent access should have higher score: recent=%.3f, old=%.3f",
recentScore, oldScore)
}
}
func TestMLCachePolicy_EpochRelevance(t *testing.T) {
policy := NewMLCachePolicy()
-
+
lowRelevanceEntry := &CacheEntry{
- Inode: 1,
+ Inode: 1,
Size: 1024,
LastAccess: time.Now(),
@@ -224,9 +224,9 @@ func TestMLCachePolicy_EpochRelevance(t *testing.T) {
FileType: MLFileDataset,
EpochRelevance: 0.2,
}
-
+
highRelevanceEntry := &CacheEntry{
- Inode: 2,
+ Inode: 2,
Size: 1024,
LastAccess: time.Now(),
@@ -234,112 +234,112 @@ func TestMLCachePolicy_EpochRelevance(t *testing.T) {
FileType: MLFileDataset,
EpochRelevance: 0.9,
}
-
+
lowScore := policy.CalculateEvictionScore(lowRelevanceEntry)
highScore := policy.CalculateEvictionScore(highRelevanceEntry)
-
+
if highScore <= lowScore {
- t.Errorf("High epoch relevance should have higher score: high=%.3f, low=%.3f",
+ t.Errorf("High epoch relevance should have higher score: high=%.3f, low=%.3f",
highScore, lowScore)
}
}
func TestMLCachePolicy_DifferentThresholds(t *testing.T) {
policy := NewMLCachePolicy()
-
+
// Create entries for different file types with same base score
unknownEntry := &CacheEntry{
- Inode: 1,
-
+ Inode: 1,
+
Size: 1024,
LastAccess: time.Now().Add(-15 * time.Minute), // Old enough to potentially evict
AccessCount: 2,
FileType: MLFileUnknown,
}
-
+
modelEntry := &CacheEntry{
- Inode: 2,
-
+ Inode: 2,
+
Size: 1024,
LastAccess: time.Now().Add(-15 * time.Minute),
AccessCount: 2,
FileType: MLFileModel,
IsModel: true,
}
-
+
datasetEntry := &CacheEntry{
- Inode: 3,
-
+ Inode: 3,
+
Size: 1024,
LastAccess: time.Now().Add(-15 * time.Minute),
AccessCount: 2,
FileType: MLFileDataset,
Pattern: SequentialAccess,
}
-
+
unknownShouldEvict := policy.ShouldEvict(unknownEntry)
modelShouldEvict := policy.ShouldEvict(modelEntry)
datasetShouldEvict := policy.ShouldEvict(datasetEntry)
-
+
// Models should be least likely to be evicted
if modelShouldEvict && (!unknownShouldEvict || !datasetShouldEvict) {
t.Error("Model files should be least likely to be evicted")
}
-
- t.Logf("Eviction by type: unknown=%v, model=%v, dataset=%v",
+
+ t.Logf("Eviction by type: unknown=%v, model=%v, dataset=%v",
unknownShouldEvict, modelShouldEvict, datasetShouldEvict)
}
func TestMLCachePolicy_SetWeights(t *testing.T) {
policy := NewMLCachePolicy()
-
+
// Test setting custom weights
policy.SetWeights(0.4, 0.3, 0.1, 0.2)
-
+
if policy.accessFrequencyWeight != 0.4 {
t.Errorf("Expected frequency weight 0.4, got %.2f", policy.accessFrequencyWeight)
}
-
+
if policy.recencyWeight != 0.3 {
t.Errorf("Expected recency weight 0.3, got %.2f", policy.recencyWeight)
}
-
+
if policy.sizeWeight != 0.1 {
t.Errorf("Expected size weight 0.1, got %.2f", policy.sizeWeight)
}
-
+
if policy.mlWeight != 0.2 {
t.Errorf("Expected ML weight 0.2, got %.2f", policy.mlWeight)
}
-
+
// Test weight normalization
policy.SetWeights(2.0, 2.0, 1.0, 1.0) // Total = 6.0
-
+
expectedFreq := 2.0 / 6.0
- if abs(policy.accessFrequencyWeight - expectedFreq) > 0.001 {
- t.Errorf("Expected normalized frequency weight %.3f, got %.3f",
+ if abs(policy.accessFrequencyWeight-expectedFreq) > 0.001 {
+ t.Errorf("Expected normalized frequency weight %.3f, got %.3f",
expectedFreq, policy.accessFrequencyWeight)
}
}
func TestMLCachePolicy_SetMLBoosts(t *testing.T) {
policy := NewMLCachePolicy()
-
+
// Test setting custom boost factors
policy.SetMLBoosts(2.0, 3.0, 1.5, 1.8)
-
+
if policy.trainingDataBoost != 2.0 {
t.Errorf("Expected training data boost 2.0, got %.2f", policy.trainingDataBoost)
}
-
+
if policy.modelFileBoost != 3.0 {
t.Errorf("Expected model file boost 3.0, got %.2f", policy.modelFileBoost)
}
-
+
if policy.sequentialBoost != 1.5 {
t.Errorf("Expected sequential boost 1.5, got %.2f", policy.sequentialBoost)
}
-
+
if policy.epochRelevanceBoost != 1.8 {
t.Errorf("Expected epoch relevance boost 1.8, got %.2f", policy.epochRelevanceBoost)
}
@@ -347,30 +347,30 @@ func TestMLCachePolicy_SetMLBoosts(t *testing.T) {
func TestMLCachePolicy_Metrics(t *testing.T) {
policy := NewMLCachePolicy()
-
+
// Simulate some evictions
entries := []*CacheEntry{
{FileType: MLFileModel, IsModel: true},
{FileType: MLFileDataset, IsTrainingData: true},
{FileType: MLFileUnknown},
}
-
+
for _, entry := range entries {
entry.LastAccess = time.Now().Add(-30 * time.Minute) // Old enough to evict
entry.AccessCount = 1
entry.Size = 1024
-
+
if policy.ShouldEvict(entry) {
// Eviction counters are updated in ShouldEvict
}
}
-
+
metrics := policy.GetEvictionMetrics()
-
+
if metrics.TotalEvictions == 0 {
t.Error("Should have some total evictions")
}
-
+
// Verify weight configuration in metrics
if metrics.AccessFrequencyWeight != policy.accessFrequencyWeight {
t.Error("Metrics should reflect current weight configuration")
@@ -379,30 +379,30 @@ func TestMLCachePolicy_Metrics(t *testing.T) {
func TestMLCachePolicy_HotChunkPreference(t *testing.T) {
policy := NewMLCachePolicy()
-
+
coldEntry := &CacheEntry{
- Inode: 1,
-
+ Inode: 1,
+
Size: 1024,
LastAccess: time.Now(),
AccessCount: 5,
IsHot: false,
FileType: MLFileDataset,
}
-
+
hotEntry := &CacheEntry{
- Inode: 2,
-
+ Inode: 2,
+
Size: 1024,
LastAccess: time.Now(),
AccessCount: 5,
IsHot: true,
FileType: MLFileDataset,
}
-
+
coldScore := policy.CalculateEvictionScore(coldEntry)
hotScore := policy.CalculateEvictionScore(hotEntry)
-
+
if hotScore <= coldScore {
t.Errorf("Hot chunk should have higher score: hot=%.3f, cold=%.3f", hotScore, coldScore)
}
@@ -410,7 +410,7 @@ func TestMLCachePolicy_HotChunkPreference(t *testing.T) {
func TestMLCachePolicy_RecencyThresholds(t *testing.T) {
policy := NewMLCachePolicy()
-
+
// Test hot threshold
hotEntry := &CacheEntry{
Inode: 1,
@@ -418,7 +418,7 @@ func TestMLCachePolicy_RecencyThresholds(t *testing.T) {
LastAccess: time.Now().Add(-30 * time.Second), // Within hot threshold
AccessCount: 1,
}
-
+
// Test cold threshold
coldEntry := &CacheEntry{
Inode: 2,
@@ -426,7 +426,7 @@ func TestMLCachePolicy_RecencyThresholds(t *testing.T) {
LastAccess: time.Now().Add(-15 * time.Minute), // Beyond cold threshold
AccessCount: 1,
}
-
+
// Test middle
middleEntry := &CacheEntry{
Inode: 3,
@@ -434,39 +434,39 @@ func TestMLCachePolicy_RecencyThresholds(t *testing.T) {
LastAccess: time.Now().Add(-5 * time.Minute), // Between thresholds
AccessCount: 1,
}
-
+
hotScore := policy.calculateRecencyScore(time.Since(hotEntry.LastAccess))
coldScore := policy.calculateRecencyScore(time.Since(coldEntry.LastAccess))
middleScore := policy.calculateRecencyScore(time.Since(middleEntry.LastAccess))
-
+
if hotScore != 1.0 {
t.Errorf("Hot entry should have score 1.0, got %.3f", hotScore)
}
-
+
if coldScore != 0.1 {
t.Errorf("Cold entry should have score 0.1, got %.3f", coldScore)
}
-
+
if middleScore <= coldScore || middleScore >= hotScore {
- t.Errorf("Middle entry should have score between hot and cold: %.3f not in (%.3f, %.3f)",
+ t.Errorf("Middle entry should have score between hot and cold: %.3f not in (%.3f, %.3f)",
middleScore, coldScore, hotScore)
}
}
func TestMLCachePolicy_SizeScore(t *testing.T) {
policy := NewMLCachePolicy()
-
- smallSize := uint64(1024) // 1KB
+
+ smallSize := uint64(1024) // 1KB
largeSize := uint64(100 * 1024 * 1024) // 100MB
-
+
smallScore := policy.calculateSizeScore(smallSize)
largeScore := policy.calculateSizeScore(largeSize)
-
+
if smallScore <= largeScore {
- t.Errorf("Small files should have higher size score: small=%.3f, large=%.3f",
+ t.Errorf("Small files should have higher size score: small=%.3f, large=%.3f",
smallScore, largeScore)
}
-
+
// Large files should still have reasonable score (not too low)
if largeScore < 0.2 {
t.Errorf("Large files should have reasonable score, got %.3f", largeScore)
@@ -475,24 +475,24 @@ func TestMLCachePolicy_SizeScore(t *testing.T) {
func TestMLCachePolicy_AccessFrequencyScore(t *testing.T) {
policy := NewMLCachePolicy()
-
+
lowAccessEntry := &CacheEntry{
AccessCount: 1,
FileType: MLFileUnknown,
Pattern: RandomAccess,
}
-
+
highAccessEntry := &CacheEntry{
AccessCount: 100,
FileType: MLFileUnknown,
Pattern: RandomAccess,
}
-
+
lowScore := policy.calculateAccessFrequencyScore(lowAccessEntry)
highScore := policy.calculateAccessFrequencyScore(highAccessEntry)
-
+
if highScore <= lowScore {
- t.Errorf("High access count should have higher score: high=%.3f, low=%.3f",
+ t.Errorf("High access count should have higher score: high=%.3f, low=%.3f",
highScore, lowScore)
}
}
@@ -509,9 +509,9 @@ func abs(x float64) float64 {
func BenchmarkMLCachePolicy_CalculateEvictionScore(b *testing.B) {
policy := NewMLCachePolicy()
-
+
entry := &CacheEntry{
- Inode: 1,
+ Inode: 1,
Size: 1024,
LastAccess: time.Now().Add(-5 * time.Minute),
@@ -521,9 +521,9 @@ func BenchmarkMLCachePolicy_CalculateEvictionScore(b *testing.B) {
IsTrainingData: true,
EpochRelevance: 0.8,
}
-
+
b.ResetTimer()
-
+
for i := 0; i < b.N; i++ {
policy.CalculateEvictionScore(entry)
}
@@ -531,18 +531,18 @@ func BenchmarkMLCachePolicy_CalculateEvictionScore(b *testing.B) {
func BenchmarkMLCachePolicy_ShouldEvict(b *testing.B) {
policy := NewMLCachePolicy()
-
+
entry := &CacheEntry{
- Inode: 1,
-
+ Inode: 1,
+
Size: 1024,
LastAccess: time.Now().Add(-5 * time.Minute),
AccessCount: 10,
FileType: MLFileDataset,
}
-
+
b.ResetTimer()
-
+
for i := 0; i < b.N; i++ {
policy.ShouldEvict(entry)
}
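
One worked check of the recency math exercised above: calculateRecencyScore decays linearly between hotThreshold (1 minute, score 1.0) and coldThreshold (10 minutes, score 0.1). For an entry last accessed 5 minutes ago, ratio = (5-1)/(10-1) ≈ 0.444, so score = 1.0 - 0.444*0.9 ≈ 0.60, strictly between the hot and cold scores, which is exactly what TestMLCachePolicy_RecencyThresholds asserts for its middle entry. Likewise, SetWeights(2.0, 2.0, 1.0, 1.0) normalizes by the total 6.0, yielding weights of 0.333/0.333/0.167/0.167 as TestMLCachePolicy_SetWeights verifies.
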
diff --git a/weed/mount/ml/fuse_integration.go b/weed/mount/ml/fuse_integration.go
index 54b770eb5..71597f5d4 100644
--- a/weed/mount/ml/fuse_integration.go
+++ b/weed/mount/ml/fuse_integration.go
@@ -14,22 +14,22 @@ type FUSEMLIntegration struct {
openFileCache *OpenFileCache
cachePolicy *MLCachePolicy
mlOptimization *MLOptimization
-
+
// FUSE-specific configuration
- enableKeepCache bool // Enable FOPEN_KEEP_CACHE for ML files
- enableWriteback bool // Enable writeback caching
- attrCacheTimeout time.Duration // Attribute cache timeout for ML files
- entryCacheTimeout time.Duration // Entry cache timeout for ML files
-
+ enableKeepCache bool // Enable FOPEN_KEEP_CACHE for ML files
+ enableWriteback bool // Enable writeback caching
+ attrCacheTimeout time.Duration // Attribute cache timeout for ML files
+ entryCacheTimeout time.Duration // Entry cache timeout for ML files
+
// ML-specific FUSE optimizations
mlAttrTimeout time.Duration // Extended attribute timeout for ML files
datasetAttrTimeout time.Duration // Even longer timeout for dataset files
modelAttrTimeout time.Duration // Longest timeout for model files
-
+
// Statistics
- keepCacheEnabled int64 // Number of times keep cache was enabled
- writebackEnabled int64 // Number of times writeback was enabled
- mlAttrCacheHits int64 // ML-specific attribute cache hits
+ keepCacheEnabled int64 // Number of times keep cache was enabled
+ writebackEnabled int64 // Number of times writeback was enabled
+ mlAttrCacheHits int64 // ML-specific attribute cache hits
}
// NewFUSEMLIntegration creates a new FUSE ML integration
@@ -42,7 +42,7 @@ func NewFUSEMLIntegration(mlOpt *MLOptimization) *FUSEMLIntegration {
enableWriteback: true,
attrCacheTimeout: 5 * time.Second,
entryCacheTimeout: 10 * time.Second,
-
+
// ML-specific timeouts (longer for more stable caching)
mlAttrTimeout: 30 * time.Second,
datasetAttrTimeout: 60 * time.Second,
@@ -54,17 +54,17 @@ func NewFUSEMLIntegration(mlOpt *MLOptimization) *FUSEMLIntegration {
func (fmi *FUSEMLIntegration) OnFileOpen(inode uint64, entry *filer_pb.Entry, fullPath string, flags uint32, out *fuse.OpenOut) {
// Register file in cache
fileInfo := fmi.openFileCache.OpenFile(inode, entry, fullPath)
-
+
// Apply ML-specific FUSE optimizations
if fileInfo.IsMLFile && fmi.enableKeepCache {
// Enable keep cache for ML files to reduce redundant reads
out.OpenFlags |= fuse.FOPEN_KEEP_CACHE
fmi.keepCacheEnabled++
-
- glog.V(3).Infof("Enabled FOPEN_KEEP_CACHE for ML file: inode=%d, type=%v",
+
+ glog.V(3).Infof("Enabled FOPEN_KEEP_CACHE for ML file: inode=%d, type=%v",
inode, fileInfo.FileType)
}
-
+
// For large model files, also enable direct I/O to bypass page cache for very large reads
if fileInfo.FileType == MLFileModel && entry.Attributes.FileSize > 100*1024*1024 { // > 100MB
// Note: Direct I/O can be beneficial for very large sequential reads
@@ -79,7 +79,7 @@ func (fmi *FUSEMLIntegration) OnFileOpen(inode uint64, entry *filer_pb.Entry, fu
// OnFileClose handles file close events
func (fmi *FUSEMLIntegration) OnFileClose(inode uint64) {
canEvict := fmi.openFileCache.CloseFile(inode)
-
+
if canEvict {
glog.V(4).Infof("File closed and available for eviction: inode=%d", inode)
}
@@ -90,7 +90,7 @@ func (fmi *FUSEMLIntegration) OnFileRead(inode uint64, offset int64, size int) {
// Update access pattern
if fmi.mlOptimization != nil && fmi.mlOptimization.IsEnabled() {
accessInfo := fmi.mlOptimization.RecordAccess(inode, offset, size)
-
+
// Update file info with detected pattern
if fileInfo := fmi.openFileCache.GetFileInfo(inode); fileInfo != nil {
fileInfo.Lock()
@@ -100,10 +100,10 @@ func (fmi *FUSEMLIntegration) OnFileRead(inode uint64, offset int64, size int) {
}
fileInfo.TotalBytesRead += int64(size)
fileInfo.Unlock()
-
+
// Trigger prefetching if pattern detected
if shouldPrefetch, _ := fmi.mlOptimization.ShouldPrefetch(inode); shouldPrefetch {
- glog.V(4).Infof("Prefetch triggered for ML file: inode=%d, pattern=%v",
+ glog.V(4).Infof("Prefetch triggered for ML file: inode=%d, pattern=%v",
inode, fileInfo.ReadPattern)
}
}
@@ -118,10 +118,10 @@ func (fmi *FUSEMLIntegration) OptimizeAttributes(inode uint64, out *fuse.AttrOut
out.AttrValid = uint64(fmi.attrCacheTimeout.Seconds())
return
}
-
+
// Apply ML-specific timeouts
var timeout time.Duration
-
+
switch fileInfo.FileType {
case MLFileModel:
// Model files rarely change, cache attributes longer
@@ -136,15 +136,15 @@ func (fmi *FUSEMLIntegration) OptimizeAttributes(inode uint64, out *fuse.AttrOut
// Use default timeout for non-ML files
timeout = fmi.attrCacheTimeout
}
-
+
out.AttrValid = uint64(timeout.Seconds())
fmi.mlAttrCacheHits++
-
- glog.V(4).Infof("ML attribute cache timeout: inode=%d, type=%v, timeout=%v",
+
+ glog.V(4).Infof("ML attribute cache timeout: inode=%d, type=%v, timeout=%v",
inode, fileInfo.FileType, timeout)
}
-// OptimizeEntryCache applies ML-specific entry caching optimizations
+// OptimizeEntryCache applies ML-specific entry caching optimizations
func (fmi *FUSEMLIntegration) OptimizeEntryCache(inode uint64, entry *filer_pb.Entry, out *fuse.EntryOut) {
fileInfo := fmi.openFileCache.GetFileInfo(inode)
if fileInfo == nil {
@@ -152,10 +152,10 @@ func (fmi *FUSEMLIntegration) OptimizeEntryCache(inode uint64, entry *filer_pb.E
out.SetEntryTimeout(fmi.entryCacheTimeout)
return
}
-
+
// ML files can have longer entry cache timeouts since they change infrequently
var timeout time.Duration
-
+
switch fileInfo.FileType {
case MLFileModel, MLFileDataset:
// Models and datasets rarely change during training
@@ -166,10 +166,10 @@ func (fmi *FUSEMLIntegration) OptimizeEntryCache(inode uint64, entry *filer_pb.E
default:
timeout = fmi.entryCacheTimeout
}
-
+
out.SetEntryTimeout(timeout)
-
- glog.V(4).Infof("ML entry cache timeout: inode=%d, type=%v, timeout=%v",
+
+ glog.V(4).Infof("ML entry cache timeout: inode=%d, type=%v, timeout=%v",
inode, fileInfo.FileType, timeout)
}
@@ -178,12 +178,12 @@ func (fmi *FUSEMLIntegration) ShouldEnableWriteback(inode uint64, entry *filer_p
if !fmi.enableWriteback {
return false
}
-
+
fileInfo := fmi.openFileCache.GetFileInfo(inode)
if fileInfo == nil {
return false
}
-
+
// Enable writeback for ML files that are frequently written
switch fileInfo.FileType {
case MLFileLog:
@@ -204,7 +204,7 @@ func (fmi *FUSEMLIntegration) ShouldEnableWriteback(inode uint64, entry *filer_p
// Default behavior for non-ML files
return false
}
-
+
return false
}
@@ -213,15 +213,15 @@ func (fmi *FUSEMLIntegration) OnChunkAccess(inode uint64, chunkIndex uint32, fil
metadata := &ChunkMetadata{
FileId: fileId,
Offset: uint64(chunkIndex) * 1024, // Assuming 1KB chunks for now
- Size: 1024,
+ Size: 1024,
LastAccess: time.Now(),
CacheLevel: cacheLevel,
AccessCount: 1, // Will be incremented in UpdateChunkCache
}
-
+
// Update chunk cache
fmi.openFileCache.UpdateChunkCache(inode, chunkIndex, metadata)
-
+
// Update file-level statistics
if fileInfo := fmi.openFileCache.GetFileInfo(inode); fileInfo != nil {
fileInfo.Lock()
@@ -240,16 +240,16 @@ func (fmi *FUSEMLIntegration) GetOptimizationMetrics() FUSEMLMetrics {
if fmi.mlOptimization != nil {
mlMetrics = fmi.mlOptimization.GetMetrics()
}
-
+
return FUSEMLMetrics{
- MLOptimizationMetrics: mlMetrics,
- OpenFileCacheMetrics: fmi.openFileCache.GetMetrics(),
- CachePolicyMetrics: fmi.cachePolicy.GetEvictionMetrics(),
- KeepCacheEnabled: fmi.keepCacheEnabled,
- WritebackEnabled: fmi.writebackEnabled,
- MLAttrCacheHits: fmi.mlAttrCacheHits,
- EnableKeepCache: fmi.enableKeepCache,
- EnableWriteback: fmi.enableWriteback,
+ MLOptimizationMetrics: mlMetrics,
+ OpenFileCacheMetrics: fmi.openFileCache.GetMetrics(),
+ CachePolicyMetrics: fmi.cachePolicy.GetEvictionMetrics(),
+ KeepCacheEnabled: fmi.keepCacheEnabled,
+ WritebackEnabled: fmi.writebackEnabled,
+ MLAttrCacheHits: fmi.mlAttrCacheHits,
+ EnableKeepCache: fmi.enableKeepCache,
+ EnableWriteback: fmi.enableWriteback,
}
}
@@ -258,12 +258,12 @@ type FUSEMLMetrics struct {
MLOptimizationMetrics *MLOptimizationMetrics `json:"ml_optimization,omitempty"`
OpenFileCacheMetrics OpenFileCacheMetrics `json:"open_file_cache"`
CachePolicyMetrics MLCachePolicyMetrics `json:"cache_policy"`
-
+
// FUSE-specific metrics
- KeepCacheEnabled int64 `json:"keep_cache_enabled"`
- WritebackEnabled int64 `json:"writeback_enabled"`
- MLAttrCacheHits int64 `json:"ml_attr_cache_hits"`
-
+ KeepCacheEnabled int64 `json:"keep_cache_enabled"`
+ WritebackEnabled int64 `json:"writeback_enabled"`
+ MLAttrCacheHits int64 `json:"ml_attr_cache_hits"`
+
// Configuration
EnableKeepCache bool `json:"enable_keep_cache"`
EnableWriteback bool `json:"enable_writeback"`
@@ -272,18 +272,18 @@ type FUSEMLMetrics struct {
// Shutdown gracefully shuts down the FUSE ML integration
func (fmi *FUSEMLIntegration) Shutdown() {
glog.V(1).Infof("Shutting down FUSE ML integration...")
-
+
if fmi.openFileCache != nil {
fmi.openFileCache.Shutdown()
}
-
+
if fmi.mlOptimization != nil {
fmi.mlOptimization.Shutdown()
}
-
+
// Print final metrics
metrics := fmi.GetOptimizationMetrics()
- glog.V(1).Infof("FUSE ML integration final metrics: keep_cache=%d, writeback=%d, attr_hits=%d",
+ glog.V(1).Infof("FUSE ML integration final metrics: keep_cache=%d, writeback=%d, attr_hits=%d",
metrics.KeepCacheEnabled, metrics.WritebackEnabled, metrics.MLAttrCacheHits)
}
@@ -291,11 +291,11 @@ func (fmi *FUSEMLIntegration) Shutdown() {
func (fmi *FUSEMLIntegration) EnableMLOptimizations(enabled bool) {
fmi.enableKeepCache = enabled
fmi.enableWriteback = enabled
-
+
if fmi.mlOptimization != nil {
fmi.mlOptimization.Enable(enabled)
}
-
+
glog.V(1).Infof("ML FUSE optimizations %s", map[bool]string{true: "enabled", false: "disabled"}[enabled])
}
@@ -306,7 +306,7 @@ func (fmi *FUSEMLIntegration) SetCacheTimeouts(attr, entry, mlAttr, dataset, mod
fmi.mlAttrTimeout = mlAttr
fmi.datasetAttrTimeout = dataset
fmi.modelAttrTimeout = model
-
- glog.V(2).Infof("Updated cache timeouts: attr=%v, entry=%v, ml=%v, dataset=%v, model=%v",
+
+ glog.V(2).Infof("Updated cache timeouts: attr=%v, entry=%v, ml=%v, dataset=%v, model=%v",
attr, entry, mlAttr, dataset, model)
}
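
For context, these hooks are meant to be driven from the mount's FUSE handlers. A minimal, hypothetical wiring follows; the wfs receiver and handler signatures are illustrative only, not SeaweedFS's actual mount interface, though the go-fuse types match those used in the diff above.

    // Open: let the ML integration set FOPEN_KEEP_CACHE and cache timeouts.
    func (wfs *WFS) Open(inode uint64, entry *filer_pb.Entry, fullPath string,
        in *fuse.OpenIn, out *fuse.OpenOut) fuse.Status {
        wfs.mlIntegration.OnFileOpen(inode, entry, fullPath, in.Flags, out)
        return fuse.OK
    }

    // Read: record the access so pattern detection and prefetch can kick in.
    func (wfs *WFS) Read(inode uint64, offset int64, buf []byte) {
        wfs.mlIntegration.OnFileRead(inode, offset, len(buf))
        // ... perform the actual chunked read ...
    }

    // Release: drop the handle and let the open-file cache decide evictability.
    func (wfs *WFS) Release(inode uint64) {
        wfs.mlIntegration.OnFileClose(inode)
    }
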
diff --git a/weed/mount/ml/open_file_cache.go b/weed/mount/ml/open_file_cache.go
index 8d23a0bd8..fe3e794a8 100644
--- a/weed/mount/ml/open_file_cache.go
+++ b/weed/mount/ml/open_file_cache.go
@@ -10,48 +10,48 @@ import (
// ChunkMetadata contains metadata about a cached chunk
type ChunkMetadata struct {
- FileId string // Chunk file ID
- Offset uint64 // Offset within the file
- Size uint64 // Size of the chunk
- CacheLevel int // 0=memory, 1=disk, 2=not cached
- LastAccess time.Time // Last access time
- AccessCount int64 // Number of times accessed
- IsHot bool // Whether this chunk is frequently accessed
- Pattern AccessPattern // Access pattern for this chunk
+ FileId string // Chunk file ID
+ Offset uint64 // Offset within the file
+ Size uint64 // Size of the chunk
+ CacheLevel int // 0=memory, 1=disk, 2=not cached
+ LastAccess time.Time // Last access time
+ AccessCount int64 // Number of times accessed
+ IsHot bool // Whether this chunk is frequently accessed
+ Pattern AccessPattern // Access pattern for this chunk
}
// OpenFileInfo contains comprehensive information about an open file
type OpenFileInfo struct {
sync.RWMutex
-
+
// Basic file information
- Inode uint64 // File inode
- Entry *filer_pb.Entry // File entry from filer
- OpenCount int // Number of open handles
- OpenTime time.Time // When file was first opened
- LastAccess time.Time // Last access time
-
+ Inode uint64 // File inode
+ Entry *filer_pb.Entry // File entry from filer
+ OpenCount int // Number of open handles
+ OpenTime time.Time // When file was first opened
+ LastAccess time.Time // Last access time
+
// Chunk-level caching
- ChunkCache map[uint32]*ChunkMetadata // chunk index -> metadata
- ChunkCount uint32 // Total number of chunks in file
- ChunkSize int64 // Size of each chunk
-
+ ChunkCache map[uint32]*ChunkMetadata // chunk index -> metadata
+ ChunkCount uint32 // Total number of chunks in file
+ ChunkSize int64 // Size of each chunk
+
// Access pattern tracking
- AccessInfo *AccessInfo // Access pattern information
- ReadPattern AccessPattern // Overall file access pattern
- PrefetchState PrefetchState // Current prefetch state
-
+ AccessInfo *AccessInfo // Access pattern information
+ ReadPattern AccessPattern // Overall file access pattern
+ PrefetchState PrefetchState // Current prefetch state
+
// ML-specific optimizations
- IsMLFile bool // Whether this is likely an ML-related file
- FileType MLFileType // Type of ML file (dataset, model, etc.)
- BatchSize int // Detected batch size for training data
- EpochCount int // Number of epochs detected
-
+ IsMLFile bool // Whether this is likely an ML-related file
+ FileType MLFileType // Type of ML file (dataset, model, etc.)
+ BatchSize int // Detected batch size for training data
+ EpochCount int // Number of epochs detected
+
// Performance tracking
- TotalBytesRead int64 // Total bytes read from this file
- CacheHitCount int64 // Number of cache hits
- CacheMissCount int64 // Number of cache misses
- PrefetchHitCount int64 // Number of prefetch hits
+ TotalBytesRead int64 // Total bytes read from this file
+ CacheHitCount int64 // Number of cache hits
+ CacheMissCount int64 // Number of cache misses
+ PrefetchHitCount int64 // Number of prefetch hits
}
// PrefetchState represents the current prefetch state for a file
@@ -69,36 +69,36 @@ type MLFileType int
const (
MLFileUnknown MLFileType = iota
- MLFileDataset // Training/validation dataset
- MLFileModel // Model checkpoint/weights
- MLFileConfig // Configuration files
- MLFileTensor // Individual tensor files
- MLFileLog // Training logs
+ MLFileDataset // Training/validation dataset
+ MLFileModel // Model checkpoint/weights
+ MLFileConfig // Configuration files
+ MLFileTensor // Individual tensor files
+ MLFileLog // Training logs
)
// OpenFileCache manages open file information with ML-aware optimizations
type OpenFileCache struct {
sync.RWMutex
-
+
// Configuration
maxFiles int // Maximum number of files to track
ttl time.Duration // TTL for inactive files
cleanupInterval time.Duration // Cleanup interval
-
+
// File tracking
- files map[uint64]*OpenFileInfo // inode -> file info
- accessOrder []uint64 // LRU order for eviction
-
+ files map[uint64]*OpenFileInfo // inode -> file info
+ accessOrder []uint64 // LRU order for eviction
+
// ML-specific configuration
enableMLOptimization bool
- mlFileDetector *MLFileDetector
-
+ mlFileDetector *MLFileDetector
+
// Metrics
- totalFiles int64
- evictedFiles int64
- cacheHits int64
- cacheMisses int64
-
+ totalFiles int64
+ evictedFiles int64
+ cacheHits int64
+ cacheMisses int64
+
// Background cleanup
shutdown chan struct{}
done chan struct{}
@@ -110,11 +110,11 @@ type MLFileDetector struct {
datasetExtensions map[string]bool
modelExtensions map[string]bool
configExtensions map[string]bool
-
+
// Path patterns
datasetPaths []string
modelPaths []string
-
+
// Size heuristics
modelMinSize int64 // Minimum size for model files
datasetMaxItems int // Maximum items in dataset directory
@@ -128,22 +128,22 @@ func NewOpenFileCache(maxFiles int, ttl time.Duration) *OpenFileCache {
if ttl <= 0 {
ttl = 30 * time.Minute // Default TTL
}
-
+
ofc := &OpenFileCache{
- maxFiles: maxFiles,
- ttl: ttl,
- cleanupInterval: 5 * time.Minute,
- files: make(map[uint64]*OpenFileInfo),
- accessOrder: make([]uint64, 0, maxFiles),
+ maxFiles: maxFiles,
+ ttl: ttl,
+ cleanupInterval: 5 * time.Minute,
+ files: make(map[uint64]*OpenFileInfo),
+ accessOrder: make([]uint64, 0, maxFiles),
enableMLOptimization: true,
- mlFileDetector: newMLFileDetector(),
- shutdown: make(chan struct{}),
- done: make(chan struct{}),
+ mlFileDetector: newMLFileDetector(),
+ shutdown: make(chan struct{}),
+ done: make(chan struct{}),
}
-
+
// Start background cleanup
go ofc.cleanupWorker()
-
+
glog.V(1).Infof("OpenFileCache initialized: maxFiles=%d, ttl=%v", maxFiles, ttl)
return ofc
}
@@ -185,7 +185,7 @@ func newMLFileDetector() *MLFileDetector {
func (ofc *OpenFileCache) OpenFile(inode uint64, entry *filer_pb.Entry, fullPath string) *OpenFileInfo {
ofc.Lock()
defer ofc.Unlock()
-
+
// Get or create file info
fileInfo := ofc.files[inode]
if fileInfo == nil {
@@ -198,35 +198,35 @@ func (ofc *OpenFileCache) OpenFile(inode uint64, entry *filer_pb.Entry, fullPath
ReadPattern: RandomAccess,
PrefetchState: PrefetchIdle,
}
-
+
// Detect ML file type
if ofc.enableMLOptimization {
fileInfo.IsMLFile, fileInfo.FileType = ofc.mlFileDetector.DetectMLFile(entry, fullPath)
if fileInfo.IsMLFile {
- glog.V(3).Infof("ML file detected: inode=%d, type=%v, path=%s",
+ glog.V(3).Infof("ML file detected: inode=%d, type=%v, path=%s",
inode, fileInfo.FileType, fullPath)
}
}
-
+
ofc.files[inode] = fileInfo
ofc.totalFiles++
-
+
// Update access order for LRU
ofc.updateAccessOrder(inode)
-
+
// Evict if necessary
if len(ofc.files) > ofc.maxFiles {
ofc.evictLRU()
}
}
-
+
fileInfo.OpenCount++
fileInfo.LastAccess = time.Now()
ofc.updateAccessOrder(inode)
-
- glog.V(4).Infof("File opened: inode=%d, openCount=%d, isML=%v",
+
+ glog.V(4).Infof("File opened: inode=%d, openCount=%d, isML=%v",
inode, fileInfo.OpenCount, fileInfo.IsMLFile)
-
+
return fileInfo
}
@@ -234,15 +234,15 @@ func (ofc *OpenFileCache) OpenFile(inode uint64, entry *filer_pb.Entry, fullPath
func (ofc *OpenFileCache) CloseFile(inode uint64) bool {
ofc.Lock()
defer ofc.Unlock()
-
+
fileInfo := ofc.files[inode]
if fileInfo == nil {
return true // Already cleaned up
}
-
+
fileInfo.OpenCount--
glog.V(4).Infof("File closed: inode=%d, openCount=%d", inode, fileInfo.OpenCount)
-
+
// Return true if file can be evicted (no more open handles)
return fileInfo.OpenCount <= 0
}
@@ -251,14 +251,14 @@ func (ofc *OpenFileCache) CloseFile(inode uint64) bool {
func (ofc *OpenFileCache) GetFileInfo(inode uint64) *OpenFileInfo {
ofc.RLock()
defer ofc.RUnlock()
-
+
fileInfo := ofc.files[inode]
if fileInfo != nil {
fileInfo.LastAccess = time.Now()
ofc.cacheHits++
return fileInfo
}
-
+
ofc.cacheMisses++
return nil
}
@@ -268,19 +268,19 @@ func (ofc *OpenFileCache) UpdateChunkCache(inode uint64, chunkIndex uint32, meta
ofc.RLock()
fileInfo := ofc.files[inode]
ofc.RUnlock()
-
+
if fileInfo == nil {
return
}
-
+
fileInfo.Lock()
defer fileInfo.Unlock()
-
+
fileInfo.ChunkCache[chunkIndex] = metadata
metadata.LastAccess = time.Now()
metadata.AccessCount++
-
- glog.V(4).Infof("Updated chunk cache: inode=%d, chunk=%d, level=%d",
+
+ glog.V(4).Infof("Updated chunk cache: inode=%d, chunk=%d, level=%d",
inode, chunkIndex, metadata.CacheLevel)
}
@@ -289,20 +289,20 @@ func (ofc *OpenFileCache) GetChunkMetadata(inode uint64, chunkIndex uint32) (*Ch
ofc.RLock()
fileInfo := ofc.files[inode]
ofc.RUnlock()
-
+
if fileInfo == nil {
return nil, false
}
-
+
fileInfo.RLock()
defer fileInfo.RUnlock()
-
+
metadata, exists := fileInfo.ChunkCache[chunkIndex]
if exists {
metadata.LastAccess = time.Now()
metadata.AccessCount++
}
-
+
return metadata, exists
}
@@ -315,7 +315,7 @@ func (ofc *OpenFileCache) updateAccessOrder(inode uint64) {
break
}
}
-
+
// Add to front (most recently used)
ofc.accessOrder = append([]uint64{inode}, ofc.accessOrder...)
}
@@ -325,24 +325,24 @@ func (ofc *OpenFileCache) evictLRU() {
if len(ofc.accessOrder) == 0 {
return
}
-
+
// Find LRU file that can be evicted (not currently open)
for i := len(ofc.accessOrder) - 1; i >= 0; i-- {
inode := ofc.accessOrder[i]
fileInfo := ofc.files[inode]
-
+
if fileInfo != nil && fileInfo.OpenCount <= 0 {
// Evict this file
delete(ofc.files, inode)
ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...)
ofc.evictedFiles++
-
- glog.V(3).Infof("Evicted file from cache: inode=%d, chunks=%d",
+
+ glog.V(3).Infof("Evicted file from cache: inode=%d, chunks=%d",
inode, len(fileInfo.ChunkCache))
return
}
}
-
+
// If no files can be evicted, just log a warning
glog.V(2).Infof("Warning: Could not evict any files from cache (all files are open)")
}
@@ -351,7 +351,7 @@ func (ofc *OpenFileCache) evictLRU() {
func (ofc *OpenFileCache) cleanupWorker() {
ticker := time.NewTicker(ofc.cleanupInterval)
defer ticker.Stop()
-
+
for {
select {
case <-ticker.C:
@@ -367,17 +367,17 @@ func (ofc *OpenFileCache) cleanupWorker() {
func (ofc *OpenFileCache) cleanup() {
ofc.Lock()
defer ofc.Unlock()
-
+
now := time.Now()
toRemove := make([]uint64, 0)
-
+
for inode, fileInfo := range ofc.files {
// Only cleanup files that are not open and have expired
if fileInfo.OpenCount <= 0 && now.Sub(fileInfo.LastAccess) > ofc.ttl {
toRemove = append(toRemove, inode)
}
}
-
+
// Remove expired files
for _, inode := range toRemove {
delete(ofc.files, inode)
@@ -389,7 +389,7 @@ func (ofc *OpenFileCache) cleanup() {
}
}
}
-
+
if len(toRemove) > 0 {
glog.V(3).Infof("Cleaned up %d expired file cache entries", len(toRemove))
}
@@ -399,12 +399,12 @@ func (ofc *OpenFileCache) cleanup() {
func (ofc *OpenFileCache) GetMetrics() OpenFileCacheMetrics {
ofc.RLock()
defer ofc.RUnlock()
-
+
var totalChunks int64
var mlFiles int64
fileTypes := make(map[MLFileType]int)
patterns := make(map[AccessPattern]int)
-
+
for _, fileInfo := range ofc.files {
totalChunks += int64(len(fileInfo.ChunkCache))
if fileInfo.IsMLFile {
@@ -413,43 +413,43 @@ func (ofc *OpenFileCache) GetMetrics() OpenFileCacheMetrics {
}
patterns[fileInfo.ReadPattern]++
}
-
+
return OpenFileCacheMetrics{
- TotalFiles: int64(len(ofc.files)),
- MLFiles: mlFiles,
- TotalChunks: totalChunks,
- CacheHits: ofc.cacheHits,
- CacheMisses: ofc.cacheMisses,
- EvictedFiles: ofc.evictedFiles,
- FileTypes: fileTypes,
+ TotalFiles: int64(len(ofc.files)),
+ MLFiles: mlFiles,
+ TotalChunks: totalChunks,
+ CacheHits: ofc.cacheHits,
+ CacheMisses: ofc.cacheMisses,
+ EvictedFiles: ofc.evictedFiles,
+ FileTypes: fileTypes,
AccessPatterns: patterns,
}
}
// OpenFileCacheMetrics holds metrics for the open file cache
type OpenFileCacheMetrics struct {
- TotalFiles int64 `json:"total_files"`
- MLFiles int64 `json:"ml_files"`
- TotalChunks int64 `json:"total_chunks"`
- CacheHits int64 `json:"cache_hits"`
- CacheMisses int64 `json:"cache_misses"`
- EvictedFiles int64 `json:"evicted_files"`
- FileTypes map[MLFileType]int `json:"file_types"`
- AccessPatterns map[AccessPattern]int `json:"access_patterns"`
+ TotalFiles int64 `json:"total_files"`
+ MLFiles int64 `json:"ml_files"`
+ TotalChunks int64 `json:"total_chunks"`
+ CacheHits int64 `json:"cache_hits"`
+ CacheMisses int64 `json:"cache_misses"`
+ EvictedFiles int64 `json:"evicted_files"`
+ FileTypes map[MLFileType]int `json:"file_types"`
+ AccessPatterns map[AccessPattern]int `json:"access_patterns"`
}
// Shutdown gracefully shuts down the open file cache
func (ofc *OpenFileCache) Shutdown() {
glog.V(1).Infof("Shutting down OpenFileCache...")
-
+
close(ofc.shutdown)
-
+
// Wait for cleanup worker to finish
<-ofc.done
-
+
// Print final metrics
metrics := ofc.GetMetrics()
- glog.V(1).Infof("OpenFileCache final metrics: files=%d, chunks=%d, hits=%d, misses=%d",
+ glog.V(1).Infof("OpenFileCache final metrics: files=%d, chunks=%d, hits=%d, misses=%d",
metrics.TotalFiles, metrics.TotalChunks, metrics.CacheHits, metrics.CacheMisses)
}
@@ -460,10 +460,10 @@ func (detector *MLFileDetector) DetectMLFile(entry *filer_pb.Entry, fullPath str
if entry == nil {
return false, MLFileUnknown
}
-
+
name := entry.Name
size := int64(entry.Attributes.FileSize)
-
+
// Check file extension
if ext := getFileExtension(name); ext != "" {
if detector.datasetExtensions[ext] {
@@ -476,20 +476,20 @@ func (detector *MLFileDetector) DetectMLFile(entry *filer_pb.Entry, fullPath str
return true, MLFileConfig
}
}
-
+
// Check path patterns
for _, path := range detector.datasetPaths {
if contains(fullPath, path) {
return true, MLFileDataset
}
}
-
+
for _, path := range detector.modelPaths {
if contains(fullPath, path) {
return true, MLFileModel
}
}
-
+
// Check size heuristics
if size > detector.modelMinSize {
// Large files in certain contexts might be models
@@ -497,17 +497,17 @@ func (detector *MLFileDetector) DetectMLFile(entry *filer_pb.Entry, fullPath str
return true, MLFileModel
}
}
-
+
// Check for tensor files
if contains(name, "tensor") || contains(name, ".pt") || contains(name, ".npy") {
return true, MLFileTensor
}
-
+
// Check for log files
if contains(name, "log") || contains(name, "tensorboard") || contains(fullPath, "logs") {
return true, MLFileLog
}
-
+
return false, MLFileUnknown
}
@@ -533,7 +533,7 @@ func findSubstring(str, substr string) bool {
if len(str) < len(substr) {
return false
}
-
+
for i := 0; i <= len(str)-len(substr); i++ {
if str[i:i+len(substr)] == substr {
return true
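
Putting the cache pieces above together, a minimal usage sketch (the inode, entry, path, and chunk file id are hypothetical; imports of time and filer_pb are assumed):

    cache := NewOpenFileCache(1000, 30*time.Minute)
    defer cache.Shutdown()

    info := cache.OpenFile(inode, entry, "/datasets/train/shard-000.rec") // detects ML file type from extension/path
    cache.UpdateChunkCache(inode, 0, &ChunkMetadata{
        FileId:     "3,01637037d6", // hypothetical chunk file id
        Offset:     0,
        Size:       1024,
        CacheLevel: 0, // 0=memory, 1=disk, 2=not cached
    })
    if md, ok := cache.GetChunkMetadata(inode, 0); ok {
        _ = md.AccessCount // incremented on every lookup
    }
    if cache.CloseFile(inode) {
        // no open handles remain; the entry is now eligible for LRU eviction
    }
    _ = info
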
diff --git a/weed/mount/ml/open_file_cache_test.go b/weed/mount/ml/open_file_cache_test.go
index d7d3e9664..5353cefcd 100644
--- a/weed/mount/ml/open_file_cache_test.go
+++ b/weed/mount/ml/open_file_cache_test.go
@@ -13,24 +13,24 @@ func TestOpenFileCache_Basic(t *testing.T) {
// Test opening a file
entry := &filer_pb.Entry{
- Name: "test.txt",
+ Name: "test.txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
}
-
+
inode := uint64(1)
fullPath := "/test/test.txt"
fileInfo := cache.OpenFile(inode, entry, fullPath)
-
+
if fileInfo == nil {
t.Fatal("OpenFile should return file info")
}
-
+
if fileInfo.Inode != inode {
t.Errorf("Expected inode %d, got %d", inode, fileInfo.Inode)
}
-
+
if fileInfo.OpenCount != 1 {
t.Errorf("Expected open count 1, got %d", fileInfo.OpenCount)
}
@@ -47,26 +47,26 @@ func TestOpenFileCache_MLFileDetection(t *testing.T) {
size uint64
expected MLFileType
}{
- {"PyTorch model", "/models/checkpoint.pt", "checkpoint.pt", 100*1024*1024, MLFileModel},
- {"Dataset image", "/datasets/train/image001.jpg", "image001.jpg", 2*1024*1024, MLFileDataset},
+ {"PyTorch model", "/models/checkpoint.pt", "checkpoint.pt", 100 * 1024 * 1024, MLFileModel},
+ {"Dataset image", "/datasets/train/image001.jpg", "image001.jpg", 2 * 1024 * 1024, MLFileDataset},
{"Config file", "/config/training.yaml", "training.yaml", 1024, MLFileConfig},
- {"Tensor file", "/tensors/weights.safetensors", "weights.safetensors", 50*1024*1024, MLFileModel},
- {"Log file", "/logs/training.log", "training.log", 10*1024, MLFileLog},
- {"Regular file", "/documents/readme.txt", "readme.txt", 5*1024, MLFileUnknown},
+ {"Tensor file", "/tensors/weights.safetensors", "weights.safetensors", 50 * 1024 * 1024, MLFileModel},
+ {"Log file", "/logs/training.log", "training.log", 10 * 1024, MLFileLog},
+ {"Regular file", "/documents/readme.txt", "readme.txt", 5 * 1024, MLFileUnknown},
}
-
+
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
entry := &filer_pb.Entry{
- Name: tc.filename,
+ Name: tc.filename,
Attributes: &filer_pb.FuseAttributes{
FileSize: tc.size,
},
}
-
+
inode := uint64(time.Now().UnixNano()) // Unique inode
fileInfo := cache.OpenFile(inode, entry, tc.path)
-
+
if tc.expected == MLFileUnknown {
if fileInfo.IsMLFile {
t.Errorf("File %s should not be detected as ML file", tc.path)
@@ -75,7 +75,7 @@ func TestOpenFileCache_MLFileDetection(t *testing.T) {
if !fileInfo.IsMLFile {
t.Errorf("File %s should be detected as ML file", tc.path)
}
-
+
if fileInfo.FileType != tc.expected {
t.Errorf("Expected file type %v, got %v", tc.expected, fileInfo.FileType)
}
@@ -90,15 +90,15 @@ func TestOpenFileCache_ChunkMetadata(t *testing.T) {
inode := uint64(1)
entry := &filer_pb.Entry{
- Name: "data.bin",
+ Name: "data.bin",
Attributes: &filer_pb.FuseAttributes{
FileSize: 10240,
},
}
fullPath := "/data/data.bin"
-
+
cache.OpenFile(inode, entry, fullPath)
-
+
// Test updating chunk metadata
chunkIndex := uint32(0)
metadata := &ChunkMetadata{
@@ -110,19 +110,19 @@ func TestOpenFileCache_ChunkMetadata(t *testing.T) {
AccessCount: 1,
Pattern: SequentialAccess,
}
-
+
cache.UpdateChunkCache(inode, chunkIndex, metadata)
-
+
// Test retrieving chunk metadata
retrieved, exists := cache.GetChunkMetadata(inode, chunkIndex)
if !exists {
t.Error("Chunk metadata should exist")
}
-
+
if retrieved.FileId != metadata.FileId {
t.Errorf("Expected FileId %s, got %s", metadata.FileId, retrieved.FileId)
}
-
+
if retrieved.AccessCount != 2 { // Should be incremented during retrieval
t.Errorf("Expected access count 2, got %d", retrieved.AccessCount)
}
@@ -135,7 +135,7 @@ func TestOpenFileCache_LRUEviction(t *testing.T) {
// Fill cache to capacity
for i := 1; i <= 3; i++ {
entry := &filer_pb.Entry{
- Name: "file" + string(rune('0'+i)) + ".txt",
+ Name: "file" + string(rune('0'+i)) + ".txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
@@ -144,27 +144,27 @@ func TestOpenFileCache_LRUEviction(t *testing.T) {
cache.OpenFile(uint64(i), entry, fullPath)
cache.CloseFile(uint64(i)) // Close immediately so they can be evicted
}
-
+
// Add one more file - should trigger eviction
entry4 := &filer_pb.Entry{
- Name: "file4.txt",
+ Name: "file4.txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
}
cache.OpenFile(uint64(4), entry4, "/test/file4.txt")
-
+
metrics := cache.GetMetrics()
if metrics.EvictedFiles == 0 {
t.Error("Should have evicted at least one file")
}
-
+
// File 1 should be evicted (oldest)
file1Info := cache.GetFileInfo(uint64(1))
if file1Info != nil {
t.Error("File 1 should have been evicted")
}
-
+
// File 4 should still be there
file4Info := cache.GetFileInfo(uint64(4))
if file4Info == nil {
@@ -178,27 +178,27 @@ func TestOpenFileCache_TTLCleanup(t *testing.T) {
inode := uint64(1)
entry := &filer_pb.Entry{
- Name: "test.txt",
+ Name: "test.txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
}
-
+
fileInfo := cache.OpenFile(inode, entry, "/test/test.txt")
cache.CloseFile(inode) // Close so it can be cleaned up
-
+
// Wait for TTL to expire
time.Sleep(150 * time.Millisecond)
-
+
// Trigger cleanup manually
cache.cleanup()
-
+
// File should be cleaned up
retrievedInfo := cache.GetFileInfo(inode)
if retrievedInfo != nil {
t.Error("File should have been cleaned up after TTL expiration")
}
-
+
_ = fileInfo // Avoid unused variable warning
}
@@ -208,35 +208,35 @@ func TestOpenFileCache_MultipleOpens(t *testing.T) {
inode := uint64(1)
entry := &filer_pb.Entry{
- Name: "shared.txt",
+ Name: "shared.txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
}
fullPath := "/test/shared.txt"
-
+
// Open file multiple times
fileInfo1 := cache.OpenFile(inode, entry, fullPath)
fileInfo2 := cache.OpenFile(inode, entry, fullPath)
-
+
if fileInfo1 != fileInfo2 {
t.Error("Multiple opens of same file should return same file info")
}
-
+
if fileInfo1.OpenCount != 2 {
t.Errorf("Expected open count 2, got %d", fileInfo1.OpenCount)
}
-
+
// Close once
canEvict1 := cache.CloseFile(inode)
if canEvict1 {
t.Error("Should not be able to evict file with open count > 0")
}
-
+
if fileInfo1.OpenCount != 1 {
t.Errorf("Expected open count 1 after first close, got %d", fileInfo1.OpenCount)
}
-
+
// Close again
canEvict2 := cache.CloseFile(inode)
if !canEvict2 {
@@ -260,16 +260,16 @@ func TestOpenFileCache_Metrics(t *testing.T) {
{3, "config.yaml", "/config/config.yaml", 1024},
{4, "regular.txt", "/docs/regular.txt", 5 * 1024},
}
-
+
for _, file := range files {
entry := &filer_pb.Entry{
- Name: file.filename,
+ Name: file.filename,
Attributes: &filer_pb.FuseAttributes{
FileSize: file.size,
},
}
cache.OpenFile(file.inode, entry, file.path)
-
+
// Add some chunk metadata
metadata := &ChunkMetadata{
FileId: "chunk_" + string(rune(file.inode)),
@@ -279,26 +279,26 @@ func TestOpenFileCache_Metrics(t *testing.T) {
}
cache.UpdateChunkCache(file.inode, 0, metadata)
}
-
+
metrics := cache.GetMetrics()
-
+
if metrics.TotalFiles != 4 {
t.Errorf("Expected 4 total files, got %d", metrics.TotalFiles)
}
-
+
if metrics.MLFiles < 2 { // Should detect at least model and dataset
t.Errorf("Expected at least 2 ML files, got %d", metrics.MLFiles)
}
-
+
if metrics.TotalChunks != 4 {
t.Errorf("Expected 4 total chunks, got %d", metrics.TotalChunks)
}
-
+
// Check file type counts
if metrics.FileTypes[MLFileModel] == 0 {
t.Error("Should detect at least one model file")
}
-
+
if metrics.FileTypes[MLFileDataset] == 0 {
t.Error("Should detect at least one dataset file")
}
@@ -311,24 +311,24 @@ func TestOpenFileCache_ConcurrentAccess(t *testing.T) {
// Test concurrent access to the cache
numGoroutines := 10
done := make(chan bool, numGoroutines)
-
+
for i := 0; i < numGoroutines; i++ {
go func(id int) {
defer func() { done <- true }()
-
+
inode := uint64(id)
entry := &filer_pb.Entry{
- Name: "file" + string(rune('0'+id)) + ".txt",
+ Name: "file" + string(rune('0'+id)) + ".txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
}
fullPath := "/test/file" + string(rune('0'+id)) + ".txt"
-
+
// Perform multiple operations
for j := 0; j < 10; j++ {
cache.OpenFile(inode, entry, fullPath)
-
+
metadata := &ChunkMetadata{
FileId: "chunk_" + string(rune(id)) + "_" + string(rune(j)),
Offset: uint64(j * 1024),
@@ -336,18 +336,18 @@ func TestOpenFileCache_ConcurrentAccess(t *testing.T) {
CacheLevel: 0,
}
cache.UpdateChunkCache(inode, uint32(j), metadata)
-
+
cache.GetChunkMetadata(inode, uint32(j))
cache.CloseFile(inode)
}
}(i)
}
-
+
// Wait for all goroutines to complete
for i := 0; i < numGoroutines; i++ {
<-done
}
-
+
// Verify cache state
metrics := cache.GetMetrics()
if metrics.TotalFiles == 0 {
@@ -357,7 +357,7 @@ func TestOpenFileCache_ConcurrentAccess(t *testing.T)

func TestMLFileDetector_Extensions(t *testing.T) {
detector := newMLFileDetector()
-
+
testCases := []struct {
filename string
path string
@@ -369,20 +369,20 @@ func TestMLFileDetector_Extensions(t *testing.T) {
{"config.yaml", "/config/config.yaml", MLFileConfig},
{"tensor.safetensors", "/tensors/tensor.safetensors", MLFileModel},
{"training.log", "/logs/training.log", MLFileLog},
- {"document.txt", "/docs/document.txt", MLFileUnknown},
+ {"document.txt", "/docs/document.txt", MLFileUnknown},
}
-
+
for _, tc := range testCases {
t.Run(tc.filename, func(t *testing.T) {
entry := &filer_pb.Entry{
- Name: tc.filename,
+ Name: tc.filename,
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
}
-
+
isML, fileType := detector.DetectMLFile(entry, tc.path)
-
+
if tc.expected == MLFileUnknown {
// For unknown files, either ML detection result is acceptable
t.Logf("File %s: isML=%v, type=%v", tc.filename, isML, fileType)
@@ -390,7 +390,7 @@ func TestMLFileDetector_Extensions(t *testing.T) {
if !isML {
t.Errorf("File %s should be detected as ML file", tc.filename)
}
-
+
if fileType != tc.expected {
t.Errorf("File %s: expected type %v, got %v", tc.filename, tc.expected, fileType)
}
@@ -401,7 +401,7 @@ func TestMLFileDetector_Extensions(t *testing.T) {

func TestMLFileDetector_PathPatterns(t *testing.T) {
detector := newMLFileDetector()
-
+
testCases := []struct {
path string
filename string
@@ -413,25 +413,25 @@ func TestMLFileDetector_PathPatterns(t *testing.T) {
{"/checkpoints/model_v1.bin", "model_v1.bin", MLFileModel},
{"/documents/report.pdf", "report.pdf", MLFileUnknown},
}
-
+
for _, tc := range testCases {
t.Run(tc.path, func(t *testing.T) {
entry := &filer_pb.Entry{
- Name: tc.filename,
+ Name: tc.filename,
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
}
-
+
isML, fileType := detector.DetectMLFile(entry, tc.path)
-
+
if tc.expected == MLFileUnknown {
t.Logf("Path %s: isML=%v, type=%v", tc.path, isML, fileType)
} else {
if !isML {
t.Errorf("Path %s should be detected as ML file", tc.path)
}
-
+
if fileType != tc.expected {
t.Errorf("Path %s: expected type %v, got %v", tc.path, tc.expected, fileType)
}
@@ -442,21 +442,21 @@ func TestMLFileDetector_PathPatterns(t *testing.T) {

func TestMLFileDetector_SizeHeuristics(t *testing.T) {
detector := newMLFileDetector()
-
+
// Large file with model-related name should be detected as model
largeModelEntry := &filer_pb.Entry{
- Name: "large_model.bin",
+ Name: "large_model.bin",
Attributes: &filer_pb.FuseAttributes{
FileSize: 500 * 1024 * 1024, // 500MB
},
}
-
+
isML, fileType := detector.DetectMLFile(largeModelEntry, "/checkpoints/large_model.bin")
-
+
if !isML {
t.Error("Large model file should be detected as ML file")
}
-
+
if fileType != MLFileModel {
t.Errorf("Large model file should be detected as model, got %v", fileType)
}
@@ -469,7 +469,7 @@ func TestOpenFileCache_EvictionProtection(t *testing.T) {
// Open two files and keep them open
for i := 1; i <= 2; i++ {
entry := &filer_pb.Entry{
- Name: "file" + string(rune('0'+i)) + ".txt",
+ Name: "file" + string(rune('0'+i)) + ".txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
@@ -478,16 +478,16 @@ func TestOpenFileCache_EvictionProtection(t *testing.T) {
cache.OpenFile(uint64(i), entry, fullPath)
// Don't close - keep them open
}
-
+
// Try to open a third file - should not evict open files
entry3 := &filer_pb.Entry{
- Name: "file3.txt",
+ Name: "file3.txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
}
cache.OpenFile(uint64(3), entry3, "/test/file3.txt")
-
+
// All files should still be there since none could be evicted
for i := 1; i <= 3; i++ {
fileInfo := cache.GetFileInfo(uint64(i))
@@ -502,38 +502,38 @@ func TestOpenFileCache_GetFileInfo_CacheHitMiss(t *testing.T) {
defer cache.Shutdown()

inode := uint64(1)
-
+
// Test cache miss
fileInfo := cache.GetFileInfo(inode)
if fileInfo != nil {
t.Error("Should return nil for non-existent file")
}
-
+
initialMetrics := cache.GetMetrics()
if initialMetrics.CacheMisses == 0 {
t.Error("Should record cache miss")
}
-
+
// Add file to cache
entry := &filer_pb.Entry{
- Name: "test.txt",
+ Name: "test.txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
}
cache.OpenFile(inode, entry, "/test/test.txt")
-
+
// Test cache hit
fileInfo = cache.GetFileInfo(inode)
if fileInfo == nil {
t.Error("Should return file info for existing file")
}
-
+
finalMetrics := cache.GetMetrics()
if finalMetrics.CacheHits == 0 {
t.Error("Should record cache hit")
}
-
+
if finalMetrics.CacheHits <= initialMetrics.CacheHits {
t.Error("Cache hits should increase")
}
@@ -545,7 +545,7 @@ func TestOpenFileCache_Shutdown(t *testing.T) {
// Add some files
for i := 1; i <= 3; i++ {
entry := &filer_pb.Entry{
- Name: "file" + string(rune('0'+i)) + ".txt",
+ Name: "file" + string(rune('0'+i)) + ".txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
@@ -576,7 +576,7 @@ func BenchmarkOpenFileCache_OpenFile(b *testing.B) {
defer cache.Shutdown()

entry := &filer_pb.Entry{
- Name: "benchmark.txt",
+ Name: "benchmark.txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
@@ -597,13 +597,13 @@ func BenchmarkOpenFileCache_GetFileInfo(b *testing.B) {

// Pre-populate cache
entry := &filer_pb.Entry{
- Name: "benchmark.txt",
+ Name: "benchmark.txt",
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
}
fullPath := "/test/benchmark.txt"
-
+
for i := 0; i < 100; i++ {
cache.OpenFile(uint64(i), entry, fullPath)
}
@@ -614,4 +614,4 @@ func BenchmarkOpenFileCache_GetFileInfo(b *testing.B) {
inode := uint64(i % 100)
cache.GetFileInfo(inode)
}
-}
\ No newline at end of file
+}
diff --git a/weed/mount/ml_integration.go b/weed/mount/ml_integration.go
index c79882c82..fefd23cd6 100644
--- a/weed/mount/ml_integration.go
+++ b/weed/mount/ml_integration.go
@@ -1,8 +1,6 @@
package mount

import (
- "time"
-
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/ml"
@@ -13,9 +11,9 @@ import (
// MLIntegrationManager manages ML optimization integration for the main WFS
type MLIntegrationManager struct {
- mlOptimization *ml.MLOptimization
+ mlOptimization *ml.MLOptimization
fuseIntegration *ml.FUSEMLIntegration
- enabled bool
+ enabled bool
}

// NewMLIntegrationManager creates a new ML integration manager
@@ -23,16 +21,16 @@ func NewMLIntegrationManager(chunkCache chunk_cache.ChunkCache, lookupFn wdclien
// Create ML optimization with default config
config := ml.DefaultMLConfig()
mlOpt := ml.NewMLOptimization(config, chunkCache, lookupFn)
-
+
// Create FUSE integration
fuseInt := ml.NewFUSEMLIntegration(mlOpt)
-
+
manager := &MLIntegrationManager{
mlOptimization: mlOpt,
fuseIntegration: fuseInt,
enabled: true,
}
-
+
glog.V(1).Infof("ML integration manager initialized")
return manager
}
@@ -40,15 +38,15 @@ func NewMLIntegrationManager(chunkCache chunk_cache.ChunkCache, lookupFn wdclien
// EnableMLOptimization enables or disables ML optimization
func (mgr *MLIntegrationManager) EnableMLOptimization(enabled bool) {
mgr.enabled = enabled
-
+
if mgr.mlOptimization != nil {
mgr.mlOptimization.Enable(enabled)
}
-
+
if mgr.fuseIntegration != nil {
mgr.fuseIntegration.EnableMLOptimizations(enabled)
}
-
+
glog.V(1).Infof("ML optimization %s", map[bool]string{true: "enabled", false: "disabled"}[enabled])
}

@@ -57,7 +55,7 @@ func (mgr *MLIntegrationManager) OnFileOpen(inode uint64, entry *filer_pb.Entry,
if !mgr.enabled || mgr.fuseIntegration == nil {
return
}
-
+
mgr.fuseIntegration.OnFileOpen(inode, entry, fullPath, flags, out)
}

@@ -66,7 +64,7 @@ func (mgr *MLIntegrationManager) OnFileClose(inode uint64) {
if !mgr.enabled || mgr.fuseIntegration == nil {
return
}
-
+
mgr.fuseIntegration.OnFileClose(inode)
}

@@ -75,7 +73,7 @@ func (mgr *MLIntegrationManager) OnFileRead(inode uint64, offset int64, size int
if !mgr.enabled || mgr.fuseIntegration == nil {
return
}
-
+
mgr.fuseIntegration.OnFileRead(inode, offset, size)
}

@@ -84,7 +82,7 @@ func (mgr *MLIntegrationManager) OnChunkAccess(inode uint64, chunkIndex uint32,
if !mgr.enabled || mgr.fuseIntegration == nil {
return
}
-
+
mgr.fuseIntegration.OnChunkAccess(inode, chunkIndex, fileId, cacheLevel, isHit)
}

@@ -93,7 +91,7 @@ func (mgr *MLIntegrationManager) OptimizeAttributes(inode uint64, out *fuse.Attr
if !mgr.enabled || mgr.fuseIntegration == nil {
return
}
-
+
mgr.fuseIntegration.OptimizeAttributes(inode, out)
}

@@ -102,7 +100,7 @@ func (mgr *MLIntegrationManager) OptimizeEntryCache(inode uint64, entry *filer_p
if !mgr.enabled || mgr.fuseIntegration == nil {
return
}
-
+
mgr.fuseIntegration.OptimizeEntryCache(inode, entry, out)
}

@@ -111,7 +109,7 @@ func (mgr *MLIntegrationManager) ShouldEnableWriteback(inode uint64, entry *file
if !mgr.enabled || mgr.fuseIntegration == nil {
return false
}
-
+
return mgr.fuseIntegration.ShouldEnableWriteback(inode, entry)
}

@@ -120,7 +118,7 @@ func (mgr *MLIntegrationManager) GetComprehensiveMetrics() *ml.FUSEMLMetrics {
if !mgr.enabled || mgr.fuseIntegration == nil {
return &ml.FUSEMLMetrics{}
}
-
+
metrics := mgr.fuseIntegration.GetOptimizationMetrics()
return &metrics
}
@@ -133,10 +131,10 @@ func (mgr *MLIntegrationManager) IsEnabled() bool {
// Shutdown gracefully shuts down the ML integration
func (mgr *MLIntegrationManager) Shutdown() {
glog.V(1).Infof("Shutting down ML integration manager...")
-
+
if mgr.fuseIntegration != nil {
mgr.fuseIntegration.Shutdown()
}
-
+
glog.V(1).Infof("ML integration manager shutdown complete")
}