Diffstat (limited to 'weed/mount/ml/prefetch.go')
-rw-r--r--	weed/mount/ml/prefetch.go	349
1 file changed, 349 insertions, 0 deletions
diff --git a/weed/mount/ml/prefetch.go b/weed/mount/ml/prefetch.go
new file mode 100644
index 000000000..92fc5e2ec
--- /dev/null
+++ b/weed/mount/ml/prefetch.go
@@ -0,0 +1,349 @@
+package ml
+
+import (
+ "context"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// PrefetchRequest represents a chunk prefetch request
+type PrefetchRequest struct {
+ FileId string
+ ChunkIndex uint32
+ Offset uint64
+ Size uint64
+ Priority int
+ Timestamp time.Time
+ Callback func([]byte, error)
+ ctx context.Context
+}
+
+// PrefetchJob tracks an active prefetch operation
+type PrefetchJob struct {
+ request *PrefetchRequest
+ startTime time.Time
+ cancelled int32
+}
+
+// PrefetchManager manages background chunk prefetching for ML workloads
+type PrefetchManager struct {
+ sync.RWMutex
+
+ // Configuration
+ maxWorkers int
+ queueSize int
+ jobTimeout time.Duration
+ enableMetrics bool
+
+ // Worker management
+ workers chan *PrefetchRequest
+ activeJobs map[string]*PrefetchJob
+ workerWg sync.WaitGroup
+
+ // Metrics
+ totalRequests int64
+ successfulFetch int64
+ failedFetch int64
+ duplicateReqs int64
+ timeoutReqs int64
+
+ // Shutdown
+ shutdown chan struct{}
+ done chan struct{}
+}
+
+// NewPrefetchManager creates a new prefetch manager optimized for ML workloads
+func NewPrefetchManager(maxWorkers int, queueSize int, timeout time.Duration) *PrefetchManager {
+ if maxWorkers <= 0 {
+ maxWorkers = 4 // Default suitable for ML workloads
+ }
+ if queueSize <= 0 {
+ queueSize = 100
+ }
+ if timeout <= 0 {
+ timeout = 30 * time.Second
+ }
+
+ pm := &PrefetchManager{
+ maxWorkers: maxWorkers,
+ queueSize: queueSize,
+ jobTimeout: timeout,
+ enableMetrics: true,
+ workers: make(chan *PrefetchRequest, queueSize),
+ activeJobs: make(map[string]*PrefetchJob),
+ shutdown: make(chan struct{}),
+ done: make(chan struct{}),
+ }
+
+ // Start worker goroutines
+ for i := 0; i < maxWorkers; i++ {
+ pm.workerWg.Add(1)
+ go pm.worker(i)
+ }
+
+ // Start cleanup goroutine for expired jobs
+ go pm.cleanupWorker()
+
+ glog.V(1).Infof("PrefetchManager started with %d workers, queue size %d", maxWorkers, queueSize)
+ return pm
+}
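+
+// A typical construction for a training-data mount (illustrative; tune the
+// worker count and queue size to the workload):
+//
+//	pm := NewPrefetchManager(8, 256, 30*time.Second)
+//	defer pm.Shutdown()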
+
+// Prefetch requests background fetching of a chunk.
+// It returns true if the request was queued, and false if it was a duplicate
+// or the queue was full.
+func (pm *PrefetchManager) Prefetch(ctx context.Context, fileId string, chunkIndex uint32, offset, size uint64, priority int, callback func([]byte, error)) bool {
+ atomic.AddInt64(&pm.totalRequests, 1)
+
+ // Create job key for deduplication
+ jobKey := pm.makeJobKey(fileId, chunkIndex)
+
+ pm.Lock()
+ // Check for duplicate requests
+ if _, exists := pm.activeJobs[jobKey]; exists {
+ pm.Unlock()
+ atomic.AddInt64(&pm.duplicateReqs, 1)
+ glog.V(4).Infof("Duplicate prefetch request for %s chunk %d", fileId, chunkIndex)
+ return false
+ }
+
+ request := &PrefetchRequest{
+ FileId: fileId,
+ ChunkIndex: chunkIndex,
+ Offset: offset,
+ Size: size,
+ Priority: priority,
+ Timestamp: time.Now(),
+ Callback: callback,
+ ctx: ctx,
+ }
+
+ job := &PrefetchJob{
+ request: request,
+ startTime: time.Now(),
+ }
+
+ pm.activeJobs[jobKey] = job
+ pm.Unlock()
+
+ // Try to queue the request
+ select {
+ case pm.workers <- request:
+ glog.V(4).Infof("Queued prefetch for %s chunk %d (priority %d)", fileId, chunkIndex, priority)
+ return true
+ default:
+ // Queue is full, remove from active jobs
+ pm.Lock()
+ delete(pm.activeJobs, jobKey)
+ pm.Unlock()
+ glog.V(3).Infof("Prefetch queue full, dropping request for %s chunk %d", fileId, chunkIndex)
+ return false
+ }
+}
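+
+// An illustrative call site: a sequential-read detector queues the next chunk
+// ahead of the reader and falls back to the normal on-demand read if the
+// request is rejected (the variable names here are assumptions):
+//
+//	queued := pm.Prefetch(ctx, fileId, nextChunkIndex, nextOffset, chunkSize, 1,
+//		func(data []byte, err error) {
+//			if err != nil {
+//				return // best-effort: the read path fetches on demand
+//			}
+//			// hand data to the chunk cache ahead of the upcoming read
+//		})
+//	if !queued {
+//		// duplicate or full queue; the on-demand read path still works
+//	}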
+
+// worker processes prefetch requests
+func (pm *PrefetchManager) worker(workerID int) {
+ defer pm.workerWg.Done()
+
+ glog.V(4).Infof("Prefetch worker %d started", workerID)
+
+ for {
+ select {
+ case request := <-pm.workers:
+ pm.processRequest(workerID, request)
+ case <-pm.shutdown:
+ glog.V(4).Infof("Prefetch worker %d shutting down", workerID)
+ return
+ }
+ }
+}
+
+// processRequest handles a single prefetch request
+func (pm *PrefetchManager) processRequest(workerID int, request *PrefetchRequest) {
+ jobKey := pm.makeJobKey(request.FileId, request.ChunkIndex)
+ startTime := time.Now()
+
+ glog.V(4).Infof("Worker %d processing prefetch for %s chunk %d", workerID, request.FileId, request.ChunkIndex)
+
+ // Check if job was cancelled
+ pm.RLock()
+ job, exists := pm.activeJobs[jobKey]
+ pm.RUnlock()
+
+ if !exists {
+ glog.V(4).Infof("Job %s already cancelled or completed", jobKey)
+ return
+ }
+
+ if atomic.LoadInt32(&job.cancelled) == 1 {
+ glog.V(4).Infof("Job %s was cancelled", jobKey)
+ pm.removeJob(jobKey)
+ return
+ }
+
+ // Create timeout context
+ ctx, cancel := context.WithTimeout(request.ctx, pm.jobTimeout)
+ defer cancel()
+
+ // TODO: Implement actual chunk fetching logic
+ // For now, simulate the work and call the callback
+ data, err := pm.fetchChunk(ctx, request)
+
+ // Update metrics
+ duration := time.Since(startTime)
+ if err != nil {
+ atomic.AddInt64(&pm.failedFetch, 1)
+ if ctx.Err() == context.DeadlineExceeded {
+ atomic.AddInt64(&pm.timeoutReqs, 1)
+ }
+ glog.V(3).Infof("Worker %d failed to prefetch %s chunk %d after %v: %v", workerID, request.FileId, request.ChunkIndex, duration, err)
+ } else {
+ atomic.AddInt64(&pm.successfulFetch, 1)
+ glog.V(4).Infof("Worker %d successfully prefetched %s chunk %d in %v (%d bytes)", workerID, request.FileId, request.ChunkIndex, duration, len(data))
+ }
+
+ // Call the callback if provided
+ if request.Callback != nil {
+ request.Callback(data, err)
+ }
+
+ // Remove job from active jobs
+ pm.removeJob(jobKey)
+}
+
+// fetchChunk performs the actual chunk fetch operation
+// TODO: Integrate with existing SeaweedFS chunk reading logic
+func (pm *PrefetchManager) fetchChunk(ctx context.Context, request *PrefetchRequest) ([]byte, error) {
+ // This is a placeholder implementation
+ // In the real implementation, this would:
+ // 1. Use the existing chunk cache to check if chunk is already cached
+ // 2. If not cached, fetch from volume servers using existing logic
+ // 3. Store in cache for future use
+
+ glog.V(4).Infof("Simulating fetch of %s chunk %d (offset %d, size %d)",
+ request.FileId, request.ChunkIndex, request.Offset, request.Size)
+
+ // Simulate some work
+ select {
+ case <-time.After(10 * time.Millisecond):
+ // Return empty data for now
+ return make([]byte, request.Size), nil
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+}
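+
+// One way to complete the TODO above without binding this file to volume-server
+// details is dependency injection: the mount layer supplies a fetch function at
+// construction time and fetchChunk delegates to it. A minimal sketch, assuming
+// a hypothetical fetchFn field on PrefetchManager (not an existing SeaweedFS
+// interface):
+//
+//	type ChunkFetchFunc func(ctx context.Context, fileId string, offset, size uint64) ([]byte, error)
+//
+//	func (pm *PrefetchManager) fetchChunk(ctx context.Context, request *PrefetchRequest) ([]byte, error) {
+//		if pm.fetchFn == nil {
+//			return nil, errors.New("no fetcher configured")
+//		}
+//		return pm.fetchFn(ctx, request.FileId, request.Offset, request.Size)
+//	}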
+
+// Cancel cancels a pending or active prefetch request
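+// Cancellation is best-effort: the flag is only consulted before a worker
+// starts the fetch, so a fetch already in flight runs to completion.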
+func (pm *PrefetchManager) Cancel(fileId string, chunkIndex uint32) bool {
+ jobKey := pm.makeJobKey(fileId, chunkIndex)
+
+ pm.RLock()
+ job, exists := pm.activeJobs[jobKey]
+ pm.RUnlock()
+
+ if !exists {
+ return false
+ }
+
+ atomic.StoreInt32(&job.cancelled, 1)
+ glog.V(4).Infof("Cancelled prefetch for %s chunk %d", fileId, chunkIndex)
+ return true
+}
+
+// cleanupWorker periodically removes expired jobs
+func (pm *PrefetchManager) cleanupWorker() {
+ ticker := time.NewTicker(30 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ pm.cleanup()
+ case <-pm.shutdown:
+ return
+ }
+ }
+}
+
+// cleanup removes expired jobs
+func (pm *PrefetchManager) cleanup() {
+ now := time.Now()
+ expiredJobKeys := make([]string, 0)
+
+ pm.RLock()
+ for jobKey, job := range pm.activeJobs {
+ if now.Sub(job.startTime) > pm.jobTimeout*2 { // Give extra time for cleanup
+ expiredJobKeys = append(expiredJobKeys, jobKey)
+ }
+ }
+ pm.RUnlock()
+
+ if len(expiredJobKeys) > 0 {
+ pm.Lock()
+ for _, jobKey := range expiredJobKeys {
+ delete(pm.activeJobs, jobKey)
+ }
+ pm.Unlock()
+
+ glog.V(3).Infof("Cleaned up %d expired prefetch jobs", len(expiredJobKeys))
+ }
+}
+
+// GetMetrics returns current prefetch metrics
+func (pm *PrefetchManager) GetMetrics() PrefetchMetrics {
+ pm.RLock()
+ activeJobCount := len(pm.activeJobs)
+ pm.RUnlock()
+
+ return PrefetchMetrics{
+ TotalRequests: atomic.LoadInt64(&pm.totalRequests),
+ SuccessfulFetch: atomic.LoadInt64(&pm.successfulFetch),
+ FailedFetch: atomic.LoadInt64(&pm.failedFetch),
+ DuplicateReqs: atomic.LoadInt64(&pm.duplicateReqs),
+ TimeoutReqs: atomic.LoadInt64(&pm.timeoutReqs),
+ ActiveJobs: int64(activeJobCount),
+ Workers: int64(pm.maxWorkers),
+ }
+}
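+
+// The counters are monotonically increasing, so rates are derived by the
+// caller from snapshots, e.g.:
+//
+//	m := pm.GetMetrics()
+//	if attempted := m.SuccessfulFetch + m.FailedFetch; attempted > 0 {
+//		glog.V(2).Infof("prefetch success rate: %.1f%%",
+//			100*float64(m.SuccessfulFetch)/float64(attempted))
+//	}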
+
+// PrefetchMetrics holds prefetch performance metrics
+type PrefetchMetrics struct {
+ TotalRequests int64
+ SuccessfulFetch int64
+ FailedFetch int64
+ DuplicateReqs int64
+ TimeoutReqs int64
+ ActiveJobs int64
+ Workers int64
+}
+
+// Shutdown gracefully shuts down the prefetch manager
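+// Requests still waiting in the queue when shutdown begins are not guaranteed
+// to run, so their callbacks may never be invoked.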
+func (pm *PrefetchManager) Shutdown() {
+ glog.V(1).Infof("Shutting down PrefetchManager...")
+
+ close(pm.shutdown)
+
+ // Wait for workers to finish
+ pm.workerWg.Wait()
+
+ // Clear active jobs
+ pm.Lock()
+ pm.activeJobs = make(map[string]*PrefetchJob)
+ pm.Unlock()
+
+ close(pm.done)
+ glog.V(1).Infof("PrefetchManager shutdown complete")
+}
+
+// Helper methods
+
+func (pm *PrefetchManager) makeJobKey(fileId string, chunkIndex uint32) string {
+ // Encode the chunk index in decimal so keys are unique per chunk and printable.
+ return fileId + ":" + strconv.FormatUint(uint64(chunkIndex), 10)
+}
+
+func (pm *PrefetchManager) removeJob(jobKey string) {
+ pm.Lock()
+ delete(pm.activeJobs, jobKey)
+ pm.Unlock()
+}