Diffstat (limited to 'weed/mount/ml/batch_optimizer.go')
-rw-r--r--   weed/mount/ml/batch_optimizer.go   809
1 file changed, 809 insertions, 0 deletions
diff --git a/weed/mount/ml/batch_optimizer.go b/weed/mount/ml/batch_optimizer.go
new file mode 100644
index 000000000..d5dbfa636
--- /dev/null
+++ b/weed/mount/ml/batch_optimizer.go
@@ -0,0 +1,809 @@
+package ml
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// BatchAccessPattern represents different batch access patterns
+type BatchAccessPattern int
+
+const (
+ BatchPatternUnknown BatchAccessPattern = iota
+ BatchPatternLinear // Linear batch processing
+ BatchPatternStrided // Strided access with fixed gaps
+ BatchPatternShuffled // Randomized batch order
+ BatchPatternHierarchical // Hierarchical/nested batch access
+ BatchPatternMultiGPU // Multi-GPU distributed batches
+ BatchPatternPipelined // Pipelined batch processing
+)
+
+// BatchAccess represents a single file access that's part of batch processing
+type BatchAccess struct {
+ Offset int64 // File offset
+ Size int // Access size
+ AccessTime time.Time // When accessed
+ IsRead bool // Whether this was a read operation
+ BatchHint string // Optional batch identifier hint
+}
+
+// BatchInfo holds information about a detected batch
+type BatchInfo struct {
+ sync.RWMutex
+
+ // Batch identification
+ BatchID string // Unique batch identifier
+ StartOffset int64 // Starting file offset
+ EndOffset int64 // Ending file offset
+ Size int64 // Total batch size in bytes
+ ItemCount int // Number of items in batch
+ ItemSize int64 // Average item size
+
+ // Access pattern
+ AccessPattern BatchAccessPattern // Detected access pattern
+ AccessOrder []int64 // Order of access within batch
+ AccessTimes []time.Time // When each item was accessed
+ ProcessingTime time.Duration // Total time to process batch
+
+ // Performance metrics
+ LoadTime time.Duration // Time to load batch from storage
+ ProcessTime time.Duration // Time to process batch (compute)
+ TotalTime time.Duration // Total end-to-end time
+ Throughput float64 // Items per second
+
+ // Optimization state
+ IsPrefetched bool // Whether batch was prefetched
+ CacheHitRate float64 // Percentage of cache hits
+ OptimalPrefetch int64 // Recommended prefetch size
+
+ // Relationship to other batches
+ PreviousBatch *BatchInfo // Previous batch in sequence
+ NextBatch *BatchInfo // Next batch in sequence
+ ParentBatch *BatchInfo // Parent batch (for hierarchical)
+ ChildBatches []*BatchInfo // Child batches (for hierarchical)
+}
+
+// BatchOptimizer optimizes batch access patterns for ML workloads
+type BatchOptimizer struct {
+ sync.RWMutex
+
+ // Configuration
+ maxBatchesTracked int // Maximum number of batches to track
+ batchDetectionWindow int // Window size for batch detection
+ minBatchSize int64 // Minimum size to consider as batch
+ maxBatchSize int64 // Maximum size to consider as batch
+
+ // Batch tracking
+ activeBatches map[string]*BatchInfo // Currently active batches
+ completedBatches map[string]*BatchInfo // Recently completed batches
+ inodeToBatches map[uint64][]*BatchInfo // File to batches mapping
+
+ // Pattern detection
+ accessHistory map[uint64][]BatchAccess // Recent access history per file
+ batchSequences map[uint64]*BatchSequence // Detected batch sequences
+
+ // Optimization strategies
+ prefetchStrategies map[BatchAccessPattern]*PrefetchConfig // Prefetch configs per pattern
+ cacheStrategies map[BatchAccessPattern]*CacheConfig // Cache configs per pattern
+
+ // Statistics
+ totalBatchesDetected int64 // Total batches detected
+ optimizationHits int64 // Successful optimization applications
+ optimizationMisses int64 // Failed optimization attempts
+
+ // Background processing
+ cleanupTicker *time.Ticker // Cleanup timer
+ stopCleanup chan struct{} // Cleanup stop signal
+}
+
+// BatchSequence represents a sequence of related batches
+type BatchSequence struct {
+ sync.RWMutex
+
+ SequenceID string // Unique sequence identifier
+ Batches []*BatchInfo // Batches in sequence
+ Pattern BatchAccessPattern // Overall sequence pattern
+ StartTime time.Time // When sequence started
+ LastAccess time.Time // Last access in sequence
+ IsComplete bool // Whether sequence is complete
+ RepeatCount int // How many times sequence has repeated
+
+ // Predictions
+ NextBatchOffset int64 // Predicted next batch offset
+ NextBatchSize int64 // Predicted next batch size
+ Confidence float64 // Confidence in predictions (0-1)
+}
+
+// PrefetchConfig holds configuration for prefetching strategies
+type PrefetchConfig struct {
+ Strategy PrefetchStrategy // Which prefetch strategy to use
+ LookaheadCount int // How many items to prefetch ahead
+ PrefetchSize int64 // Size to prefetch per operation
+ ConcurrencyLevel int // How many concurrent prefetch operations
+ AdaptiveScaling bool // Whether to scale based on performance
+}
+
+// CacheConfig holds configuration for caching strategies
+type CacheConfig struct {
+ Policy CachePolicy // Which cache policy to use
+ RetentionTime time.Duration // How long to keep items cached
+ Priority CachePriority // Cache priority level
+ PreloadBatches int // How many batches to preload
+}
+
+// NewBatchOptimizer creates a new batch optimizer
+func NewBatchOptimizer() *BatchOptimizer {
+ bo := &BatchOptimizer{
+ maxBatchesTracked: 1000, // Track up to 1000 batches
+ batchDetectionWindow: 100, // Look at last 100 accesses
+ minBatchSize: 64 * 1024, // Minimum 64KB batch
+ maxBatchSize: 100 * 1024 * 1024, // Maximum 100MB batch
+
+ activeBatches: make(map[string]*BatchInfo),
+ completedBatches: make(map[string]*BatchInfo),
+ inodeToBatches: make(map[uint64][]*BatchInfo),
+ accessHistory: make(map[uint64][]BatchAccess),
+ batchSequences: make(map[uint64]*BatchSequence),
+
+ prefetchStrategies: make(map[BatchAccessPattern]*PrefetchConfig),
+ cacheStrategies: make(map[BatchAccessPattern]*CacheConfig),
+
+ stopCleanup: make(chan struct{}),
+ }
+
+ // Initialize default strategies
+ bo.initializeDefaultStrategies()
+
+ // Start cleanup routine
+ bo.cleanupTicker = time.NewTicker(5 * time.Minute)
+ go bo.cleanupRoutine()
+
+ glog.V(1).Infof("Batch optimizer initialized")
+ return bo
+}
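+
+// A minimal lifecycle sketch (illustrative, not part of this change): one
+// optimizer per mount, shut down to stop the background cleanup goroutine.
+//
+// bo := NewBatchOptimizer()
+// defer bo.Shutdown()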
+
+// initializeDefaultStrategies sets up default optimization strategies for each pattern
+func (bo *BatchOptimizer) initializeDefaultStrategies() {
+ // Linear batch pattern - aggressive prefetching
+ bo.prefetchStrategies[BatchPatternLinear] = &PrefetchConfig{
+ Strategy: PrefetchAggressive,
+ LookaheadCount: 5,
+ PrefetchSize: 2 * 1024 * 1024, // 2MB
+ ConcurrencyLevel: 3,
+ AdaptiveScaling: true,
+ }
+ bo.cacheStrategies[BatchPatternLinear] = &CacheConfig{
+ Policy: CachePolicyTrainingAware,
+ RetentionTime: 10 * time.Minute,
+ Priority: CachePriorityHigh,
+ PreloadBatches: 2,
+ }
+
+ // Shuffled batch pattern - conservative prefetching
+ bo.prefetchStrategies[BatchPatternShuffled] = &PrefetchConfig{
+ Strategy: PrefetchBalanced,
+ LookaheadCount: 2,
+ PrefetchSize: 512 * 1024, // 512KB
+ ConcurrencyLevel: 2,
+ AdaptiveScaling: true,
+ }
+ bo.cacheStrategies[BatchPatternShuffled] = &CacheConfig{
+ Policy: CachePolicyLRU,
+ RetentionTime: 5 * time.Minute,
+ Priority: CachePriorityNormal,
+ PreloadBatches: 1,
+ }
+
+ // Multi-GPU pattern - high concurrency
+ bo.prefetchStrategies[BatchPatternMultiGPU] = &PrefetchConfig{
+ Strategy: PrefetchAggressive,
+ LookaheadCount: 8,
+ PrefetchSize: 4 * 1024 * 1024, // 4MB
+ ConcurrencyLevel: 6,
+ AdaptiveScaling: true,
+ }
+ bo.cacheStrategies[BatchPatternMultiGPU] = &CacheConfig{
+ Policy: CachePolicyML,
+ RetentionTime: 15 * time.Minute,
+ Priority: CachePriorityUrgent,
+ PreloadBatches: 4,
+ }
+}
+
+// RecordBatchAccess records a file access that's part of batch processing
+func (bo *BatchOptimizer) RecordBatchAccess(inode uint64, offset int64, size int, isRead bool, batchHint string) *BatchInfo {
+ bo.Lock()
+ defer bo.Unlock()
+
+ access := BatchAccess{
+ Offset: offset,
+ Size: size,
+ AccessTime: time.Now(),
+ IsRead: isRead,
+ BatchHint: batchHint,
+ }
+
+ // Add to access history
+ history := bo.accessHistory[inode]
+ history = append(history, access)
+ if len(history) > bo.batchDetectionWindow {
+ history = history[1:] // Keep only recent accesses
+ }
+ bo.accessHistory[inode] = history
+
+ // Detect batch patterns
+ batchInfo := bo.detectBatchPattern(inode, history)
+ if batchInfo != nil {
+ bo.totalBatchesDetected++
+
+ // Add to tracking
+ bo.activeBatches[batchInfo.BatchID] = batchInfo
+ bo.inodeToBatches[inode] = append(bo.inodeToBatches[inode], batchInfo)
+
+ // Update batch sequence
+ bo.updateBatchSequence(inode, batchInfo)
+
+ glog.V(3).Infof("Detected batch: inode=%d, pattern=%v, size=%d, items=%d",
+ inode, batchInfo.AccessPattern, batchInfo.Size, batchInfo.ItemCount)
+ }
+
+ return batchInfo
+}
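+
+// A sketch of how a read path might feed this (illustrative; the inode, off,
+// and buf names are assumptions, not identifiers from this change):
+//
+// if info := bo.RecordBatchAccess(inode, off, len(buf), true, ""); info != nil {
+// glog.V(3).Infof("read falls in batch %s (%s)", info.BatchID, info.AccessPattern)
+// }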
+
+// detectBatchPattern analyzes access history to detect batch patterns
+func (bo *BatchOptimizer) detectBatchPattern(inode uint64, history []BatchAccess) *BatchInfo {
+ if len(history) < 3 {
+ return nil // Need minimum history
+ }
+
+ // Look for batch boundaries by analyzing access gaps and patterns
+ recent := history
+ if len(history) > 10 {
+ recent = history[len(history)-10:] // Look at the last 10 accesses only
+ }
+
+ // Check for batch characteristics
+ batchInfo := bo.analyzePotentialBatch(recent, inode)
+ if batchInfo == nil {
+ return nil
+ }
+
+ // Determine access pattern
+ batchInfo.AccessPattern = bo.classifyBatchPattern(batchInfo, recent)
+
+ // Calculate performance metrics
+ bo.calculateBatchMetrics(batchInfo, recent)
+
+ return batchInfo
+}
+
+// analyzePotentialBatch analyzes a sequence of accesses to see if they form a batch
+func (bo *BatchOptimizer) analyzePotentialBatch(accesses []BatchAccess, inode uint64) *BatchInfo {
+ if len(accesses) < 2 {
+ return nil
+ }
+
+ // Calculate basic statistics
+ var totalSize int64
+ var itemCount int
+ minOffset := accesses[0].Offset
+ maxOffset := accesses[0].Offset
+ sizeAtMaxOffset := int64(accesses[0].Size) // Size of the access at maxOffset
+
+ accessOrder := make([]int64, len(accesses))
+ accessTimes := make([]time.Time, len(accesses))
+
+ for i, access := range accesses {
+ totalSize += int64(access.Size)
+ itemCount++
+
+ if access.Offset < minOffset {
+ minOffset = access.Offset
+ }
+ if access.Offset > maxOffset {
+ maxOffset = access.Offset
+ sizeAtMaxOffset = int64(access.Size)
+ }
+
+ accessOrder[i] = access.Offset
+ accessTimes[i] = access.AccessTime
+ }
+
+ // Span from the lowest offset through the end of the highest access;
+ // the chronologically last access is not necessarily the one at maxOffset
+ batchSize := maxOffset - minOffset + sizeAtMaxOffset
+
+ // Check if this qualifies as a batch
+ if batchSize < bo.minBatchSize || batchSize > bo.maxBatchSize {
+ return nil
+ }
+
+ // Check temporal locality (accesses should be close in time)
+ timeSpan := accessTimes[len(accessTimes)-1].Sub(accessTimes[0])
+ if timeSpan > 10*time.Minute { // Too spread out in time
+ return nil
+ }
+
+ // Create batch info
+ batchID := generateBatchID(inode, minOffset, time.Now())
+
+ batchInfo := &BatchInfo{
+ BatchID: batchID,
+ StartOffset: minOffset,
+ EndOffset: maxOffset,
+ Size: batchSize,
+ ItemCount: itemCount,
+ ItemSize: totalSize / int64(itemCount),
+ AccessOrder: accessOrder,
+ AccessTimes: accessTimes,
+ TotalTime: timeSpan,
+ LoadTime: timeSpan, // Initially assume all time is load time
+ }
+
+ return batchInfo
+}
+
+// classifyBatchPattern determines the access pattern of a batch
+func (bo *BatchOptimizer) classifyBatchPattern(batch *BatchInfo, accesses []BatchAccess) BatchAccessPattern {
+ if len(batch.AccessOrder) < 2 {
+ return BatchPatternUnknown
+ }
+
+ // Check for linear pattern (sequential offsets)
+ isLinear := true
+ for i := 1; i < len(batch.AccessOrder); i++ {
+ if batch.AccessOrder[i] <= batch.AccessOrder[i-1] {
+ isLinear = false
+ break
+ }
+ }
+
+ if isLinear {
+ return BatchPatternLinear
+ }
+
+ // Check for strided pattern (regular gaps)
+ if bo.isStridedPattern(batch.AccessOrder) {
+ return BatchPatternStrided
+ }
+
+ // Check for shuffled pattern (randomized order)
+ if bo.isShuffledPattern(batch.AccessOrder) {
+ return BatchPatternShuffled
+ }
+
+ // Check for multi-GPU pattern (parallel access indicators)
+ if bo.isMultiGPUPattern(accesses) {
+ return BatchPatternMultiGPU
+ }
+
+ // Check for pipelined pattern (overlapping accesses)
+ if bo.isPipelinedPattern(batch.AccessTimes) {
+ return BatchPatternPipelined
+ }
+
+ return BatchPatternUnknown
+}
+
+// isStridedPattern checks if accesses follow a strided pattern
+func (bo *BatchOptimizer) isStridedPattern(offsets []int64) bool {
+ if len(offsets) < 3 {
+ return false
+ }
+
+ // Calculate stride
+ stride := offsets[1] - offsets[0]
+ if stride <= 0 {
+ return false
+ }
+
+ // Check if all accesses follow the same stride
+ consistentStrides := 0
+ for i := 2; i < len(offsets); i++ {
+ currentStride := offsets[i] - offsets[i-1]
+ if currentStride == stride {
+ consistentStrides++
+ }
+ }
+
+ // At least 80% of strides should be consistent
+ return float64(consistentStrides)/float64(len(offsets)-2) >= 0.8
+}
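+
+// Worked example: offsets {0, 4MiB, 8MiB, 12MiB, 16MiB} give stride = 4MiB,
+// and all 3 follow-up strides match (100% >= 80%), so the sequence counts as
+// strided.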
+
+// isShuffledPattern checks if accesses are in randomized order
+func (bo *BatchOptimizer) isShuffledPattern(offsets []int64) bool {
+ if len(offsets) < 5 {
+ return false
+ }
+
+ // Count inversions (out-of-order pairs)
+ inversions := 0
+ for i := 0; i < len(offsets); i++ {
+ for j := i + 1; j < len(offsets); j++ {
+ if offsets[i] > offsets[j] {
+ inversions++
+ }
+ }
+ }
+
+ totalPairs := len(offsets) * (len(offsets) - 1) / 2
+ inversionRate := float64(inversions) / float64(totalPairs)
+
+ // High inversion rate suggests shuffling
+ return inversionRate > 0.3
+}
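+
+// Worked example: offsets ordered {4, 1, 5, 2, 3} (in item-size units) have 5
+// inversions out of 10 pairs, an inversion rate of 0.5 > 0.3, so the order is
+// treated as shuffled; a fully sorted order has an inversion rate of 0.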
+
+// isMultiGPUPattern checks for multi-GPU access patterns
+func (bo *BatchOptimizer) isMultiGPUPattern(accesses []BatchAccess) bool {
+ // Look for multiple concurrent access streams
+ // This is a simplified heuristic - in practice, this would need more
+ // sophisticated detection based on process info, etc.
+
+ if len(accesses) < 4 {
+ return false
+ }
+
+ // Check for concurrent accesses (multiple accesses in very short time)
+ concurrentWindows := 0
+ windowSize := 100 * time.Millisecond
+
+ for i := 0; i < len(accesses)-1; i++ {
+ timeDiff := accesses[i+1].AccessTime.Sub(accesses[i].AccessTime)
+ if timeDiff < windowSize {
+ concurrentWindows++
+ }
+ }
+
+ // If many accesses are concurrent, might be multi-GPU
+ return float64(concurrentWindows)/float64(len(accesses)) > 0.5
+}
+
+// isPipelinedPattern checks for pipelined access patterns
+func (bo *BatchOptimizer) isPipelinedPattern(accessTimes []time.Time) bool {
+ if len(accessTimes) < 3 {
+ return false
+ }
+
+ // Look for regular, overlapping timing patterns
+ intervals := make([]float64, len(accessTimes)-1)
+ for i := 1; i < len(accessTimes); i++ {
+ intervals[i-1] = accessTimes[i].Sub(accessTimes[i-1]).Seconds()
+ }
+
+ // Compute mean and variance in float64: squaring time.Duration values
+ // directly can overflow int64 for intervals beyond a few seconds
+ var sum, sumSq float64
+ for _, interval := range intervals {
+ sum += interval
+ sumSq += interval * interval
+ }
+
+ n := float64(len(intervals))
+ mean := sum / n
+ if mean == 0 {
+ return false
+ }
+
+ // Squared coefficient of variation: variance / mean^2
+ variance := (sumSq / n) - (mean * mean)
+ cvSquared := variance / (mean * mean)
+
+ // Low variation suggests regular, pipelined timing
+ return cvSquared < 0.2
+}
+
+// calculateBatchMetrics calculates performance metrics for a batch
+func (bo *BatchOptimizer) calculateBatchMetrics(batch *BatchInfo, accesses []BatchAccess) {
+ if len(batch.AccessTimes) < 2 {
+ return
+ }
+
+ // Calculate throughput
+ timeSpan := batch.AccessTimes[len(batch.AccessTimes)-1].Sub(batch.AccessTimes[0])
+ if timeSpan > 0 {
+ batch.Throughput = float64(batch.ItemCount) / timeSpan.Seconds()
+ }
+
+ // Estimate processing vs load time (heuristic)
+ // In practice, this would need more sophisticated measurement
+ avgItemTime := timeSpan / time.Duration(batch.ItemCount)
+ batch.ProcessTime = avgItemTime / 2 // Assume 50% processing time
+ batch.LoadTime = avgItemTime / 2 // Assume 50% load time
+}
+
+// updateBatchSequence updates the batch sequence for an inode
+func (bo *BatchOptimizer) updateBatchSequence(inode uint64, newBatch *BatchInfo) {
+ sequence := bo.batchSequences[inode]
+ if sequence == nil {
+ sequence = &BatchSequence{
+ SequenceID: generateSequenceID(inode, time.Now()),
+ Batches: make([]*BatchInfo, 0, 10),
+ StartTime: time.Now(),
+ Pattern: newBatch.AccessPattern,
+ }
+ bo.batchSequences[inode] = sequence
+ }
+
+ sequence.Lock()
+ defer sequence.Unlock()
+
+ // Link batches
+ if len(sequence.Batches) > 0 {
+ lastBatch := sequence.Batches[len(sequence.Batches)-1]
+ lastBatch.NextBatch = newBatch
+ newBatch.PreviousBatch = lastBatch
+ }
+
+ sequence.Batches = append(sequence.Batches, newBatch)
+ sequence.LastAccess = time.Now()
+
+ // Update sequence pattern based on majority of batches
+ bo.updateSequencePattern(sequence)
+
+ // Make predictions for next batch
+ bo.updateSequencePredictions(sequence)
+
+ // Keep sequence size manageable
+ if len(sequence.Batches) > 100 {
+ sequence.Batches = sequence.Batches[len(sequence.Batches)-50:] // Keep last 50 batches
+ }
+}
+
+// updateSequencePattern updates the overall pattern of a batch sequence
+func (bo *BatchOptimizer) updateSequencePattern(sequence *BatchSequence) {
+ if len(sequence.Batches) < 3 {
+ return
+ }
+
+ // Count patterns
+ patternCounts := make(map[BatchAccessPattern]int)
+ for _, batch := range sequence.Batches {
+ patternCounts[batch.AccessPattern]++
+ }
+
+ // Find most common pattern
+ maxCount := 0
+ var dominantPattern BatchAccessPattern
+ for pattern, count := range patternCounts {
+ if count > maxCount {
+ maxCount = count
+ dominantPattern = pattern
+ }
+ }
+
+ sequence.Pattern = dominantPattern
+}
+
+// updateSequencePredictions updates predictions for the next batch
+func (bo *BatchOptimizer) updateSequencePredictions(sequence *BatchSequence) {
+ if len(sequence.Batches) < 2 {
+ return
+ }
+
+ recent := sequence.Batches
+ if len(sequence.Batches) > 3 {
+ recent = sequence.Batches[len(sequence.Batches)-3:] // Last 3 batches
+ }
+
+ // Predict next batch offset based on pattern
+ switch sequence.Pattern {
+ case BatchPatternLinear:
+ // Linear progression
+ lastBatch := recent[len(recent)-1]
+ if len(recent) >= 2 {
+ prevBatch := recent[len(recent)-2]
+ gap := lastBatch.StartOffset - prevBatch.EndOffset
+ sequence.NextBatchOffset = lastBatch.EndOffset + gap
+ sequence.NextBatchSize = lastBatch.Size
+ sequence.Confidence = 0.8
+ }
+
+ case BatchPatternStrided:
+ // Regular stride
+ if len(recent) >= 3 {
+ stride := recent[len(recent)-1].StartOffset - recent[len(recent)-2].StartOffset
+ sequence.NextBatchOffset = recent[len(recent)-1].StartOffset + stride
+ sequence.NextBatchSize = recent[len(recent)-1].Size
+ sequence.Confidence = 0.7
+ }
+
+ default:
+ // Lower confidence for unpredictable patterns
+ sequence.Confidence = 0.3
+ }
+}
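+
+// Worked example for the linear case: if the previous batch's EndOffset is
+// 10MiB and the last batch starts there (gap = 0), the next batch is
+// predicted at the last batch's EndOffset with the same size, at
+// confidence 0.8.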
+
+// GetBatchRecommendations returns optimization recommendations for batch access
+func (bo *BatchOptimizer) GetBatchRecommendations(inode uint64) *BatchOptimizationRecommendations {
+ bo.RLock()
+ defer bo.RUnlock()
+
+ sequence := bo.batchSequences[inode]
+ if sequence == nil {
+ return &BatchOptimizationRecommendations{
+ ShouldOptimize: false,
+ }
+ }
+
+ sequence.RLock()
+ defer sequence.RUnlock()
+
+ prefetchConfig := bo.prefetchStrategies[sequence.Pattern]
+ cacheConfig := bo.cacheStrategies[sequence.Pattern]
+
+ if prefetchConfig == nil {
+ prefetchConfig = bo.prefetchStrategies[BatchPatternUnknown]
+ }
+ if cacheConfig == nil {
+ cacheConfig = bo.cacheStrategies[BatchPatternUnknown]
+ }
+ if prefetchConfig == nil || cacheConfig == nil {
+ // No strategy is registered for this pattern or for the Unknown
+ // fallback, so there is nothing actionable to recommend
+ return &BatchOptimizationRecommendations{ShouldOptimize: false}
+ }
+
+ recommendations := &BatchOptimizationRecommendations{
+ ShouldOptimize: true,
+ Pattern: sequence.Pattern,
+ PrefetchSize: prefetchConfig.PrefetchSize,
+ PrefetchCount: prefetchConfig.LookaheadCount,
+ CachePriority: cacheConfig.Priority,
+ CacheRetention: cacheConfig.RetentionTime,
+ NextBatchOffset: sequence.NextBatchOffset,
+ NextBatchSize: sequence.NextBatchSize,
+ Confidence: sequence.Confidence,
+ }
+
+ return recommendations
+}
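+
+// A sketch of consuming these recommendations from a prefetch path (the
+// scheduling step is a hypothetical hook, not an API defined in this file):
+//
+// rec := bo.GetBatchRecommendations(inode)
+// if rec.ShouldOptimize && rec.Confidence > 0.5 {
+// // issue rec.PrefetchCount reads of rec.PrefetchSize bytes each,
+// // starting at rec.NextBatchOffset
+// }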
+
+// BatchOptimizationRecommendations holds batch optimization recommendations
+type BatchOptimizationRecommendations struct {
+ ShouldOptimize bool `json:"should_optimize"`
+ Pattern BatchAccessPattern `json:"pattern"`
+ PrefetchSize int64 `json:"prefetch_size"`
+ PrefetchCount int `json:"prefetch_count"`
+ CachePriority CachePriority `json:"cache_priority"`
+ CacheRetention time.Duration `json:"cache_retention"`
+ NextBatchOffset int64 `json:"next_batch_offset"`
+ NextBatchSize int64 `json:"next_batch_size"`
+ Confidence float64 `json:"confidence"`
+}
+
+// GetBatchMetrics returns comprehensive batch optimization metrics
+func (bo *BatchOptimizer) GetBatchMetrics() BatchOptimizerMetrics {
+ bo.RLock()
+ defer bo.RUnlock()
+
+ metrics := BatchOptimizerMetrics{
+ TotalBatchesDetected: bo.totalBatchesDetected,
+ ActiveBatches: int64(len(bo.activeBatches)),
+ CompletedBatches: int64(len(bo.completedBatches)),
+ OptimizationHits: bo.optimizationHits,
+ OptimizationMisses: bo.optimizationMisses,
+ PatternCounts: make(map[BatchAccessPattern]int64),
+ }
+
+ // Count patterns
+ for _, batch := range bo.activeBatches {
+ batch.RLock()
+ metrics.PatternCounts[batch.AccessPattern]++
+ batch.RUnlock()
+ }
+
+ // Calculate hit rate
+ totalAttempts := bo.optimizationHits + bo.optimizationMisses
+ if totalAttempts > 0 {
+ metrics.OptimizationHitRate = float64(bo.optimizationHits) / float64(totalAttempts)
+ }
+
+ return metrics
+}
+
+// BatchOptimizerMetrics holds metrics for batch optimization
+type BatchOptimizerMetrics struct {
+ TotalBatchesDetected int64 `json:"total_batches_detected"`
+ ActiveBatches int64 `json:"active_batches"`
+ CompletedBatches int64 `json:"completed_batches"`
+ OptimizationHits int64 `json:"optimization_hits"`
+ OptimizationMisses int64 `json:"optimization_misses"`
+ OptimizationHitRate float64 `json:"optimization_hit_rate"`
+ PatternCounts map[BatchAccessPattern]int64 `json:"pattern_counts"`
+}
+
+// cleanupRoutine performs periodic cleanup of old batch information
+func (bo *BatchOptimizer) cleanupRoutine() {
+ for {
+ select {
+ case <-bo.cleanupTicker.C:
+ bo.performCleanup()
+ case <-bo.stopCleanup:
+ return
+ }
+ }
+}
+
+// performCleanup removes old batch information
+func (bo *BatchOptimizer) performCleanup() {
+ bo.Lock()
+ defer bo.Unlock()
+
+ now := time.Now()
+ cutoff := now.Add(-30 * time.Minute) // Remove batches older than 30 minutes
+
+ // Clean up completed batches
+ for id, batch := range bo.completedBatches {
+ batch.RLock()
+ shouldRemove := len(batch.AccessTimes) > 0 && batch.AccessTimes[0].Before(cutoff)
+ batch.RUnlock()
+
+ if shouldRemove {
+ delete(bo.completedBatches, id)
+ }
+ }
+
+ // Clean up stale active batches as well; activeBatches is not pruned
+ // anywhere else and would otherwise grow without bound
+ for id, batch := range bo.activeBatches {
+ batch.RLock()
+ shouldRemove := len(batch.AccessTimes) > 0 && batch.AccessTimes[0].Before(cutoff)
+ batch.RUnlock()
+
+ if shouldRemove {
+ delete(bo.activeBatches, id)
+ }
+ }
+
+ // Clean up access history
+ for inode, history := range bo.accessHistory {
+ filtered := make([]BatchAccess, 0, len(history))
+ for _, access := range history {
+ if access.AccessTime.After(cutoff) {
+ filtered = append(filtered, access)
+ }
+ }
+
+ if len(filtered) == 0 {
+ delete(bo.accessHistory, inode)
+ } else {
+ bo.accessHistory[inode] = filtered
+ }
+ }
+
+ // Clean up batch sequences, dropping the per-inode batch index alongside
+ // so inodeToBatches does not leak entries for idle files
+ for inode, sequence := range bo.batchSequences {
+ sequence.Lock()
+ if sequence.LastAccess.Before(cutoff) {
+ delete(bo.batchSequences, inode)
+ delete(bo.inodeToBatches, inode)
+ sequence.Unlock()
+ continue
+ }
+ sequence.Unlock()
+ }
+
+ glog.V(4).Infof("Batch optimizer cleanup completed")
+}
+
+// Shutdown gracefully shuts down the batch optimizer
+func (bo *BatchOptimizer) Shutdown() {
+ if bo.cleanupTicker != nil {
+ bo.cleanupTicker.Stop()
+ }
+
+ close(bo.stopCleanup)
+
+ glog.V(1).Infof("Batch optimizer shutdown complete")
+}
+
+// Helper functions
+
+func generateBatchID(inode uint64, offset int64, timestamp time.Time) string {
+ return fmt.Sprintf("batch_%d_%d_%d", inode, offset, timestamp.Unix())
+}
+
+func generateSequenceID(inode uint64, timestamp time.Time) string {
+ return fmt.Sprintf("seq_%d_%d", inode, timestamp.Unix())
+}
+
+// String methods for enums
+
+func (bap BatchAccessPattern) String() string {
+ switch bap {
+ case BatchPatternLinear:
+ return "Linear"
+ case BatchPatternStrided:
+ return "Strided"
+ case BatchPatternShuffled:
+ return "Shuffled"
+ case BatchPatternHierarchical:
+ return "Hierarchical"
+ case BatchPatternMultiGPU:
+ return "MultiGPU"
+ case BatchPatternPipelined:
+ return "Pipelined"
+ default:
+ return "Unknown"
+ }
+}