Diffstat (limited to 'weed/mount/ml/open_file_cache.go')
-rw-r--r--   weed/mount/ml/open_file_cache.go | 577
1 file changed, 577 insertions, 0 deletions
diff --git a/weed/mount/ml/open_file_cache.go b/weed/mount/ml/open_file_cache.go
new file mode 100644
index 000000000..8d23a0bd8
--- /dev/null
+++ b/weed/mount/ml/open_file_cache.go
@@ -0,0 +1,577 @@
+package ml
+
+import (
+	"sync"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+// ChunkMetadata contains metadata about a cached chunk
+type ChunkMetadata struct {
+	FileId      string        // Chunk file ID
+	Offset      uint64        // Offset within the file
+	Size        uint64        // Size of the chunk
+	CacheLevel  int           // 0=memory, 1=disk, 2=not cached
+	LastAccess  time.Time     // Last access time
+	AccessCount int64         // Number of times accessed
+	IsHot       bool          // Whether this chunk is frequently accessed
+	Pattern     AccessPattern // Access pattern for this chunk
+}
+
+// OpenFileInfo contains comprehensive information about an open file
+type OpenFileInfo struct {
+	sync.RWMutex
+
+	// Basic file information
+	Inode      uint64          // File inode
+	Entry      *filer_pb.Entry // File entry from filer
+	OpenCount  int             // Number of open handles
+	OpenTime   time.Time       // When file was first opened
+	LastAccess time.Time       // Last access time
+
+	// Chunk-level caching
+	ChunkCache map[uint32]*ChunkMetadata // chunk index -> metadata
+	ChunkCount uint32                    // Total number of chunks in file
+	ChunkSize  int64                     // Size of each chunk
+
+	// Access pattern tracking
+	AccessInfo    *AccessInfo   // Access pattern information
+	ReadPattern   AccessPattern // Overall file access pattern
+	PrefetchState PrefetchState // Current prefetch state
+
+	// ML-specific optimizations
+	IsMLFile   bool       // Whether this is likely an ML-related file
+	FileType   MLFileType // Type of ML file (dataset, model, etc.)
+	BatchSize  int        // Detected batch size for training data
+	EpochCount int        // Number of epochs detected
+
+	// Performance tracking
+	TotalBytesRead   int64 // Total bytes read from this file
+	CacheHitCount    int64 // Number of cache hits
+	CacheMissCount   int64 // Number of cache misses
+	PrefetchHitCount int64 // Number of prefetch hits
+}
+
+// PrefetchState represents the current prefetch state for a file
+type PrefetchState int
+
+const (
+	PrefetchIdle PrefetchState = iota
+	PrefetchActive
+	PrefetchComplete
+	PrefetchSuspended
+)
+
+// MLFileType represents the type of ML-related file
+type MLFileType int
+
+const (
+	MLFileUnknown MLFileType = iota
+	MLFileDataset // Training/validation dataset
+	MLFileModel   // Model checkpoint/weights
+	MLFileConfig  // Configuration files
+	MLFileTensor  // Individual tensor files
+	MLFileLog     // Training logs
+)
+
+// OpenFileCache manages open file information with ML-aware optimizations
+type OpenFileCache struct {
+	sync.RWMutex
+
+	// Configuration
+	maxFiles        int           // Maximum number of files to track
+	ttl             time.Duration // TTL for inactive files
+	cleanupInterval time.Duration // Cleanup interval
+
+	// File tracking
+	files       map[uint64]*OpenFileInfo // inode -> file info
+	accessOrder []uint64                 // LRU order for eviction
+
+	// ML-specific configuration
+	enableMLOptimization bool
+	mlFileDetector       *MLFileDetector
+
+	// Metrics
+	totalFiles   int64
+	evictedFiles int64
+	cacheHits    int64
+	cacheMisses  int64
+
+	// Background cleanup
+	shutdown chan struct{}
+	done     chan struct{}
+}
+
+// MLFileDetector detects ML-related files based on patterns and metadata
+type MLFileDetector struct {
+	// File extension patterns
+	datasetExtensions map[string]bool
+	modelExtensions   map[string]bool
+	configExtensions  map[string]bool
+
+	// Path patterns
+	datasetPaths []string
+	modelPaths   []string
+
+	// Size heuristics
+	modelMinSize    int64 // Minimum size for model files
+	datasetMaxItems int   // Maximum items in dataset directory
+}
+
+// NewOpenFileCache creates a new open file cache optimized for ML workloads
+func NewOpenFileCache(maxFiles int, ttl time.Duration) *OpenFileCache {
+	if maxFiles <= 0 {
+		maxFiles = 1000 // Default suitable for ML workloads
+	}
+	if ttl <= 0 {
+		ttl = 30 * time.Minute // Default TTL
+	}
+
+	ofc := &OpenFileCache{
+		maxFiles:             maxFiles,
+		ttl:                  ttl,
+		cleanupInterval:      5 * time.Minute,
+		files:                make(map[uint64]*OpenFileInfo),
+		accessOrder:          make([]uint64, 0, maxFiles),
+		enableMLOptimization: true,
+		mlFileDetector:       newMLFileDetector(),
+		shutdown:             make(chan struct{}),
+		done:                 make(chan struct{}),
+	}
+
+	// Start background cleanup
+	go ofc.cleanupWorker()
+
+	glog.V(1).Infof("OpenFileCache initialized: maxFiles=%d, ttl=%v", maxFiles, ttl)
+	return ofc
+}
+
+// newMLFileDetector creates a new ML file detector with common patterns
+func newMLFileDetector() *MLFileDetector {
+	return &MLFileDetector{
+		datasetExtensions: map[string]bool{
+			"jpg": true, "jpeg": true, "png": true, "bmp": true, "tiff": true,
+			"wav": true, "mp3": true, "flac": true,
+			"txt": true, "csv": true, "json": true, "jsonl": true,
+			"parquet": true, "arrow": true, "h5": true, "hdf5": true,
+			"tfrecord": true, "tfrecords": true,
+		},
+		modelExtensions: map[string]bool{
+			"pt": true, "pth": true, "pkl": true, "pickle": true,
+			"h5": true, "hdf5": true, "pb": true, "pbtxt": true,
+			"onnx": true, "tflite": true, "caffemodel": true,
+			"bin": true, "safetensors": true,
+		},
+		configExtensions: map[string]bool{
+			"yaml": true, "yml": true, "json": true, "toml": true,
+			"cfg": true, "config": true, "conf": true,
+		},
+		datasetPaths: []string{
+			"/datasets", "/data", "/train", "/test", "/val", "/validation",
+			"/images", "/audio", "/text", "/corpus",
+		},
+		modelPaths: []string{
+			"/models", "/checkpoints", "/weights", "/pretrained",
+			"/saved_models", "/exports",
+		},
+		modelMinSize:    1024 * 1024, // 1MB minimum for model files
+		datasetMaxItems: 1000000,     // 1M max items in dataset directory
+	}
+}
+
+// OpenFile registers a file as opened and initializes tracking
+func (ofc *OpenFileCache) OpenFile(inode uint64, entry *filer_pb.Entry, fullPath string) *OpenFileInfo {
+	ofc.Lock()
+	defer ofc.Unlock()
+
+	// Get or create file info
+	fileInfo := ofc.files[inode]
+	if fileInfo == nil {
+		fileInfo = &OpenFileInfo{
+			Inode:         inode,
+			Entry:         entry,
+			OpenTime:      time.Now(),
+			ChunkCache:    make(map[uint32]*ChunkMetadata),
+			AccessInfo:    &AccessInfo{Inode: inode},
+			ReadPattern:   RandomAccess,
+			PrefetchState: PrefetchIdle,
+		}
+
+		// Detect ML file type
+		if ofc.enableMLOptimization {
+			fileInfo.IsMLFile, fileInfo.FileType = ofc.mlFileDetector.DetectMLFile(entry, fullPath)
+			if fileInfo.IsMLFile {
+				glog.V(3).Infof("ML file detected: inode=%d, type=%v, path=%s",
+					inode, fileInfo.FileType, fullPath)
+			}
+		}
+
+		ofc.files[inode] = fileInfo
+		ofc.totalFiles++
+
+		// Update access order for LRU
+		ofc.updateAccessOrder(inode)
+
+		// Evict if necessary
+		if len(ofc.files) > ofc.maxFiles {
+			ofc.evictLRU()
+		}
+	}
+
+	fileInfo.OpenCount++
+	fileInfo.LastAccess = time.Now()
+	ofc.updateAccessOrder(inode)
+
+	glog.V(4).Infof("File opened: inode=%d, openCount=%d, isML=%v",
+		inode, fileInfo.OpenCount, fileInfo.IsMLFile)
+
+	return fileInfo
+}
+
+// CloseFile decrements the open count and potentially cleans up
+func (ofc *OpenFileCache) CloseFile(inode uint64) bool {
+	ofc.Lock()
+	defer ofc.Unlock()
+
+	fileInfo := ofc.files[inode]
+	if fileInfo == nil {
+		return true // Already cleaned up
+	}
+
+	fileInfo.OpenCount--
+	glog.V(4).Infof("File closed: inode=%d, openCount=%d", inode, fileInfo.OpenCount)
+
+	// Return true if file can be evicted (no more open handles)
+	return fileInfo.OpenCount <= 0
+}
+
+// GetFileInfo retrieves file information if cached
+func (ofc *OpenFileCache) GetFileInfo(inode uint64) *OpenFileInfo {
+	ofc.RLock()
+	defer ofc.RUnlock()
+
+	fileInfo := ofc.files[inode]
+	if fileInfo != nil {
+		fileInfo.LastAccess = time.Now()
+		ofc.cacheHits++
+		return fileInfo
+	}
+
+	ofc.cacheMisses++
+	return nil
+}
+
+// UpdateChunkCache updates chunk metadata for a file
+func (ofc *OpenFileCache) UpdateChunkCache(inode uint64, chunkIndex uint32, metadata *ChunkMetadata) {
+	ofc.RLock()
+	fileInfo := ofc.files[inode]
+	ofc.RUnlock()
+
+	if fileInfo == nil {
+		return
+	}
+
+	fileInfo.Lock()
+	defer fileInfo.Unlock()
+
+	fileInfo.ChunkCache[chunkIndex] = metadata
+	metadata.LastAccess = time.Now()
+	metadata.AccessCount++
+
+	glog.V(4).Infof("Updated chunk cache: inode=%d, chunk=%d, level=%d",
+		inode, chunkIndex, metadata.CacheLevel)
+}
+
+// GetChunkMetadata retrieves chunk metadata if available
+func (ofc *OpenFileCache) GetChunkMetadata(inode uint64, chunkIndex uint32) (*ChunkMetadata, bool) {
+	ofc.RLock()
+	fileInfo := ofc.files[inode]
+	ofc.RUnlock()
+
+	if fileInfo == nil {
+		return nil, false
+	}
+
+	fileInfo.RLock()
+	defer fileInfo.RUnlock()
+
+	metadata, exists := fileInfo.ChunkCache[chunkIndex]
+	if exists {
+		metadata.LastAccess = time.Now()
+		metadata.AccessCount++
+	}
+
+	return metadata, exists
+}
+
+// updateAccessOrder updates the LRU access order
+func (ofc *OpenFileCache) updateAccessOrder(inode uint64) {
+	// Remove from current position
+	for i, ino := range ofc.accessOrder {
+		if ino == inode {
+			ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...)
+			break
+		}
+	}
+
+	// Add to front (most recently used)
+	ofc.accessOrder = append([]uint64{inode}, ofc.accessOrder...)
+}
+
+// evictLRU evicts the least recently used file
+func (ofc *OpenFileCache) evictLRU() {
+	if len(ofc.accessOrder) == 0 {
+		return
+	}
+
+	// Find LRU file that can be evicted (not currently open)
+	for i := len(ofc.accessOrder) - 1; i >= 0; i-- {
+		inode := ofc.accessOrder[i]
+		fileInfo := ofc.files[inode]
+
+		if fileInfo != nil && fileInfo.OpenCount <= 0 {
+			// Evict this file
+			delete(ofc.files, inode)
+			ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...)
+			ofc.evictedFiles++
+
+			glog.V(3).Infof("Evicted file from cache: inode=%d, chunks=%d",
+				inode, len(fileInfo.ChunkCache))
+			return
+		}
+	}
+
+	// If no files can be evicted, just log a warning
+	glog.V(2).Infof("Warning: Could not evict any files from cache (all files are open)")
+}
+
+// cleanupWorker periodically cleans up expired entries
+func (ofc *OpenFileCache) cleanupWorker() {
+	ticker := time.NewTicker(ofc.cleanupInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			ofc.cleanup()
+		case <-ofc.shutdown:
+			close(ofc.done)
+			return
+		}
+	}
+}
+
+// cleanup removes expired file entries
+func (ofc *OpenFileCache) cleanup() {
+	ofc.Lock()
+	defer ofc.Unlock()
+
+	now := time.Now()
+	toRemove := make([]uint64, 0)
+
+	for inode, fileInfo := range ofc.files {
+		// Only cleanup files that are not open and have expired
+		if fileInfo.OpenCount <= 0 && now.Sub(fileInfo.LastAccess) > ofc.ttl {
+			toRemove = append(toRemove, inode)
+		}
+	}
+
+	// Remove expired files
+	for _, inode := range toRemove {
+		delete(ofc.files, inode)
+		// Remove from access order
+		for i, ino := range ofc.accessOrder {
+			if ino == inode {
+				ofc.accessOrder = append(ofc.accessOrder[:i], ofc.accessOrder[i+1:]...)
+				break
+			}
+		}
+	}
+
+	if len(toRemove) > 0 {
+		glog.V(3).Infof("Cleaned up %d expired file cache entries", len(toRemove))
+	}
+}
+
+// GetMetrics returns cache metrics
+func (ofc *OpenFileCache) GetMetrics() OpenFileCacheMetrics {
+	ofc.RLock()
+	defer ofc.RUnlock()
+
+	var totalChunks int64
+	var mlFiles int64
+	fileTypes := make(map[MLFileType]int)
+	patterns := make(map[AccessPattern]int)
+
+	for _, fileInfo := range ofc.files {
+		totalChunks += int64(len(fileInfo.ChunkCache))
+		if fileInfo.IsMLFile {
+			mlFiles++
+			fileTypes[fileInfo.FileType]++
+		}
+		patterns[fileInfo.ReadPattern]++
+	}
+
+	return OpenFileCacheMetrics{
+		TotalFiles:     int64(len(ofc.files)),
+		MLFiles:        mlFiles,
+		TotalChunks:    totalChunks,
+		CacheHits:      ofc.cacheHits,
+		CacheMisses:    ofc.cacheMisses,
+		EvictedFiles:   ofc.evictedFiles,
+		FileTypes:      fileTypes,
+		AccessPatterns: patterns,
+	}
+}
+
+// OpenFileCacheMetrics holds metrics for the open file cache
+type OpenFileCacheMetrics struct {
+	TotalFiles     int64                 `json:"total_files"`
+	MLFiles        int64                 `json:"ml_files"`
+	TotalChunks    int64                 `json:"total_chunks"`
+	CacheHits      int64                 `json:"cache_hits"`
+	CacheMisses    int64                 `json:"cache_misses"`
+	EvictedFiles   int64                 `json:"evicted_files"`
+	FileTypes      map[MLFileType]int    `json:"file_types"`
+	AccessPatterns map[AccessPattern]int `json:"access_patterns"`
+}
+
+// Shutdown gracefully shuts down the open file cache
+func (ofc *OpenFileCache) Shutdown() {
+	glog.V(1).Infof("Shutting down OpenFileCache...")
+
+	close(ofc.shutdown)
+
+	// Wait for cleanup worker to finish
+	<-ofc.done
+
+	// Print final metrics
+	metrics := ofc.GetMetrics()
+	glog.V(1).Infof("OpenFileCache final metrics: files=%d, chunks=%d, hits=%d, misses=%d",
+		metrics.TotalFiles, metrics.TotalChunks, metrics.CacheHits, metrics.CacheMisses)
+}
+
+// MLFileDetector methods
+
+// DetectMLFile determines if a file is ML-related and its type
+func (detector *MLFileDetector) DetectMLFile(entry *filer_pb.Entry, fullPath string) (bool, MLFileType) {
+	if entry == nil {
+		return false, MLFileUnknown
+	}
+
+	name := entry.Name
+	size := int64(entry.Attributes.FileSize)
+
+	// Check file extension
+	if ext := getFileExtension(name); ext != "" {
+		if detector.datasetExtensions[ext] {
+			return true, MLFileDataset
+		}
+		if detector.modelExtensions[ext] {
+			return true, MLFileModel
+		}
+		if detector.configExtensions[ext] {
+			return true, MLFileConfig
+		}
+	}
+
+	// Check path patterns
+	for _, path := range detector.datasetPaths {
+		if contains(fullPath, path) {
+			return true, MLFileDataset
+		}
+	}
+
+	for _, path := range detector.modelPaths {
+		if contains(fullPath, path) {
+			return true, MLFileModel
+		}
+	}
+
+	// Check size heuristics
+	if size > detector.modelMinSize {
+		// Large files in certain contexts might be models
+		if contains(fullPath, "model") || contains(fullPath, "checkpoint") || contains(fullPath, "weight") {
+			return true, MLFileModel
+		}
+	}
+
+	// Check for tensor files
+	if contains(name, "tensor") || contains(name, ".pt") || contains(name, ".npy") {
+		return true, MLFileTensor
+	}
+
+	// Check for log files
+	if contains(name, "log") || contains(name, "tensorboard") || contains(fullPath, "logs") {
+		return true, MLFileLog
+	}
+
+	return false, MLFileUnknown
+}
+
+// Helper functions
+
+func getFileExtension(filename string) string {
+	for i := len(filename) - 1; i >= 0; i-- {
+		if filename[i] == '.' {
+			return filename[i+1:]
+		}
+	}
+	return ""
+}
+
+func contains(str, substr string) bool {
+	return len(str) >= len(substr) && findSubstring(str, substr)
+}
+
+func findSubstring(str, substr string) bool {
+	if len(substr) == 0 {
+		return true
+	}
+	if len(str) < len(substr) {
+		return false
+	}
+
+	for i := 0; i <= len(str)-len(substr); i++ {
+		if str[i:i+len(substr)] == substr {
+			return true
+		}
+	}
+	return false
+}
+
+// String methods for enums
+
+func (ps PrefetchState) String() string {
+	switch ps {
+	case PrefetchIdle:
+		return "Idle"
+	case PrefetchActive:
+		return "Active"
+	case PrefetchComplete:
+		return "Complete"
+	case PrefetchSuspended:
+		return "Suspended"
+	default:
+		return "Unknown"
+	}
+}
+
+func (ft MLFileType) String() string {
+	switch ft {
+	case MLFileDataset:
+		return "Dataset"
+	case MLFileModel:
+		return "Model"
+	case MLFileConfig:
+		return "Config"
+	case MLFileTensor:
+		return "Tensor"
+	case MLFileLog:
+		return "Log"
+	default:
+		return "Unknown"
+	}
+}
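For orientation, here is a minimal, hypothetical sketch of how a mount-side caller might drive the API added by this file. It is not part of the commit: the inode number, example paths, chunk file ID, and the filer_pb.FuseAttributes literal are illustrative assumptions, and the import path is simply derived from the file's location; only the ml package identifiers come from the diff above.

// Hypothetical caller sketch (not part of this commit). It exercises
// OpenFile / UpdateChunkCache / GetChunkMetadata / CloseFile as declared
// above; the concrete values are made up for illustration.
package main

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mount/ml"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

func main() {
	cache := ml.NewOpenFileCache(1000, 30*time.Minute)
	defer cache.Shutdown()

	// On open: register the handle. A "/datasets/..." path with a .tfrecord
	// extension should be classified as a dataset file by the detector.
	entry := &filer_pb.Entry{
		Name:       "train-00001.tfrecord",                         // assumed example name
		Attributes: &filer_pb.FuseAttributes{FileSize: 128 << 20},  // DetectMLFile reads Attributes.FileSize
	}
	info := cache.OpenFile(42, entry, "/datasets/imagenet/train-00001.tfrecord")
	fmt.Println("ML file:", info.IsMLFile, "type:", info.FileType)

	// After a read: record where the chunk landed (CacheLevel 0 = memory,
	// per the ChunkMetadata comments).
	cache.UpdateChunkCache(42, 0, &ml.ChunkMetadata{
		FileId:     "3,01637037d6", // made-up volume file id
		Offset:     0,
		Size:       4 << 20,
		CacheLevel: 0,
	})
	if meta, ok := cache.GetChunkMetadata(42, 0); ok {
		fmt.Println("chunk 0 cache level:", meta.CacheLevel)
	}

	// On release: CloseFile reports whether the entry is now evictable.
	fmt.Println("evictable after close:", cache.CloseFile(42))
}

The sketch relies only on behavior visible in the diff: eviction and TTL cleanup skip entries whose OpenCount is still positive, so the CloseFile return value tells the caller when the cache is free to drop the entry.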
