diff options
Diffstat (limited to 'weed/mount/ml/batch_optimizer.go')
| -rw-r--r-- | weed/mount/ml/batch_optimizer.go | 478 |
1 files changed, 241 insertions, 237 deletions
diff --git a/weed/mount/ml/batch_optimizer.go b/weed/mount/ml/batch_optimizer.go index d5dbfa636..12ef75947 100644 --- a/weed/mount/ml/batch_optimizer.go +++ b/weed/mount/ml/batch_optimizer.go @@ -12,155 +12,155 @@ import ( type BatchAccessPattern int const ( - BatchPatternUnknown BatchAccessPattern = iota - BatchPatternLinear // Linear batch processing - BatchPatternStrided // Strided access with fixed gaps - BatchPatternShuffled // Randomized batch order - BatchPatternHierarchical // Hierarchical/nested batch access - BatchPatternMultiGPU // Multi-GPU distributed batches - BatchPatternPipelined // Pipelined batch processing + BatchPatternUnknown BatchAccessPattern = iota + BatchPatternLinear // Linear batch processing + BatchPatternStrided // Strided access with fixed gaps + BatchPatternShuffled // Randomized batch order + BatchPatternHierarchical // Hierarchical/nested batch access + BatchPatternMultiGPU // Multi-GPU distributed batches + BatchPatternPipelined // Pipelined batch processing ) // BatchAccess represents a single file access that's part of batch processing type BatchAccess struct { - Offset int64 // File offset - Size int // Access size - AccessTime time.Time // When accessed - IsRead bool // Whether this was a read operation - BatchHint string // Optional batch identifier hint + Offset int64 // File offset + Size int // Access size + AccessTime time.Time // When accessed + IsRead bool // Whether this was a read operation + BatchHint string // Optional batch identifier hint } // BatchInfo holds information about a detected batch type BatchInfo struct { sync.RWMutex - + // Batch identification - BatchID string // Unique batch identifier - StartOffset int64 // Starting file offset - EndOffset int64 // Ending file offset - Size int64 // Total batch size in bytes - ItemCount int // Number of items in batch - ItemSize int64 // Average item size - + BatchID string // Unique batch identifier + StartOffset int64 // Starting file offset + EndOffset int64 // Ending file offset + Size int64 // Total batch size in bytes + ItemCount int // Number of items in batch + ItemSize int64 // Average item size + // Access pattern - AccessPattern BatchAccessPattern // Detected access pattern - AccessOrder []int64 // Order of access within batch - AccessTimes []time.Time // When each item was accessed - ProcessingTime time.Duration // Total time to process batch - + AccessPattern BatchAccessPattern // Detected access pattern + AccessOrder []int64 // Order of access within batch + AccessTimes []time.Time // When each item was accessed + ProcessingTime time.Duration // Total time to process batch + // Performance metrics - LoadTime time.Duration // Time to load batch from storage - ProcessTime time.Duration // Time to process batch (compute) - TotalTime time.Duration // Total end-to-end time - Throughput float64 // Items per second - + LoadTime time.Duration // Time to load batch from storage + ProcessTime time.Duration // Time to process batch (compute) + TotalTime time.Duration // Total end-to-end time + Throughput float64 // Items per second + // Optimization state - IsPrefetched bool // Whether batch was prefetched - CacheHitRate float64 // Percentage of cache hits - OptimalPrefetch int64 // Recommended prefetch size - + IsPrefetched bool // Whether batch was prefetched + CacheHitRate float64 // Percentage of cache hits + OptimalPrefetch int64 // Recommended prefetch size + // Relationship to other batches - PreviousBatch *BatchInfo // Previous batch in sequence - NextBatch *BatchInfo // Next batch in sequence - ParentBatch *BatchInfo // Parent batch (for hierarchical) - ChildBatches []*BatchInfo // Child batches (for hierarchical) + PreviousBatch *BatchInfo // Previous batch in sequence + NextBatch *BatchInfo // Next batch in sequence + ParentBatch *BatchInfo // Parent batch (for hierarchical) + ChildBatches []*BatchInfo // Child batches (for hierarchical) } // BatchOptimizer optimizes batch access patterns for ML workloads type BatchOptimizer struct { sync.RWMutex - + // Configuration - maxBatchesTracked int // Maximum number of batches to track - batchDetectionWindow int // Window size for batch detection - minBatchSize int64 // Minimum size to consider as batch - maxBatchSize int64 // Maximum size to consider as batch - + maxBatchesTracked int // Maximum number of batches to track + batchDetectionWindow int // Window size for batch detection + minBatchSize int64 // Minimum size to consider as batch + maxBatchSize int64 // Maximum size to consider as batch + // Batch tracking - activeBatches map[string]*BatchInfo // Currently active batches - completedBatches map[string]*BatchInfo // Recently completed batches - inodeToBatches map[uint64][]*BatchInfo // File to batches mapping - + activeBatches map[string]*BatchInfo // Currently active batches + completedBatches map[string]*BatchInfo // Recently completed batches + inodeToBatches map[uint64][]*BatchInfo // File to batches mapping + // Pattern detection - accessHistory map[uint64][]BatchAccess // Recent access history per file - batchSequences map[uint64]*BatchSequence // Detected batch sequences - + accessHistory map[uint64][]BatchAccess // Recent access history per file + batchSequences map[uint64]*BatchSequence // Detected batch sequences + // Optimization strategies - prefetchStrategies map[BatchAccessPattern]*PrefetchConfig // Prefetch configs per pattern - cacheStrategies map[BatchAccessPattern]*CacheConfig // Cache configs per pattern - + prefetchStrategies map[BatchAccessPattern]*PrefetchConfig // Prefetch configs per pattern + cacheStrategies map[BatchAccessPattern]*CacheConfig // Cache configs per pattern + // Statistics - totalBatchesDetected int64 // Total batches detected - optimizationHits int64 // Successful optimization applications - optimizationMisses int64 // Failed optimization attempts - + totalBatchesDetected int64 // Total batches detected + optimizationHits int64 // Successful optimization applications + optimizationMisses int64 // Failed optimization attempts + // Background processing - cleanupTicker *time.Ticker // Cleanup timer - stopCleanup chan struct{} // Cleanup stop signal + cleanupTicker *time.Ticker // Cleanup timer + stopCleanup chan struct{} // Cleanup stop signal } // BatchSequence represents a sequence of related batches type BatchSequence struct { sync.RWMutex - - SequenceID string // Unique sequence identifier - Batches []*BatchInfo // Batches in sequence - Pattern BatchAccessPattern // Overall sequence pattern - StartTime time.Time // When sequence started - LastAccess time.Time // Last access in sequence - IsComplete bool // Whether sequence is complete - RepeatCount int // How many times sequence has repeated - + + SequenceID string // Unique sequence identifier + Batches []*BatchInfo // Batches in sequence + Pattern BatchAccessPattern // Overall sequence pattern + StartTime time.Time // When sequence started + LastAccess time.Time // Last access in sequence + IsComplete bool // Whether sequence is complete + RepeatCount int // How many times sequence has repeated + // Predictions - NextBatchOffset int64 // Predicted next batch offset - NextBatchSize int64 // Predicted next batch size - Confidence float64 // Confidence in predictions (0-1) + NextBatchOffset int64 // Predicted next batch offset + NextBatchSize int64 // Predicted next batch size + Confidence float64 // Confidence in predictions (0-1) } // PrefetchConfig holds configuration for prefetching strategies type PrefetchConfig struct { - Strategy PrefetchStrategy // Which prefetch strategy to use - LookaheadCount int // How many items to prefetch ahead - PrefetchSize int64 // Size to prefetch per operation - ConcurrencyLevel int // How many concurrent prefetch operations - AdaptiveScaling bool // Whether to scale based on performance + Strategy PrefetchStrategy // Which prefetch strategy to use + LookaheadCount int // How many items to prefetch ahead + PrefetchSize int64 // Size to prefetch per operation + ConcurrencyLevel int // How many concurrent prefetch operations + AdaptiveScaling bool // Whether to scale based on performance } -// CacheConfig holds configuration for caching strategies +// CacheConfig holds configuration for caching strategies type CacheConfig struct { - Policy CachePolicy // Which cache policy to use - RetentionTime time.Duration // How long to keep items cached - Priority CachePriority // Cache priority level - PreloadBatches int // How many batches to preload + Policy CachePolicy // Which cache policy to use + RetentionTime time.Duration // How long to keep items cached + Priority CachePriority // Cache priority level + PreloadBatches int // How many batches to preload } // NewBatchOptimizer creates a new batch optimizer func NewBatchOptimizer() *BatchOptimizer { bo := &BatchOptimizer{ - maxBatchesTracked: 1000, // Track up to 1000 batches - batchDetectionWindow: 100, // Look at last 100 accesses - minBatchSize: 64 * 1024, // Minimum 64KB batch - maxBatchSize: 100 * 1024 * 1024, // Maximum 100MB batch - - activeBatches: make(map[string]*BatchInfo), - completedBatches: make(map[string]*BatchInfo), - inodeToBatches: make(map[uint64][]*BatchInfo), - accessHistory: make(map[uint64][]BatchAccess), - batchSequences: make(map[uint64]*BatchSequence), - - prefetchStrategies: make(map[BatchAccessPattern]*PrefetchConfig), - cacheStrategies: make(map[BatchAccessPattern]*CacheConfig), - - stopCleanup: make(chan struct{}), - } - + maxBatchesTracked: 1000, // Track up to 1000 batches + batchDetectionWindow: 100, // Look at last 100 accesses + minBatchSize: 64 * 1024, // Minimum 64KB batch + maxBatchSize: 100 * 1024 * 1024, // Maximum 100MB batch + + activeBatches: make(map[string]*BatchInfo), + completedBatches: make(map[string]*BatchInfo), + inodeToBatches: make(map[uint64][]*BatchInfo), + accessHistory: make(map[uint64][]BatchAccess), + batchSequences: make(map[uint64]*BatchSequence), + + prefetchStrategies: make(map[BatchAccessPattern]*PrefetchConfig), + cacheStrategies: make(map[BatchAccessPattern]*CacheConfig), + + stopCleanup: make(chan struct{}), + } + // Initialize default strategies bo.initializeDefaultStrategies() - + // Start cleanup routine bo.cleanupTicker = time.NewTicker(5 * time.Minute) go bo.cleanupRoutine() - + glog.V(1).Infof("Batch optimizer initialized") return bo } @@ -169,11 +169,11 @@ func NewBatchOptimizer() *BatchOptimizer { func (bo *BatchOptimizer) initializeDefaultStrategies() { // Linear batch pattern - aggressive prefetching bo.prefetchStrategies[BatchPatternLinear] = &PrefetchConfig{ - Strategy: PrefetchAggressive, - LookaheadCount: 5, - PrefetchSize: 2 * 1024 * 1024, // 2MB + Strategy: PrefetchAggressive, + LookaheadCount: 5, + PrefetchSize: 2 * 1024 * 1024, // 2MB ConcurrencyLevel: 3, - AdaptiveScaling: true, + AdaptiveScaling: true, } bo.cacheStrategies[BatchPatternLinear] = &CacheConfig{ Policy: CachePolicyTrainingAware, @@ -181,14 +181,14 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() { Priority: CachePriorityHigh, PreloadBatches: 2, } - + // Shuffled batch pattern - conservative prefetching bo.prefetchStrategies[BatchPatternShuffled] = &PrefetchConfig{ - Strategy: PrefetchBalanced, - LookaheadCount: 2, - PrefetchSize: 512 * 1024, // 512KB + Strategy: PrefetchBalanced, + LookaheadCount: 2, + PrefetchSize: 512 * 1024, // 512KB ConcurrencyLevel: 2, - AdaptiveScaling: true, + AdaptiveScaling: true, } bo.cacheStrategies[BatchPatternShuffled] = &CacheConfig{ Policy: CachePolicyLRU, @@ -196,14 +196,14 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() { Priority: CachePriorityNormal, PreloadBatches: 1, } - + // Multi-GPU pattern - high concurrency bo.prefetchStrategies[BatchPatternMultiGPU] = &PrefetchConfig{ - Strategy: PrefetchAggressive, - LookaheadCount: 8, - PrefetchSize: 4 * 1024 * 1024, // 4MB + Strategy: PrefetchAggressive, + LookaheadCount: 8, + PrefetchSize: 4 * 1024 * 1024, // 4MB ConcurrencyLevel: 6, - AdaptiveScaling: true, + AdaptiveScaling: true, } bo.cacheStrategies[BatchPatternMultiGPU] = &CacheConfig{ Policy: CachePolicyML, @@ -217,7 +217,7 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() { func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int, isRead bool, batchHint string) *BatchInfo { bo.Lock() defer bo.Unlock() - + access := BatchAccess{ Offset: offset, Size: size, @@ -225,7 +225,7 @@ func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int IsRead: isRead, BatchHint: batchHint, } - + // Add to access history history := bo.accessHistory[inode] history = append(history, access) @@ -233,23 +233,23 @@ func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int history = history[1:] // Keep only recent accesses } bo.accessHistory[inode] = history - + // Detect batch patterns batchInfo := bo.detectBatchPattern(inode, history) if batchInfo != nil { bo.totalBatchesDetected++ - + // Add to tracking bo.activeBatches[batchInfo.BatchID] = batchInfo bo.inodeToBatches[inode] = append(bo.inodeToBatches[inode], batchInfo) - + // Update batch sequence bo.updateBatchSequence(inode, batchInfo) - - glog.V(3).Infof("Detected batch: inode=%d, pattern=%v, size=%d, items=%d", + + glog.V(3).Infof("Detected batch: inode=%d, pattern=%v, size=%d, items=%d", inode, batchInfo.AccessPattern, batchInfo.Size, batchInfo.ItemCount) } - + return batchInfo } @@ -258,25 +258,29 @@ func (bo *BatchOptimizer) detectBatchPattern(inode uint64, history []BatchAccess if len(history) < 3 { return nil // Need minimum history } - + // Look for batch boundaries by analyzing access gaps and patterns - recent := history[len(history)-10:] // Look at last 10 accesses + startIdx := len(history) - 10 + if startIdx < 0 { + startIdx = 0 + } + recent := history[startIdx:] // Look at last 10 accesses (or all if fewer) if len(recent) < 3 { recent = history } - + // Check for batch characteristics batchInfo := bo.analyzePotentialBatch(recent, inode) if batchInfo == nil { return nil } - + // Determine access pattern batchInfo.AccessPattern = bo.classifyBatchPattern(batchInfo, recent) - + // Calculate performance metrics bo.calculateBatchMetrics(batchInfo, recent) - + return batchInfo } @@ -285,60 +289,60 @@ func (bo *BatchOptimizer) analyzePotentialBatch(accesses []BatchAccess, inode ui if len(accesses) < 2 { return nil } - + // Calculate basic statistics var totalSize int64 var itemCount int minOffset := accesses[0].Offset maxOffset := accesses[0].Offset - + accessOrder := make([]int64, len(accesses)) accessTimes := make([]time.Time, len(accesses)) - + for i, access := range accesses { totalSize += int64(access.Size) itemCount++ - + if access.Offset < minOffset { minOffset = access.Offset } if access.Offset > maxOffset { maxOffset = access.Offset } - + accessOrder[i] = access.Offset accessTimes[i] = access.AccessTime } - + batchSize := maxOffset - minOffset + int64(accesses[len(accesses)-1].Size) - + // Check if this qualifies as a batch if batchSize < bo.minBatchSize || batchSize > bo.maxBatchSize { return nil } - + // Check temporal locality (accesses should be close in time) timeSpan := accessTimes[len(accessTimes)-1].Sub(accessTimes[0]) if timeSpan > 10*time.Minute { // Too spread out in time return nil } - + // Create batch info batchID := generateBatchID(inode, minOffset, time.Now()) - + batchInfo := &BatchInfo{ - BatchID: batchID, - StartOffset: minOffset, - EndOffset: maxOffset, - Size: batchSize, - ItemCount: itemCount, - ItemSize: totalSize / int64(itemCount), - AccessOrder: accessOrder, - AccessTimes: accessTimes, - TotalTime: timeSpan, - LoadTime: timeSpan, // Initially assume all time is load time - } - + BatchID: batchID, + StartOffset: minOffset, + EndOffset: maxOffset, + Size: batchSize, + ItemCount: itemCount, + ItemSize: totalSize / int64(itemCount), + AccessOrder: accessOrder, + AccessTimes: accessTimes, + TotalTime: timeSpan, + LoadTime: timeSpan, // Initially assume all time is load time + } + return batchInfo } @@ -347,7 +351,7 @@ func (bo *BatchOptimizer) classifyBatchPattern(batch *BatchInfo, accesses []Batc if len(batch.AccessOrder) < 2 { return BatchPatternUnknown } - + // Check for linear pattern (sequential offsets) isLinear := true for i := 1; i < len(batch.AccessOrder); i++ { @@ -356,31 +360,31 @@ func (bo *BatchOptimizer) classifyBatchPattern(batch *BatchInfo, accesses []Batc break } } - + if isLinear { return BatchPatternLinear } - + // Check for strided pattern (regular gaps) if bo.isStridedPattern(batch.AccessOrder) { return BatchPatternStrided } - + // Check for shuffled pattern (randomized order) if bo.isShuffledPattern(batch.AccessOrder) { return BatchPatternShuffled } - + // Check for multi-GPU pattern (parallel access indicators) if bo.isMultiGPUPattern(accesses) { return BatchPatternMultiGPU } - + // Check for pipelined pattern (overlapping accesses) if bo.isPipelinedPattern(batch.AccessTimes) { return BatchPatternPipelined } - + return BatchPatternUnknown } @@ -389,13 +393,13 @@ func (bo *BatchOptimizer) isStridedPattern(offsets []int64) bool { if len(offsets) < 3 { return false } - + // Calculate stride stride := offsets[1] - offsets[0] if stride <= 0 { return false } - + // Check if all accesses follow the same stride consistentStrides := 0 for i := 2; i < len(offsets); i++ { @@ -404,9 +408,9 @@ func (bo *BatchOptimizer) isStridedPattern(offsets []int64) bool { consistentStrides++ } } - + // At least 80% of strides should be consistent - return float64(consistentStrides) / float64(len(offsets)-2) >= 0.8 + return float64(consistentStrides)/float64(len(offsets)-2) >= 0.8 } // isShuffledPattern checks if accesses are in randomized order @@ -414,7 +418,7 @@ func (bo *BatchOptimizer) isShuffledPattern(offsets []int64) bool { if len(offsets) < 5 { return false } - + // Count inversions (out-of-order pairs) inversions := 0 for i := 0; i < len(offsets); i++ { @@ -424,10 +428,10 @@ func (bo *BatchOptimizer) isShuffledPattern(offsets []int64) bool { } } } - + totalPairs := len(offsets) * (len(offsets) - 1) / 2 inversionRate := float64(inversions) / float64(totalPairs) - + // High inversion rate suggests shuffling return inversionRate > 0.3 } @@ -437,22 +441,22 @@ func (bo *BatchOptimizer) isMultiGPUPattern(accesses []BatchAccess) bool { // Look for multiple concurrent access streams // This is a simplified heuristic - in practice, this would need more // sophisticated detection based on process info, etc. - + if len(accesses) < 4 { return false } - + // Check for concurrent accesses (multiple accesses in very short time) concurrentWindows := 0 windowSize := 100 * time.Millisecond - + for i := 0; i < len(accesses)-1; i++ { timeDiff := accesses[i+1].AccessTime.Sub(accesses[i].AccessTime) if timeDiff < windowSize { concurrentWindows++ } } - + // If many accesses are concurrent, might be multi-GPU return float64(concurrentWindows)/float64(len(accesses)) > 0.5 } @@ -462,30 +466,30 @@ func (bo *BatchOptimizer) isPipelinedPattern(accessTimes []time.Time) bool { if len(accessTimes) < 3 { return false } - + // Look for regular, overlapping timing patterns intervals := make([]time.Duration, len(accessTimes)-1) for i := 1; i < len(accessTimes); i++ { intervals[i-1] = accessTimes[i].Sub(accessTimes[i-1]) } - + // Calculate coefficient of variation for intervals var sum, sumSq time.Duration for _, interval := range intervals { sum += interval sumSq += interval * interval } - + n := time.Duration(len(intervals)) mean := sum / n if mean == 0 { return false } - + // Calculate variance and CV variance := (sumSq / n) - (mean * mean) - cv := float64(variance) / float64(mean * mean) - + cv := float64(variance) / float64(mean*mean) + // Low coefficient of variation suggests regular pipelining return cv < 0.2 } @@ -495,18 +499,18 @@ func (bo *BatchOptimizer) calculateBatchMetrics(batch *BatchInfo, accesses []Bat if len(batch.AccessTimes) < 2 { return } - + // Calculate throughput timeSpan := batch.AccessTimes[len(batch.AccessTimes)-1].Sub(batch.AccessTimes[0]) if timeSpan > 0 { batch.Throughput = float64(batch.ItemCount) / timeSpan.Seconds() } - + // Estimate processing vs load time (heuristic) // In practice, this would need more sophisticated measurement avgItemTime := timeSpan / time.Duration(batch.ItemCount) - batch.ProcessTime = avgItemTime / 2 // Assume 50% processing time - batch.LoadTime = avgItemTime / 2 // Assume 50% load time + batch.ProcessTime = avgItemTime / 2 // Assume 50% processing time + batch.LoadTime = avgItemTime / 2 // Assume 50% load time } // updateBatchSequence updates the batch sequence for an inode @@ -521,26 +525,26 @@ func (bo *BatchOptimizer) updateBatchSequence(inode uint64, newBatch *BatchInfo) } bo.batchSequences[inode] = sequence } - + sequence.Lock() defer sequence.Unlock() - + // Link batches if len(sequence.Batches) > 0 { lastBatch := sequence.Batches[len(sequence.Batches)-1] lastBatch.NextBatch = newBatch newBatch.PreviousBatch = lastBatch } - + sequence.Batches = append(sequence.Batches, newBatch) sequence.LastAccess = time.Now() - + // Update sequence pattern based on majority of batches bo.updateSequencePattern(sequence) - + // Make predictions for next batch bo.updateSequencePredictions(sequence) - + // Keep sequence size manageable if len(sequence.Batches) > 100 { sequence.Batches = sequence.Batches[len(sequence.Batches)-50:] // Keep last 50 batches @@ -552,13 +556,13 @@ func (bo *BatchOptimizer) updateSequencePattern(sequence *BatchSequence) { if len(sequence.Batches) < 3 { return } - + // Count patterns patternCounts := make(map[BatchAccessPattern]int) for _, batch := range sequence.Batches { patternCounts[batch.AccessPattern]++ } - + // Find most common pattern maxCount := 0 var dominantPattern BatchAccessPattern @@ -568,7 +572,7 @@ func (bo *BatchOptimizer) updateSequencePattern(sequence *BatchSequence) { dominantPattern = pattern } } - + sequence.Pattern = dominantPattern } @@ -577,12 +581,12 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) { if len(sequence.Batches) < 2 { return } - + recent := sequence.Batches[len(sequence.Batches)-3:] // Last 3 batches if len(recent) < 2 { recent = sequence.Batches } - + // Predict next batch offset based on pattern switch sequence.Pattern { case BatchPatternLinear: @@ -595,7 +599,7 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) { sequence.NextBatchSize = lastBatch.Size sequence.Confidence = 0.8 } - + case BatchPatternStrided: // Regular stride if len(recent) >= 3 { @@ -604,7 +608,7 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) { sequence.NextBatchSize = recent[len(recent)-1].Size sequence.Confidence = 0.7 } - + default: // Lower confidence for unpredictable patterns sequence.Confidence = 0.3 @@ -615,60 +619,60 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) { func (bo *BatchOptimizer) GetBatchRecommendations(inode uint64) *BatchOptimizationRecommendations { bo.RLock() defer bo.RUnlock() - + sequence := bo.batchSequences[inode] if sequence == nil { return &BatchOptimizationRecommendations{ ShouldOptimize: false, } } - + sequence.RLock() defer sequence.RUnlock() - + prefetchConfig := bo.prefetchStrategies[sequence.Pattern] cacheConfig := bo.cacheStrategies[sequence.Pattern] - + if prefetchConfig == nil { prefetchConfig = bo.prefetchStrategies[BatchPatternUnknown] } if cacheConfig == nil { cacheConfig = bo.cacheStrategies[BatchPatternUnknown] } - + recommendations := &BatchOptimizationRecommendations{ - ShouldOptimize: true, - Pattern: sequence.Pattern, - PrefetchSize: prefetchConfig.PrefetchSize, - PrefetchCount: prefetchConfig.LookaheadCount, - CachePriority: cacheConfig.Priority, - CacheRetention: cacheConfig.RetentionTime, - NextBatchOffset: sequence.NextBatchOffset, - NextBatchSize: sequence.NextBatchSize, - Confidence: sequence.Confidence, - } - + ShouldOptimize: true, + Pattern: sequence.Pattern, + PrefetchSize: prefetchConfig.PrefetchSize, + PrefetchCount: prefetchConfig.LookaheadCount, + CachePriority: cacheConfig.Priority, + CacheRetention: cacheConfig.RetentionTime, + NextBatchOffset: sequence.NextBatchOffset, + NextBatchSize: sequence.NextBatchSize, + Confidence: sequence.Confidence, + } + return recommendations } // BatchOptimizationRecommendations holds batch optimization recommendations type BatchOptimizationRecommendations struct { - ShouldOptimize bool `json:"should_optimize"` - Pattern BatchAccessPattern `json:"pattern"` - PrefetchSize int64 `json:"prefetch_size"` - PrefetchCount int `json:"prefetch_count"` - CachePriority CachePriority `json:"cache_priority"` - CacheRetention time.Duration `json:"cache_retention"` - NextBatchOffset int64 `json:"next_batch_offset"` - NextBatchSize int64 `json:"next_batch_size"` - Confidence float64 `json:"confidence"` + ShouldOptimize bool `json:"should_optimize"` + Pattern BatchAccessPattern `json:"pattern"` + PrefetchSize int64 `json:"prefetch_size"` + PrefetchCount int `json:"prefetch_count"` + CachePriority CachePriority `json:"cache_priority"` + CacheRetention time.Duration `json:"cache_retention"` + NextBatchOffset int64 `json:"next_batch_offset"` + NextBatchSize int64 `json:"next_batch_size"` + Confidence float64 `json:"confidence"` } // GetBatchMetrics returns comprehensive batch optimization metrics func (bo *BatchOptimizer) GetBatchMetrics() BatchOptimizerMetrics { bo.RLock() defer bo.RUnlock() - + metrics := BatchOptimizerMetrics{ TotalBatchesDetected: bo.totalBatchesDetected, ActiveBatches: int64(len(bo.activeBatches)), @@ -677,32 +681,32 @@ func (bo *BatchOptimizer) GetBatchMetrics() BatchOptimizerMetrics { OptimizationMisses: bo.optimizationMisses, PatternCounts: make(map[BatchAccessPattern]int64), } - + // Count patterns for _, batch := range bo.activeBatches { batch.RLock() metrics.PatternCounts[batch.AccessPattern]++ batch.RUnlock() } - + // Calculate hit rate totalAttempts := bo.optimizationHits + bo.optimizationMisses if totalAttempts > 0 { metrics.OptimizationHitRate = float64(bo.optimizationHits) / float64(totalAttempts) } - + return metrics } // BatchOptimizerMetrics holds metrics for batch optimization type BatchOptimizerMetrics struct { - TotalBatchesDetected int64 `json:"total_batches_detected"` - ActiveBatches int64 `json:"active_batches"` - CompletedBatches int64 `json:"completed_batches"` - OptimizationHits int64 `json:"optimization_hits"` - OptimizationMisses int64 `json:"optimization_misses"` - OptimizationHitRate float64 `json:"optimization_hit_rate"` - PatternCounts map[BatchAccessPattern]int64 `json:"pattern_counts"` + TotalBatchesDetected int64 `json:"total_batches_detected"` + ActiveBatches int64 `json:"active_batches"` + CompletedBatches int64 `json:"completed_batches"` + OptimizationHits int64 `json:"optimization_hits"` + OptimizationMisses int64 `json:"optimization_misses"` + OptimizationHitRate float64 `json:"optimization_hit_rate"` + PatternCounts map[BatchAccessPattern]int64 `json:"pattern_counts"` } // cleanupRoutine performs periodic cleanup of old batch information @@ -721,21 +725,21 @@ func (bo *BatchOptimizer) cleanupRoutine() { func (bo *BatchOptimizer) performCleanup() { bo.Lock() defer bo.Unlock() - + now := time.Now() cutoff := now.Add(-30 * time.Minute) // Remove batches older than 30 minutes - + // Clean up completed batches for id, batch := range bo.completedBatches { batch.RLock() shouldRemove := len(batch.AccessTimes) > 0 && batch.AccessTimes[0].Before(cutoff) batch.RUnlock() - + if shouldRemove { delete(bo.completedBatches, id) } } - + // Clean up access history for inode, history := range bo.accessHistory { filtered := make([]BatchAccess, 0, len(history)) @@ -744,14 +748,14 @@ func (bo *BatchOptimizer) performCleanup() { filtered = append(filtered, access) } } - + if len(filtered) == 0 { delete(bo.accessHistory, inode) } else { bo.accessHistory[inode] = filtered } } - + // Clean up batch sequences for inode, sequence := range bo.batchSequences { sequence.Lock() @@ -762,7 +766,7 @@ func (bo *BatchOptimizer) performCleanup() { } sequence.Unlock() } - + glog.V(4).Infof("Batch optimizer cleanup completed") } @@ -771,9 +775,9 @@ func (bo *BatchOptimizer) Shutdown() { if bo.cleanupTicker != nil { bo.cleanupTicker.Stop() } - + close(bo.stopCleanup) - + glog.V(1).Infof("Batch optimizer shutdown complete") } |
