aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/ml/batch_optimizer.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-08-30 17:08:02 -0700
committerchrislu <chris.lu@gmail.com>2025-08-30 17:08:02 -0700
commit5fe7f3fef24c3e2645351eae6727c4273ecfa191 (patch)
treec7351578e657419f8d546fd1cf813afb0ba83463 /weed/mount/ml/batch_optimizer.go
parent814e0bb233117ac8e4a101b68418c2109000d994 (diff)
downloadseaweedfs-origin/improve-fuse-mount.tar.xz
seaweedfs-origin/improve-fuse-mount.zip
Diffstat (limited to 'weed/mount/ml/batch_optimizer.go')
-rw-r--r--weed/mount/ml/batch_optimizer.go478
1 files changed, 241 insertions, 237 deletions
diff --git a/weed/mount/ml/batch_optimizer.go b/weed/mount/ml/batch_optimizer.go
index d5dbfa636..12ef75947 100644
--- a/weed/mount/ml/batch_optimizer.go
+++ b/weed/mount/ml/batch_optimizer.go
@@ -12,155 +12,155 @@ import (
type BatchAccessPattern int
const (
- BatchPatternUnknown BatchAccessPattern = iota
- BatchPatternLinear // Linear batch processing
- BatchPatternStrided // Strided access with fixed gaps
- BatchPatternShuffled // Randomized batch order
- BatchPatternHierarchical // Hierarchical/nested batch access
- BatchPatternMultiGPU // Multi-GPU distributed batches
- BatchPatternPipelined // Pipelined batch processing
+ BatchPatternUnknown BatchAccessPattern = iota
+ BatchPatternLinear // Linear batch processing
+ BatchPatternStrided // Strided access with fixed gaps
+ BatchPatternShuffled // Randomized batch order
+ BatchPatternHierarchical // Hierarchical/nested batch access
+ BatchPatternMultiGPU // Multi-GPU distributed batches
+ BatchPatternPipelined // Pipelined batch processing
)
// BatchAccess represents a single file access that's part of batch processing
type BatchAccess struct {
- Offset int64 // File offset
- Size int // Access size
- AccessTime time.Time // When accessed
- IsRead bool // Whether this was a read operation
- BatchHint string // Optional batch identifier hint
+ Offset int64 // File offset
+ Size int // Access size
+ AccessTime time.Time // When accessed
+ IsRead bool // Whether this was a read operation
+ BatchHint string // Optional batch identifier hint
}
// BatchInfo holds information about a detected batch
type BatchInfo struct {
sync.RWMutex
-
+
// Batch identification
- BatchID string // Unique batch identifier
- StartOffset int64 // Starting file offset
- EndOffset int64 // Ending file offset
- Size int64 // Total batch size in bytes
- ItemCount int // Number of items in batch
- ItemSize int64 // Average item size
-
+ BatchID string // Unique batch identifier
+ StartOffset int64 // Starting file offset
+ EndOffset int64 // Ending file offset
+ Size int64 // Total batch size in bytes
+ ItemCount int // Number of items in batch
+ ItemSize int64 // Average item size
+
// Access pattern
- AccessPattern BatchAccessPattern // Detected access pattern
- AccessOrder []int64 // Order of access within batch
- AccessTimes []time.Time // When each item was accessed
- ProcessingTime time.Duration // Total time to process batch
-
+ AccessPattern BatchAccessPattern // Detected access pattern
+ AccessOrder []int64 // Order of access within batch
+ AccessTimes []time.Time // When each item was accessed
+ ProcessingTime time.Duration // Total time to process batch
+
// Performance metrics
- LoadTime time.Duration // Time to load batch from storage
- ProcessTime time.Duration // Time to process batch (compute)
- TotalTime time.Duration // Total end-to-end time
- Throughput float64 // Items per second
-
+ LoadTime time.Duration // Time to load batch from storage
+ ProcessTime time.Duration // Time to process batch (compute)
+ TotalTime time.Duration // Total end-to-end time
+ Throughput float64 // Items per second
+
// Optimization state
- IsPrefetched bool // Whether batch was prefetched
- CacheHitRate float64 // Percentage of cache hits
- OptimalPrefetch int64 // Recommended prefetch size
-
+ IsPrefetched bool // Whether batch was prefetched
+ CacheHitRate float64 // Percentage of cache hits
+ OptimalPrefetch int64 // Recommended prefetch size
+
// Relationship to other batches
- PreviousBatch *BatchInfo // Previous batch in sequence
- NextBatch *BatchInfo // Next batch in sequence
- ParentBatch *BatchInfo // Parent batch (for hierarchical)
- ChildBatches []*BatchInfo // Child batches (for hierarchical)
+ PreviousBatch *BatchInfo // Previous batch in sequence
+ NextBatch *BatchInfo // Next batch in sequence
+ ParentBatch *BatchInfo // Parent batch (for hierarchical)
+ ChildBatches []*BatchInfo // Child batches (for hierarchical)
}
// BatchOptimizer optimizes batch access patterns for ML workloads
type BatchOptimizer struct {
sync.RWMutex
-
+
// Configuration
- maxBatchesTracked int // Maximum number of batches to track
- batchDetectionWindow int // Window size for batch detection
- minBatchSize int64 // Minimum size to consider as batch
- maxBatchSize int64 // Maximum size to consider as batch
-
+ maxBatchesTracked int // Maximum number of batches to track
+ batchDetectionWindow int // Window size for batch detection
+ minBatchSize int64 // Minimum size to consider as batch
+ maxBatchSize int64 // Maximum size to consider as batch
+
// Batch tracking
- activeBatches map[string]*BatchInfo // Currently active batches
- completedBatches map[string]*BatchInfo // Recently completed batches
- inodeToBatches map[uint64][]*BatchInfo // File to batches mapping
-
+ activeBatches map[string]*BatchInfo // Currently active batches
+ completedBatches map[string]*BatchInfo // Recently completed batches
+ inodeToBatches map[uint64][]*BatchInfo // File to batches mapping
+
// Pattern detection
- accessHistory map[uint64][]BatchAccess // Recent access history per file
- batchSequences map[uint64]*BatchSequence // Detected batch sequences
-
+ accessHistory map[uint64][]BatchAccess // Recent access history per file
+ batchSequences map[uint64]*BatchSequence // Detected batch sequences
+
// Optimization strategies
- prefetchStrategies map[BatchAccessPattern]*PrefetchConfig // Prefetch configs per pattern
- cacheStrategies map[BatchAccessPattern]*CacheConfig // Cache configs per pattern
-
+ prefetchStrategies map[BatchAccessPattern]*PrefetchConfig // Prefetch configs per pattern
+ cacheStrategies map[BatchAccessPattern]*CacheConfig // Cache configs per pattern
+
// Statistics
- totalBatchesDetected int64 // Total batches detected
- optimizationHits int64 // Successful optimization applications
- optimizationMisses int64 // Failed optimization attempts
-
+ totalBatchesDetected int64 // Total batches detected
+ optimizationHits int64 // Successful optimization applications
+ optimizationMisses int64 // Failed optimization attempts
+
// Background processing
- cleanupTicker *time.Ticker // Cleanup timer
- stopCleanup chan struct{} // Cleanup stop signal
+ cleanupTicker *time.Ticker // Cleanup timer
+ stopCleanup chan struct{} // Cleanup stop signal
}
// BatchSequence represents a sequence of related batches
type BatchSequence struct {
sync.RWMutex
-
- SequenceID string // Unique sequence identifier
- Batches []*BatchInfo // Batches in sequence
- Pattern BatchAccessPattern // Overall sequence pattern
- StartTime time.Time // When sequence started
- LastAccess time.Time // Last access in sequence
- IsComplete bool // Whether sequence is complete
- RepeatCount int // How many times sequence has repeated
-
+
+ SequenceID string // Unique sequence identifier
+ Batches []*BatchInfo // Batches in sequence
+ Pattern BatchAccessPattern // Overall sequence pattern
+ StartTime time.Time // When sequence started
+ LastAccess time.Time // Last access in sequence
+ IsComplete bool // Whether sequence is complete
+ RepeatCount int // How many times sequence has repeated
+
// Predictions
- NextBatchOffset int64 // Predicted next batch offset
- NextBatchSize int64 // Predicted next batch size
- Confidence float64 // Confidence in predictions (0-1)
+ NextBatchOffset int64 // Predicted next batch offset
+ NextBatchSize int64 // Predicted next batch size
+ Confidence float64 // Confidence in predictions (0-1)
}
// PrefetchConfig holds configuration for prefetching strategies
type PrefetchConfig struct {
- Strategy PrefetchStrategy // Which prefetch strategy to use
- LookaheadCount int // How many items to prefetch ahead
- PrefetchSize int64 // Size to prefetch per operation
- ConcurrencyLevel int // How many concurrent prefetch operations
- AdaptiveScaling bool // Whether to scale based on performance
+ Strategy PrefetchStrategy // Which prefetch strategy to use
+ LookaheadCount int // How many items to prefetch ahead
+ PrefetchSize int64 // Size to prefetch per operation
+ ConcurrencyLevel int // How many concurrent prefetch operations
+ AdaptiveScaling bool // Whether to scale based on performance
}
-// CacheConfig holds configuration for caching strategies
+// CacheConfig holds configuration for caching strategies
type CacheConfig struct {
- Policy CachePolicy // Which cache policy to use
- RetentionTime time.Duration // How long to keep items cached
- Priority CachePriority // Cache priority level
- PreloadBatches int // How many batches to preload
+ Policy CachePolicy // Which cache policy to use
+ RetentionTime time.Duration // How long to keep items cached
+ Priority CachePriority // Cache priority level
+ PreloadBatches int // How many batches to preload
}
// NewBatchOptimizer creates a new batch optimizer
func NewBatchOptimizer() *BatchOptimizer {
bo := &BatchOptimizer{
- maxBatchesTracked: 1000, // Track up to 1000 batches
- batchDetectionWindow: 100, // Look at last 100 accesses
- minBatchSize: 64 * 1024, // Minimum 64KB batch
- maxBatchSize: 100 * 1024 * 1024, // Maximum 100MB batch
-
- activeBatches: make(map[string]*BatchInfo),
- completedBatches: make(map[string]*BatchInfo),
- inodeToBatches: make(map[uint64][]*BatchInfo),
- accessHistory: make(map[uint64][]BatchAccess),
- batchSequences: make(map[uint64]*BatchSequence),
-
- prefetchStrategies: make(map[BatchAccessPattern]*PrefetchConfig),
- cacheStrategies: make(map[BatchAccessPattern]*CacheConfig),
-
- stopCleanup: make(chan struct{}),
- }
-
+ maxBatchesTracked: 1000, // Track up to 1000 batches
+ batchDetectionWindow: 100, // Look at last 100 accesses
+ minBatchSize: 64 * 1024, // Minimum 64KB batch
+ maxBatchSize: 100 * 1024 * 1024, // Maximum 100MB batch
+
+ activeBatches: make(map[string]*BatchInfo),
+ completedBatches: make(map[string]*BatchInfo),
+ inodeToBatches: make(map[uint64][]*BatchInfo),
+ accessHistory: make(map[uint64][]BatchAccess),
+ batchSequences: make(map[uint64]*BatchSequence),
+
+ prefetchStrategies: make(map[BatchAccessPattern]*PrefetchConfig),
+ cacheStrategies: make(map[BatchAccessPattern]*CacheConfig),
+
+ stopCleanup: make(chan struct{}),
+ }
+
// Initialize default strategies
bo.initializeDefaultStrategies()
-
+
// Start cleanup routine
bo.cleanupTicker = time.NewTicker(5 * time.Minute)
go bo.cleanupRoutine()
-
+
glog.V(1).Infof("Batch optimizer initialized")
return bo
}
@@ -169,11 +169,11 @@ func NewBatchOptimizer() *BatchOptimizer {
func (bo *BatchOptimizer) initializeDefaultStrategies() {
// Linear batch pattern - aggressive prefetching
bo.prefetchStrategies[BatchPatternLinear] = &PrefetchConfig{
- Strategy: PrefetchAggressive,
- LookaheadCount: 5,
- PrefetchSize: 2 * 1024 * 1024, // 2MB
+ Strategy: PrefetchAggressive,
+ LookaheadCount: 5,
+ PrefetchSize: 2 * 1024 * 1024, // 2MB
ConcurrencyLevel: 3,
- AdaptiveScaling: true,
+ AdaptiveScaling: true,
}
bo.cacheStrategies[BatchPatternLinear] = &CacheConfig{
Policy: CachePolicyTrainingAware,
@@ -181,14 +181,14 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() {
Priority: CachePriorityHigh,
PreloadBatches: 2,
}
-
+
// Shuffled batch pattern - conservative prefetching
bo.prefetchStrategies[BatchPatternShuffled] = &PrefetchConfig{
- Strategy: PrefetchBalanced,
- LookaheadCount: 2,
- PrefetchSize: 512 * 1024, // 512KB
+ Strategy: PrefetchBalanced,
+ LookaheadCount: 2,
+ PrefetchSize: 512 * 1024, // 512KB
ConcurrencyLevel: 2,
- AdaptiveScaling: true,
+ AdaptiveScaling: true,
}
bo.cacheStrategies[BatchPatternShuffled] = &CacheConfig{
Policy: CachePolicyLRU,
@@ -196,14 +196,14 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() {
Priority: CachePriorityNormal,
PreloadBatches: 1,
}
-
+
// Multi-GPU pattern - high concurrency
bo.prefetchStrategies[BatchPatternMultiGPU] = &PrefetchConfig{
- Strategy: PrefetchAggressive,
- LookaheadCount: 8,
- PrefetchSize: 4 * 1024 * 1024, // 4MB
+ Strategy: PrefetchAggressive,
+ LookaheadCount: 8,
+ PrefetchSize: 4 * 1024 * 1024, // 4MB
ConcurrencyLevel: 6,
- AdaptiveScaling: true,
+ AdaptiveScaling: true,
}
bo.cacheStrategies[BatchPatternMultiGPU] = &CacheConfig{
Policy: CachePolicyML,
@@ -217,7 +217,7 @@ func (bo *BatchOptimizer) initializeDefaultStrategies() {
func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int, isRead bool, batchHint string) *BatchInfo {
bo.Lock()
defer bo.Unlock()
-
+
access := BatchAccess{
Offset: offset,
Size: size,
@@ -225,7 +225,7 @@ func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int
IsRead: isRead,
BatchHint: batchHint,
}
-
+
// Add to access history
history := bo.accessHistory[inode]
history = append(history, access)
@@ -233,23 +233,23 @@ func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int
history = history[1:] // Keep only recent accesses
}
bo.accessHistory[inode] = history
-
+
// Detect batch patterns
batchInfo := bo.detectBatchPattern(inode, history)
if batchInfo != nil {
bo.totalBatchesDetected++
-
+
// Add to tracking
bo.activeBatches[batchInfo.BatchID] = batchInfo
bo.inodeToBatches[inode] = append(bo.inodeToBatches[inode], batchInfo)
-
+
// Update batch sequence
bo.updateBatchSequence(inode, batchInfo)
-
- glog.V(3).Infof("Detected batch: inode=%d, pattern=%v, size=%d, items=%d",
+
+ glog.V(3).Infof("Detected batch: inode=%d, pattern=%v, size=%d, items=%d",
inode, batchInfo.AccessPattern, batchInfo.Size, batchInfo.ItemCount)
}
-
+
return batchInfo
}
@@ -258,25 +258,29 @@ func (bo *BatchOptimizer) detectBatchPattern(inode uint64, history []BatchAccess
if len(history) < 3 {
return nil // Need minimum history
}
-
+
// Look for batch boundaries by analyzing access gaps and patterns
- recent := history[len(history)-10:] // Look at last 10 accesses
+ startIdx := len(history) - 10
+ if startIdx < 0 {
+ startIdx = 0
+ }
+ recent := history[startIdx:] // Look at last 10 accesses (or all if fewer)
if len(recent) < 3 {
recent = history
}
-
+
// Check for batch characteristics
batchInfo := bo.analyzePotentialBatch(recent, inode)
if batchInfo == nil {
return nil
}
-
+
// Determine access pattern
batchInfo.AccessPattern = bo.classifyBatchPattern(batchInfo, recent)
-
+
// Calculate performance metrics
bo.calculateBatchMetrics(batchInfo, recent)
-
+
return batchInfo
}
@@ -285,60 +289,60 @@ func (bo *BatchOptimizer) analyzePotentialBatch(accesses []BatchAccess, inode ui
if len(accesses) < 2 {
return nil
}
-
+
// Calculate basic statistics
var totalSize int64
var itemCount int
minOffset := accesses[0].Offset
maxOffset := accesses[0].Offset
-
+
accessOrder := make([]int64, len(accesses))
accessTimes := make([]time.Time, len(accesses))
-
+
for i, access := range accesses {
totalSize += int64(access.Size)
itemCount++
-
+
if access.Offset < minOffset {
minOffset = access.Offset
}
if access.Offset > maxOffset {
maxOffset = access.Offset
}
-
+
accessOrder[i] = access.Offset
accessTimes[i] = access.AccessTime
}
-
+
batchSize := maxOffset - minOffset + int64(accesses[len(accesses)-1].Size)
-
+
// Check if this qualifies as a batch
if batchSize < bo.minBatchSize || batchSize > bo.maxBatchSize {
return nil
}
-
+
// Check temporal locality (accesses should be close in time)
timeSpan := accessTimes[len(accessTimes)-1].Sub(accessTimes[0])
if timeSpan > 10*time.Minute { // Too spread out in time
return nil
}
-
+
// Create batch info
batchID := generateBatchID(inode, minOffset, time.Now())
-
+
batchInfo := &BatchInfo{
- BatchID: batchID,
- StartOffset: minOffset,
- EndOffset: maxOffset,
- Size: batchSize,
- ItemCount: itemCount,
- ItemSize: totalSize / int64(itemCount),
- AccessOrder: accessOrder,
- AccessTimes: accessTimes,
- TotalTime: timeSpan,
- LoadTime: timeSpan, // Initially assume all time is load time
- }
-
+ BatchID: batchID,
+ StartOffset: minOffset,
+ EndOffset: maxOffset,
+ Size: batchSize,
+ ItemCount: itemCount,
+ ItemSize: totalSize / int64(itemCount),
+ AccessOrder: accessOrder,
+ AccessTimes: accessTimes,
+ TotalTime: timeSpan,
+ LoadTime: timeSpan, // Initially assume all time is load time
+ }
+
return batchInfo
}
@@ -347,7 +351,7 @@ func (bo *BatchOptimizer) classifyBatchPattern(batch *BatchInfo, accesses []Batc
if len(batch.AccessOrder) < 2 {
return BatchPatternUnknown
}
-
+
// Check for linear pattern (sequential offsets)
isLinear := true
for i := 1; i < len(batch.AccessOrder); i++ {
@@ -356,31 +360,31 @@ func (bo *BatchOptimizer) classifyBatchPattern(batch *BatchInfo, accesses []Batc
break
}
}
-
+
if isLinear {
return BatchPatternLinear
}
-
+
// Check for strided pattern (regular gaps)
if bo.isStridedPattern(batch.AccessOrder) {
return BatchPatternStrided
}
-
+
// Check for shuffled pattern (randomized order)
if bo.isShuffledPattern(batch.AccessOrder) {
return BatchPatternShuffled
}
-
+
// Check for multi-GPU pattern (parallel access indicators)
if bo.isMultiGPUPattern(accesses) {
return BatchPatternMultiGPU
}
-
+
// Check for pipelined pattern (overlapping accesses)
if bo.isPipelinedPattern(batch.AccessTimes) {
return BatchPatternPipelined
}
-
+
return BatchPatternUnknown
}
@@ -389,13 +393,13 @@ func (bo *BatchOptimizer) isStridedPattern(offsets []int64) bool {
if len(offsets) < 3 {
return false
}
-
+
// Calculate stride
stride := offsets[1] - offsets[0]
if stride <= 0 {
return false
}
-
+
// Check if all accesses follow the same stride
consistentStrides := 0
for i := 2; i < len(offsets); i++ {
@@ -404,9 +408,9 @@ func (bo *BatchOptimizer) isStridedPattern(offsets []int64) bool {
consistentStrides++
}
}
-
+
// At least 80% of strides should be consistent
- return float64(consistentStrides) / float64(len(offsets)-2) >= 0.8
+ return float64(consistentStrides)/float64(len(offsets)-2) >= 0.8
}
// isShuffledPattern checks if accesses are in randomized order
@@ -414,7 +418,7 @@ func (bo *BatchOptimizer) isShuffledPattern(offsets []int64) bool {
if len(offsets) < 5 {
return false
}
-
+
// Count inversions (out-of-order pairs)
inversions := 0
for i := 0; i < len(offsets); i++ {
@@ -424,10 +428,10 @@ func (bo *BatchOptimizer) isShuffledPattern(offsets []int64) bool {
}
}
}
-
+
totalPairs := len(offsets) * (len(offsets) - 1) / 2
inversionRate := float64(inversions) / float64(totalPairs)
-
+
// High inversion rate suggests shuffling
return inversionRate > 0.3
}
@@ -437,22 +441,22 @@ func (bo *BatchOptimizer) isMultiGPUPattern(accesses []BatchAccess) bool {
// Look for multiple concurrent access streams
// This is a simplified heuristic - in practice, this would need more
// sophisticated detection based on process info, etc.
-
+
if len(accesses) < 4 {
return false
}
-
+
// Check for concurrent accesses (multiple accesses in very short time)
concurrentWindows := 0
windowSize := 100 * time.Millisecond
-
+
for i := 0; i < len(accesses)-1; i++ {
timeDiff := accesses[i+1].AccessTime.Sub(accesses[i].AccessTime)
if timeDiff < windowSize {
concurrentWindows++
}
}
-
+
// If many accesses are concurrent, might be multi-GPU
return float64(concurrentWindows)/float64(len(accesses)) > 0.5
}
@@ -462,30 +466,30 @@ func (bo *BatchOptimizer) isPipelinedPattern(accessTimes []time.Time) bool {
if len(accessTimes) < 3 {
return false
}
-
+
// Look for regular, overlapping timing patterns
intervals := make([]time.Duration, len(accessTimes)-1)
for i := 1; i < len(accessTimes); i++ {
intervals[i-1] = accessTimes[i].Sub(accessTimes[i-1])
}
-
+
// Calculate coefficient of variation for intervals
var sum, sumSq time.Duration
for _, interval := range intervals {
sum += interval
sumSq += interval * interval
}
-
+
n := time.Duration(len(intervals))
mean := sum / n
if mean == 0 {
return false
}
-
+
// Calculate variance and CV
variance := (sumSq / n) - (mean * mean)
- cv := float64(variance) / float64(mean * mean)
-
+ cv := float64(variance) / float64(mean*mean)
+
// Low coefficient of variation suggests regular pipelining
return cv < 0.2
}
@@ -495,18 +499,18 @@ func (bo *BatchOptimizer) calculateBatchMetrics(batch *BatchInfo, accesses []Bat
if len(batch.AccessTimes) < 2 {
return
}
-
+
// Calculate throughput
timeSpan := batch.AccessTimes[len(batch.AccessTimes)-1].Sub(batch.AccessTimes[0])
if timeSpan > 0 {
batch.Throughput = float64(batch.ItemCount) / timeSpan.Seconds()
}
-
+
// Estimate processing vs load time (heuristic)
// In practice, this would need more sophisticated measurement
avgItemTime := timeSpan / time.Duration(batch.ItemCount)
- batch.ProcessTime = avgItemTime / 2 // Assume 50% processing time
- batch.LoadTime = avgItemTime / 2 // Assume 50% load time
+ batch.ProcessTime = avgItemTime / 2 // Assume 50% processing time
+ batch.LoadTime = avgItemTime / 2 // Assume 50% load time
}
// updateBatchSequence updates the batch sequence for an inode
@@ -521,26 +525,26 @@ func (bo *BatchOptimizer) updateBatchSequence(inode uint64, newBatch *BatchInfo)
}
bo.batchSequences[inode] = sequence
}
-
+
sequence.Lock()
defer sequence.Unlock()
-
+
// Link batches
if len(sequence.Batches) > 0 {
lastBatch := sequence.Batches[len(sequence.Batches)-1]
lastBatch.NextBatch = newBatch
newBatch.PreviousBatch = lastBatch
}
-
+
sequence.Batches = append(sequence.Batches, newBatch)
sequence.LastAccess = time.Now()
-
+
// Update sequence pattern based on majority of batches
bo.updateSequencePattern(sequence)
-
+
// Make predictions for next batch
bo.updateSequencePredictions(sequence)
-
+
// Keep sequence size manageable
if len(sequence.Batches) > 100 {
sequence.Batches = sequence.Batches[len(sequence.Batches)-50:] // Keep last 50 batches
@@ -552,13 +556,13 @@ func (bo *BatchOptimizer) updateSequencePattern(sequence *BatchSequence) {
if len(sequence.Batches) < 3 {
return
}
-
+
// Count patterns
patternCounts := make(map[BatchAccessPattern]int)
for _, batch := range sequence.Batches {
patternCounts[batch.AccessPattern]++
}
-
+
// Find most common pattern
maxCount := 0
var dominantPattern BatchAccessPattern
@@ -568,7 +572,7 @@ func (bo *BatchOptimizer) updateSequencePattern(sequence *BatchSequence) {
dominantPattern = pattern
}
}
-
+
sequence.Pattern = dominantPattern
}
@@ -577,12 +581,12 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) {
if len(sequence.Batches) < 2 {
return
}
-
+
recent := sequence.Batches[len(sequence.Batches)-3:] // Last 3 batches
if len(recent) < 2 {
recent = sequence.Batches
}
-
+
// Predict next batch offset based on pattern
switch sequence.Pattern {
case BatchPatternLinear:
@@ -595,7 +599,7 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) {
sequence.NextBatchSize = lastBatch.Size
sequence.Confidence = 0.8
}
-
+
case BatchPatternStrided:
// Regular stride
if len(recent) >= 3 {
@@ -604,7 +608,7 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) {
sequence.NextBatchSize = recent[len(recent)-1].Size
sequence.Confidence = 0.7
}
-
+
default:
// Lower confidence for unpredictable patterns
sequence.Confidence = 0.3
@@ -615,60 +619,60 @@ func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) {
func (bo *BatchOptimizer) GetBatchRecommendations(inode uint64) *BatchOptimizationRecommendations {
bo.RLock()
defer bo.RUnlock()
-
+
sequence := bo.batchSequences[inode]
if sequence == nil {
return &BatchOptimizationRecommendations{
ShouldOptimize: false,
}
}
-
+
sequence.RLock()
defer sequence.RUnlock()
-
+
prefetchConfig := bo.prefetchStrategies[sequence.Pattern]
cacheConfig := bo.cacheStrategies[sequence.Pattern]
-
+
if prefetchConfig == nil {
prefetchConfig = bo.prefetchStrategies[BatchPatternUnknown]
}
if cacheConfig == nil {
cacheConfig = bo.cacheStrategies[BatchPatternUnknown]
}
-
+
recommendations := &BatchOptimizationRecommendations{
- ShouldOptimize: true,
- Pattern: sequence.Pattern,
- PrefetchSize: prefetchConfig.PrefetchSize,
- PrefetchCount: prefetchConfig.LookaheadCount,
- CachePriority: cacheConfig.Priority,
- CacheRetention: cacheConfig.RetentionTime,
- NextBatchOffset: sequence.NextBatchOffset,
- NextBatchSize: sequence.NextBatchSize,
- Confidence: sequence.Confidence,
- }
-
+ ShouldOptimize: true,
+ Pattern: sequence.Pattern,
+ PrefetchSize: prefetchConfig.PrefetchSize,
+ PrefetchCount: prefetchConfig.LookaheadCount,
+ CachePriority: cacheConfig.Priority,
+ CacheRetention: cacheConfig.RetentionTime,
+ NextBatchOffset: sequence.NextBatchOffset,
+ NextBatchSize: sequence.NextBatchSize,
+ Confidence: sequence.Confidence,
+ }
+
return recommendations
}
// BatchOptimizationRecommendations holds batch optimization recommendations
type BatchOptimizationRecommendations struct {
- ShouldOptimize bool `json:"should_optimize"`
- Pattern BatchAccessPattern `json:"pattern"`
- PrefetchSize int64 `json:"prefetch_size"`
- PrefetchCount int `json:"prefetch_count"`
- CachePriority CachePriority `json:"cache_priority"`
- CacheRetention time.Duration `json:"cache_retention"`
- NextBatchOffset int64 `json:"next_batch_offset"`
- NextBatchSize int64 `json:"next_batch_size"`
- Confidence float64 `json:"confidence"`
+ ShouldOptimize bool `json:"should_optimize"`
+ Pattern BatchAccessPattern `json:"pattern"`
+ PrefetchSize int64 `json:"prefetch_size"`
+ PrefetchCount int `json:"prefetch_count"`
+ CachePriority CachePriority `json:"cache_priority"`
+ CacheRetention time.Duration `json:"cache_retention"`
+ NextBatchOffset int64 `json:"next_batch_offset"`
+ NextBatchSize int64 `json:"next_batch_size"`
+ Confidence float64 `json:"confidence"`
}
// GetBatchMetrics returns comprehensive batch optimization metrics
func (bo *BatchOptimizer) GetBatchMetrics() BatchOptimizerMetrics {
bo.RLock()
defer bo.RUnlock()
-
+
metrics := BatchOptimizerMetrics{
TotalBatchesDetected: bo.totalBatchesDetected,
ActiveBatches: int64(len(bo.activeBatches)),
@@ -677,32 +681,32 @@ func (bo *BatchOptimizer) GetBatchMetrics() BatchOptimizerMetrics {
OptimizationMisses: bo.optimizationMisses,
PatternCounts: make(map[BatchAccessPattern]int64),
}
-
+
// Count patterns
for _, batch := range bo.activeBatches {
batch.RLock()
metrics.PatternCounts[batch.AccessPattern]++
batch.RUnlock()
}
-
+
// Calculate hit rate
totalAttempts := bo.optimizationHits + bo.optimizationMisses
if totalAttempts > 0 {
metrics.OptimizationHitRate = float64(bo.optimizationHits) / float64(totalAttempts)
}
-
+
return metrics
}
// BatchOptimizerMetrics holds metrics for batch optimization
type BatchOptimizerMetrics struct {
- TotalBatchesDetected int64 `json:"total_batches_detected"`
- ActiveBatches int64 `json:"active_batches"`
- CompletedBatches int64 `json:"completed_batches"`
- OptimizationHits int64 `json:"optimization_hits"`
- OptimizationMisses int64 `json:"optimization_misses"`
- OptimizationHitRate float64 `json:"optimization_hit_rate"`
- PatternCounts map[BatchAccessPattern]int64 `json:"pattern_counts"`
+ TotalBatchesDetected int64 `json:"total_batches_detected"`
+ ActiveBatches int64 `json:"active_batches"`
+ CompletedBatches int64 `json:"completed_batches"`
+ OptimizationHits int64 `json:"optimization_hits"`
+ OptimizationMisses int64 `json:"optimization_misses"`
+ OptimizationHitRate float64 `json:"optimization_hit_rate"`
+ PatternCounts map[BatchAccessPattern]int64 `json:"pattern_counts"`
}
// cleanupRoutine performs periodic cleanup of old batch information
@@ -721,21 +725,21 @@ func (bo *BatchOptimizer) cleanupRoutine() {
func (bo *BatchOptimizer) performCleanup() {
bo.Lock()
defer bo.Unlock()
-
+
now := time.Now()
cutoff := now.Add(-30 * time.Minute) // Remove batches older than 30 minutes
-
+
// Clean up completed batches
for id, batch := range bo.completedBatches {
batch.RLock()
shouldRemove := len(batch.AccessTimes) > 0 && batch.AccessTimes[0].Before(cutoff)
batch.RUnlock()
-
+
if shouldRemove {
delete(bo.completedBatches, id)
}
}
-
+
// Clean up access history
for inode, history := range bo.accessHistory {
filtered := make([]BatchAccess, 0, len(history))
@@ -744,14 +748,14 @@ func (bo *BatchOptimizer) performCleanup() {
filtered = append(filtered, access)
}
}
-
+
if len(filtered) == 0 {
delete(bo.accessHistory, inode)
} else {
bo.accessHistory[inode] = filtered
}
}
-
+
// Clean up batch sequences
for inode, sequence := range bo.batchSequences {
sequence.Lock()
@@ -762,7 +766,7 @@ func (bo *BatchOptimizer) performCleanup() {
}
sequence.Unlock()
}
-
+
glog.V(4).Infof("Batch optimizer cleanup completed")
}
@@ -771,9 +775,9 @@ func (bo *BatchOptimizer) Shutdown() {
if bo.cleanupTicker != nil {
bo.cleanupTicker.Stop()
}
-
+
close(bo.stopCleanup)
-
+
glog.V(1).Infof("Batch optimizer shutdown complete")
}