-rw-r--r--  weed/filer/filer.go                 5
-rw-r--r--  weed/filer/filer_deletion.go      572
-rw-r--r--  weed/filer/filer_deletion_test.go 308
3 files changed, 840 insertions, 45 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 71185d3d1..b86ac3c5b 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -54,6 +54,8 @@ type Filer struct {
RemoteStorage *FilerRemoteStorage
Dlm *lock_manager.DistributedLockManager
MaxFilenameLength uint32
+ deletionQuit chan struct{}
+ DeletionRetryQueue *DeletionRetryQueue
}
func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer {
@@ -66,6 +68,8 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
UniqueFilerId: util.RandomInt32(),
Dlm: lock_manager.NewDistributedLockManager(filerHost),
MaxFilenameLength: maxFilenameLength,
+ deletionQuit: make(chan struct{}),
+ DeletionRetryQueue: NewDeletionRetryQueue(),
}
if f.UniqueFilerId < 0 {
f.UniqueFilerId = -f.UniqueFilerId
@@ -379,6 +383,7 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
}
func (f *Filer) Shutdown() {
+ close(f.deletionQuit)
f.LocalMetaLogBuffer.ShutdownLogBuffer()
f.Store.Shutdown()
}
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index b3a4296ba..d8bc105e6 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -1,20 +1,274 @@
package filer
import (
+ "container/heap"
"context"
"fmt"
"strings"
+ "sync"
"time"
+ "google.golang.org/grpc"
+
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
+const (
+ // Maximum number of retry attempts for failed deletions
+ MaxRetryAttempts = 10
+ // Initial retry delay (will be doubled with each attempt)
+ InitialRetryDelay = 5 * time.Minute
+ // Maximum retry delay
+ MaxRetryDelay = 6 * time.Hour
+ // Interval for checking retry queue for ready items
+ DeletionRetryPollInterval = 1 * time.Minute
+ // Maximum number of items to process per retry iteration
+ DeletionRetryBatchSize = 1000
+ // Maximum number of error details to include in log messages
+ MaxLoggedErrorDetails = 10
+ // Interval for polling the deletion queue for new items
+ // Using a prime number to de-synchronize with other periodic tasks
+ DeletionPollInterval = 1123 * time.Millisecond
+ // Maximum number of file IDs to delete per batch (roughly 20 bytes per file ID)
+ DeletionBatchSize = 100000
+)
+
+// retryablePatterns contains error message patterns that indicate temporary/transient conditions
+// that should be retried. These patterns are based on actual error messages from the deletion pipeline.
+var retryablePatterns = []string{
+ "is read only", // Volume temporarily read-only (tiering, maintenance)
+ "error reading from server", // Network I/O errors
+ "connection reset by peer", // Network connection issues
+ "closed network connection", // Network connection closed unexpectedly
+ "connection refused", // Server temporarily unavailable
+ "timeout", // Operation timeout (network or server)
+ "deadline exceeded", // Context deadline exceeded
+ "context canceled", // Context cancellation (may be transient)
+ "lookup error", // Volume lookup failures
+ "lookup failed", // Volume server discovery issues
+ "too many requests", // Rate limiting / backpressure
+ "service unavailable", // HTTP 503 errors
+ "temporarily unavailable", // Temporary service issues
+ "try again", // Explicit retry suggestion
+ "i/o timeout", // Network I/O timeout
+ "broken pipe", // Connection broken during operation
+}
+
+// DeletionRetryItem represents a file deletion that failed and needs to be retried
+type DeletionRetryItem struct {
+ FileId string
+ RetryCount int
+ NextRetryAt time.Time
+ LastError string
+ heapIndex int // index in the heap (for heap.Interface)
+ inFlight bool // true when item is being processed, prevents duplicate additions
+}
+
+// retryHeap implements heap.Interface for DeletionRetryItem
+// Items are ordered by NextRetryAt (earliest first)
+type retryHeap []*DeletionRetryItem
+
+// Compile-time assertion that retryHeap implements heap.Interface
+var _ heap.Interface = (*retryHeap)(nil)
+
+func (h retryHeap) Len() int { return len(h) }
+
+func (h retryHeap) Less(i, j int) bool {
+ return h[i].NextRetryAt.Before(h[j].NextRetryAt)
+}
+
+func (h retryHeap) Swap(i, j int) {
+ h[i], h[j] = h[j], h[i]
+ h[i].heapIndex = i
+ h[j].heapIndex = j
+}
+
+func (h *retryHeap) Push(x any) {
+ item := x.(*DeletionRetryItem)
+ item.heapIndex = len(*h)
+ *h = append(*h, item)
+}
+
+func (h *retryHeap) Pop() any {
+ old := *h
+ n := len(old)
+ item := old[n-1]
+ old[n-1] = nil // avoid memory leak
+ item.heapIndex = -1 // mark as removed
+ *h = old[0 : n-1]
+ return item
+}
+
+// DeletionRetryQueue manages the queue of failed deletions that need to be retried.
+// Uses a min-heap ordered by NextRetryAt for efficient retrieval of ready items.
+//
+// LIMITATION: Current implementation stores retry queue in memory only.
+// On filer restart, all pending retries are lost. With MaxRetryDelay up to 6 hours,
+// process restarts during this window will cause retry state loss.
+//
+// TODO: Consider persisting retry queue to durable storage for production resilience:
+// - Option 1: Leverage existing Filer store (KV operations)
+// - Option 2: Periodic snapshots to disk with recovery on startup
+// - Option 3: Write-ahead log for retry queue mutations
+// - Trade-offs: Performance vs durability, complexity vs reliability
+//
+// For now, in-memory storage is accepted as a pragmatic initial implementation.
+// Lost retries are eventually consistent, as the affected files remain in the deletion queue.
+type DeletionRetryQueue struct {
+ heap retryHeap
+ itemIndex map[string]*DeletionRetryItem // for O(1) lookup by FileId
+ lock sync.Mutex
+}
+
+// NewDeletionRetryQueue creates a new retry queue
+func NewDeletionRetryQueue() *DeletionRetryQueue {
+ q := &DeletionRetryQueue{
+ heap: make(retryHeap, 0),
+ itemIndex: make(map[string]*DeletionRetryItem),
+ }
+ heap.Init(&q.heap)
+ return q
+}
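+
+// Illustrative lifecycle (a sketch only; the file id and error strings below are
+// made-up examples). The actual wiring lives in processDeletionBatch and
+// processRetryBatch further down in this file:
+//
+//   q := NewDeletionRetryQueue()
+//   q.AddOrUpdate("3,01637037d6", "volume 3 is read only") // first retry scheduled InitialRetryDelay out
+//   for _, item := range q.GetReadyItems(DeletionRetryBatchSize) {
+//       // ... attempt the deletion again ...
+//       q.RequeueForRetry(item, "volume 3 is read only") // still failing: back off and re-queue
+//       // or q.Remove(item)                             // succeeded or failed permanently
+//   }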
+
+// calculateBackoff calculates the exponential backoff delay for a given retry count.
+// Uses exponential backoff formula: InitialRetryDelay * 2^(retryCount-1)
+// The first retry (retryCount=1) uses InitialRetryDelay, second uses 2x, third uses 4x, etc.
+// Includes overflow protection and caps at MaxRetryDelay.
+func calculateBackoff(retryCount int) time.Duration {
+ // The first retry is attempt 1, but shift should start at 0
+ if retryCount <= 1 {
+ return InitialRetryDelay
+ }
+
+ shiftAmount := uint(retryCount - 1)
+
+ // time.Duration is an int64. A left shift of 63 or more will result in a
+ // negative number or zero. The multiplication can also overflow much earlier
+ // (around a shift of 25 for a 5-minute initial delay).
+ // The `delay <= 0` check below correctly catches all these overflow cases.
+ delay := InitialRetryDelay << shiftAmount
+
+ if delay <= 0 || delay > MaxRetryDelay {
+ return MaxRetryDelay
+ }
+
+ return delay
+}
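+
+// With the constants defined above (InitialRetryDelay = 5m, MaxRetryDelay = 6h,
+// MaxRetryAttempts = 10), the schedule works out to roughly:
+// 5m, 10m, 20m, 40m, 1h20m, 2h40m, 5h20m, then 6h for attempts 8 through 10.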
+
+// AddOrUpdate adds a new failed deletion or updates an existing one
+// Time complexity: O(log N) for insertion/update
+func (q *DeletionRetryQueue) AddOrUpdate(fileId string, errorMsg string) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Check if item already exists (including in-flight items)
+ if item, exists := q.itemIndex[fileId]; exists {
+ // Item is already in the queue or being processed. Just update the error.
+ // The existing retry schedule should proceed.
+ // RetryCount is only incremented in RequeueForRetry when an actual retry is performed.
+ item.LastError = errorMsg
+ if item.inFlight {
+ glog.V(2).Infof("retry for %s in-flight: attempt %d, will preserve retry state", fileId, item.RetryCount)
+ } else {
+ glog.V(2).Infof("retry for %s already scheduled: attempt %d, next retry in %v", fileId, item.RetryCount, time.Until(item.NextRetryAt))
+ }
+ return
+ }
+
+ // Add new item
+ delay := InitialRetryDelay
+ item := &DeletionRetryItem{
+ FileId: fileId,
+ RetryCount: 1,
+ NextRetryAt: time.Now().Add(delay),
+ LastError: errorMsg,
+ inFlight: false,
+ }
+ heap.Push(&q.heap, item)
+ q.itemIndex[fileId] = item
+ glog.V(2).Infof("added retry for %s: next retry in %v", fileId, delay)
+}
+
+// RequeueForRetry re-adds a previously failed item back to the queue with incremented retry count.
+// This method MUST be used when re-queuing items from processRetryBatch to preserve retry state.
+// Time complexity: O(log N) for insertion
+func (q *DeletionRetryQueue) RequeueForRetry(item *DeletionRetryItem, errorMsg string) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Increment retry count
+ item.RetryCount++
+ item.LastError = errorMsg
+
+ // Calculate next retry time with exponential backoff
+ delay := calculateBackoff(item.RetryCount)
+ item.NextRetryAt = time.Now().Add(delay)
+ item.inFlight = false // Clear in-flight flag
+ glog.V(2).Infof("requeued retry for %s: attempt %d, next retry in %v", item.FileId, item.RetryCount, delay)
+
+ // Re-add to heap (item still in itemIndex)
+ heap.Push(&q.heap, item)
+}
+
+// GetReadyItems returns items that are ready to be retried and marks them as in-flight
+// Time complexity: O(K log N) where K is the number of ready items
+// Items are processed in order of NextRetryAt (earliest first)
+func (q *DeletionRetryQueue) GetReadyItems(maxItems int) []*DeletionRetryItem {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ now := time.Now()
+ var readyItems []*DeletionRetryItem
+
+ // Peek at items from the top of the heap (earliest NextRetryAt)
+ for len(q.heap) > 0 && len(readyItems) < maxItems {
+ item := q.heap[0]
+
+ // If the earliest item is not ready yet, no other items are ready either
+ if item.NextRetryAt.After(now) {
+ break
+ }
+
+ // Remove from heap but keep in itemIndex with inFlight flag
+ heap.Pop(&q.heap)
+
+ if item.RetryCount <= MaxRetryAttempts {
+ item.inFlight = true // Mark as being processed
+ readyItems = append(readyItems, item)
+ } else {
+ // Max attempts reached, log and discard completely
+ delete(q.itemIndex, item.FileId)
+ glog.Warningf("max retry attempts (%d) reached for %s, last error: %s", MaxRetryAttempts, item.FileId, item.LastError)
+ }
+ }
+
+ return readyItems
+}
+
+// Remove removes an item from the queue (called when deletion succeeds or fails permanently)
+// Time complexity: O(1)
+func (q *DeletionRetryQueue) Remove(item *DeletionRetryItem) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ // Item was already removed from heap by GetReadyItems, just remove from index
+ delete(q.itemIndex, item.FileId)
+}
+
+// Size returns the current size of the retry queue
+func (q *DeletionRetryQueue) Size() int {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+ return len(q.heap)
+}
+
func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]*operation.LookupResult, error) {
return func(vids []string) (map[string]*operation.LookupResult, error) {
m := make(map[string]*operation.LookupResult)
@@ -41,64 +295,292 @@ func (f *Filer) loopProcessingDeletion() {
lookupFunc := LookupByMasterClientFn(f.MasterClient)
- DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
+ // Start retry processor in a separate goroutine
+ go f.loopProcessingDeletionRetry(lookupFunc)
+
+ ticker := time.NewTicker(DeletionPollInterval)
+ defer ticker.Stop()
- var deletionCount int
for {
- deletionCount = 0
- f.fileIdDeletionQueue.Consume(func(fileIds []string) {
- for len(fileIds) > 0 {
- var toDeleteFileIds []string
- if len(fileIds) > DeletionBatchSize {
- toDeleteFileIds = fileIds[:DeletionBatchSize]
- fileIds = fileIds[DeletionBatchSize:]
- } else {
- toDeleteFileIds = fileIds
- fileIds = fileIds[:0]
- }
- deletionCount = len(toDeleteFileIds)
- results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
-
- // Process individual results for better error tracking
- var successCount, notFoundCount, errorCount int
- var errorDetails []string
-
- for _, result := range results {
- if result.Error == "" {
- successCount++
- } else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
- // Already deleted - acceptable
- notFoundCount++
- } else {
- // Actual error
- errorCount++
- if errorCount <= 10 {
- // Only log first 10 errors to avoid flooding logs
- errorDetails = append(errorDetails, result.FileId+": "+result.Error)
- }
+ select {
+ case <-f.deletionQuit:
+ glog.V(0).Infof("deletion processor shutting down")
+ return
+ case <-ticker.C:
+ f.fileIdDeletionQueue.Consume(func(fileIds []string) {
+ for i := 0; i < len(fileIds); i += DeletionBatchSize {
+ end := i + DeletionBatchSize
+ if end > len(fileIds) {
+ end = len(fileIds)
}
+ toDeleteFileIds := fileIds[i:end]
+ f.processDeletionBatch(toDeleteFileIds, lookupFunc)
}
+ })
+ }
+ }
+}
- if successCount > 0 || notFoundCount > 0 {
- glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
- }
+// processDeletionBatch handles deletion of a batch of file IDs and processes results.
+// It classifies errors into retryable and permanent categories, adds retryable failures
+// to the retry queue, and logs appropriate messages.
+func (f *Filer) processDeletionBatch(toDeleteFileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
+ // Deduplicate file IDs to prevent incorrect retry count increments for the same file ID within a single batch.
+ uniqueFileIdsSlice := make([]string, 0, len(toDeleteFileIds))
+ processed := make(map[string]struct{}, len(toDeleteFileIds))
+ for _, fileId := range toDeleteFileIds {
+ if _, found := processed[fileId]; !found {
+ processed[fileId] = struct{}{}
+ uniqueFileIdsSlice = append(uniqueFileIdsSlice, fileId)
+ }
+ }
- if errorCount > 0 {
- logMessage := fmt.Sprintf("failed to delete %d/%d files", errorCount, len(toDeleteFileIds))
- if errorCount > 10 {
- logMessage += " (showing first 10)"
- }
- glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
+ if len(uniqueFileIdsSlice) == 0 {
+ return
+ }
+
+ // Delete files and classify outcomes
+ outcomes := deleteFilesAndClassify(f.GrpcDialOption, uniqueFileIdsSlice, lookupFunc)
+
+ // Process outcomes
+ var successCount, notFoundCount, retryableErrorCount, permanentErrorCount int
+ var errorDetails []string
+
+ for _, fileId := range uniqueFileIdsSlice {
+ outcome := outcomes[fileId]
+
+ switch outcome.status {
+ case deletionOutcomeSuccess:
+ successCount++
+ case deletionOutcomeNotFound:
+ notFoundCount++
+ case deletionOutcomeRetryable, deletionOutcomeNoResult:
+ retryableErrorCount++
+ f.DeletionRetryQueue.AddOrUpdate(fileId, outcome.errorMsg)
+ if len(errorDetails) < MaxLoggedErrorDetails {
+ errorDetails = append(errorDetails, fileId+": "+outcome.errorMsg+" (will retry)")
+ }
+ case deletionOutcomePermanent:
+ permanentErrorCount++
+ if len(errorDetails) < MaxLoggedErrorDetails {
+ errorDetails = append(errorDetails, fileId+": "+outcome.errorMsg+" (permanent)")
+ }
+ }
+ }
+
+ if successCount > 0 || notFoundCount > 0 {
+ glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
+ }
+
+ totalErrors := retryableErrorCount + permanentErrorCount
+ if totalErrors > 0 {
+ logMessage := fmt.Sprintf("failed to delete %d/%d files (%d retryable, %d permanent)",
+ totalErrors, len(uniqueFileIdsSlice), retryableErrorCount, permanentErrorCount)
+ if len(errorDetails) > 0 {
+ if totalErrors > MaxLoggedErrorDetails {
+ logMessage += fmt.Sprintf(" (showing first %d)", len(errorDetails))
+ }
+ glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
+ } else {
+ glog.V(0).Info(logMessage)
+ }
+ }
+
+ if f.DeletionRetryQueue.Size() > 0 {
+ glog.V(2).Infof("retry queue size: %d", f.DeletionRetryQueue.Size())
+ }
+}
+
+const (
+ deletionOutcomeSuccess = "success"
+ deletionOutcomeNotFound = "not_found"
+ deletionOutcomeRetryable = "retryable"
+ deletionOutcomePermanent = "permanent"
+ deletionOutcomeNoResult = "no_result"
+)
+
+// deletionOutcome represents the result of classifying deletion results for a file
+type deletionOutcome struct {
+ status string // One of the deletionOutcome* constants
+ errorMsg string
+}
+
+// deleteFilesAndClassify performs deletion and classifies outcomes for a list of file IDs
+func deleteFilesAndClassify(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) map[string]deletionOutcome {
+ // Perform deletion
+ results := operation.DeleteFileIdsWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
+
+ // Group results by file ID to handle multiple results for replicated volumes
+ resultsByFileId := make(map[string][]*volume_server_pb.DeleteResult)
+ for _, result := range results {
+ resultsByFileId[result.FileId] = append(resultsByFileId[result.FileId], result)
+ }
+
+ // Classify outcome for each file
+ outcomes := make(map[string]deletionOutcome, len(fileIds))
+ for _, fileId := range fileIds {
+ outcomes[fileId] = classifyDeletionOutcome(fileId, resultsByFileId)
+ }
+
+ return outcomes
+}
+
+// classifyDeletionOutcome examines all deletion results for a file ID and determines the overall outcome
+// Uses a single pass through results with early return for permanent errors (highest priority)
+// Priority: Permanent > Retryable > Success > Not Found
+func classifyDeletionOutcome(fileId string, resultsByFileId map[string][]*volume_server_pb.DeleteResult) deletionOutcome {
+ fileIdResults, found := resultsByFileId[fileId]
+ if !found || len(fileIdResults) == 0 {
+ return deletionOutcome{
+ status: deletionOutcomeNoResult,
+ errorMsg: "no deletion result from volume server",
+ }
+ }
+
+ var firstRetryableError string
+ hasSuccess := false
+
+ for _, res := range fileIdResults {
+ if res.Error == "" {
+ hasSuccess = true
+ continue
+ }
+ if strings.Contains(res.Error, storage.ErrorDeleted.Error()) || res.Error == "not found" {
+ continue
+ }
+
+ if isRetryableError(res.Error) {
+ if firstRetryableError == "" {
+ firstRetryableError = res.Error
+ }
+ } else {
+ // Permanent error takes highest precedence - return immediately
+ return deletionOutcome{status: deletionOutcomePermanent, errorMsg: res.Error}
+ }
+ }
+
+ if firstRetryableError != "" {
+ return deletionOutcome{status: deletionOutcomeRetryable, errorMsg: firstRetryableError}
+ }
+
+ if hasSuccess {
+ return deletionOutcome{status: deletionOutcomeSuccess, errorMsg: ""}
+ }
+
+ // If we are here, all results were "not found"
+ return deletionOutcome{status: deletionOutcomeNotFound, errorMsg: ""}
+}
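+
+// For example, for a file on two replicas (error strings illustrative):
+// {"", "is read only"} classifies as retryable, {"", "invalid file id"} classifies
+// as permanent, and {"", "not found"} classifies as success since the "not found"
+// replica result only means that copy was already gone.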
+
+// isRetryableError determines if an error is retryable based on its message.
+//
+// Current implementation uses string matching which is brittle and may break
+// if error messages change in dependencies. This is acceptable for the initial
+// implementation but should be improved in the future.
+//
+// TODO: Consider these improvements for more robust error handling:
+// - Pass DeleteResult instead of just error string to access Status codes
+// - Use HTTP status codes (503 Service Unavailable, 429 Too Many Requests, etc.)
+// - Implement structured error types that can be checked with errors.Is/errors.As
+// - Extract and check gRPC status codes for better classification
+// - Add error wrapping in the deletion pipeline to preserve error context
+//
+// For now, we use conservative string matching for known transient error patterns.
+func isRetryableError(errorMsg string) bool {
+ // Empty errors are not retryable
+ if errorMsg == "" {
+ return false
+ }
+
+ errorLower := strings.ToLower(errorMsg)
+ for _, pattern := range retryablePatterns {
+ if strings.Contains(errorLower, pattern) {
+ return true
+ }
+ }
+ return false
+}
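+
+// For example, "volume 123 is read only" matches the "is read only" pattern and is
+// considered retryable, while "invalid file id" matches no pattern and is treated
+// as a permanent error.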
+
+// loopProcessingDeletionRetry processes the retry queue for failed deletions
+func (f *Filer) loopProcessingDeletionRetry(lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
+
+ ticker := time.NewTicker(DeletionRetryPollInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-f.deletionQuit:
+ glog.V(0).Infof("retry processor shutting down, %d items remaining in queue", f.DeletionRetryQueue.Size())
+ return
+ case <-ticker.C:
+ // Process all ready items in batches until queue is empty
+ totalProcessed := 0
+ for {
+ readyItems := f.DeletionRetryQueue.GetReadyItems(DeletionRetryBatchSize)
+ if len(readyItems) == 0 {
+ break
}
+
+ f.processRetryBatch(readyItems, lookupFunc)
+ totalProcessed += len(readyItems)
}
- })
- if deletionCount == 0 {
- time.Sleep(1123 * time.Millisecond)
+ if totalProcessed > 0 {
+ glog.V(1).Infof("retried deletion of %d files", totalProcessed)
+ }
}
}
}
+// processRetryBatch attempts to retry deletion of files and processes results.
+// Successfully deleted items are removed from tracking, retryable failures are
+// re-queued with updated retry counts, and permanent errors are logged and discarded.
+func (f *Filer) processRetryBatch(readyItems []*DeletionRetryItem, lookupFunc func([]string) (map[string]*operation.LookupResult, error)) {
+ // Extract file IDs from retry items
+ fileIds := make([]string, 0, len(readyItems))
+ for _, item := range readyItems {
+ fileIds = append(fileIds, item.FileId)
+ }
+
+ // Delete files and classify outcomes
+ outcomes := deleteFilesAndClassify(f.GrpcDialOption, fileIds, lookupFunc)
+
+ // Process outcomes - iterate over readyItems to ensure all items are accounted for
+ var successCount, notFoundCount, retryCount, permanentErrorCount int
+ for _, item := range readyItems {
+ outcome := outcomes[item.FileId]
+
+ switch outcome.status {
+ case deletionOutcomeSuccess:
+ successCount++
+ f.DeletionRetryQueue.Remove(item) // Remove from queue (success)
+ glog.V(2).Infof("retry successful for %s after %d attempts", item.FileId, item.RetryCount)
+ case deletionOutcomeNotFound:
+ notFoundCount++
+ f.DeletionRetryQueue.Remove(item) // Remove from queue (already deleted)
+ case deletionOutcomeRetryable, deletionOutcomeNoResult:
+ retryCount++
+ if outcome.status == deletionOutcomeNoResult {
+ glog.Warningf("no deletion result for retried file %s, re-queuing to avoid loss", item.FileId)
+ }
+ f.DeletionRetryQueue.RequeueForRetry(item, outcome.errorMsg)
+ case deletionOutcomePermanent:
+ permanentErrorCount++
+ f.DeletionRetryQueue.Remove(item) // Remove from queue (permanent failure)
+ glog.Warningf("permanent error on retry for %s after %d attempts: %s", item.FileId, item.RetryCount, outcome.errorMsg)
+ }
+ }
+
+ if successCount > 0 || notFoundCount > 0 {
+ glog.V(1).Infof("retry: deleted %d files successfully, %d already deleted", successCount, notFoundCount)
+ }
+ if retryCount > 0 {
+ glog.V(1).Infof("retry: %d files still failing, will retry again later", retryCount)
+ }
+ if permanentErrorCount > 0 {
+ glog.Warningf("retry: %d files failed with permanent errors", permanentErrorCount)
+ }
+}
+
func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
f.doDeleteChunks(ctx, chunks)
}
diff --git a/weed/filer/filer_deletion_test.go b/weed/filer/filer_deletion_test.go
new file mode 100644
index 000000000..77ac2310f
--- /dev/null
+++ b/weed/filer/filer_deletion_test.go
@@ -0,0 +1,308 @@
+package filer
+
+import (
+ "container/heap"
+ "testing"
+ "time"
+)
+
+func TestDeletionRetryQueue_AddAndRetrieve(t *testing.T) {
+ queue := NewDeletionRetryQueue()
+
+ // Add items
+ queue.AddOrUpdate("file1", "is read only")
+ queue.AddOrUpdate("file2", "connection reset")
+
+ if queue.Size() != 2 {
+ t.Errorf("Expected queue size 2, got %d", queue.Size())
+ }
+
+ // Items not ready yet (initial delay is 5 minutes)
+ readyItems := queue.GetReadyItems(10)
+ if len(readyItems) != 0 {
+ t.Errorf("Expected 0 ready items, got %d", len(readyItems))
+ }
+
+ // Size should remain unchanged
+ if queue.Size() != 2 {
+ t.Errorf("Expected queue size 2 after checking ready items, got %d", queue.Size())
+ }
+}
+
+func TestDeletionRetryQueue_ExponentialBackoff(t *testing.T) {
+ queue := NewDeletionRetryQueue()
+
+ // Create an item
+ item := &DeletionRetryItem{
+ FileId: "test-file",
+ RetryCount: 0,
+ NextRetryAt: time.Now(),
+ LastError: "test error",
+ }
+
+ // Requeue multiple times to test backoff
+ delays := []time.Duration{}
+
+ for i := 0; i < 5; i++ {
+ beforeTime := time.Now()
+ queue.RequeueForRetry(item, "error")
+
+ // Calculate expected delay for this retry count
+ expectedDelay := InitialRetryDelay * time.Duration(1<<uint(i))
+ if expectedDelay > MaxRetryDelay {
+ expectedDelay = MaxRetryDelay
+ }
+
+ // Verify NextRetryAt is approximately correct
+ actualDelay := item.NextRetryAt.Sub(beforeTime)
+ delays = append(delays, actualDelay)
+
+ // Allow small timing variance
+ timeDiff := actualDelay - expectedDelay
+ if timeDiff < 0 {
+ timeDiff = -timeDiff
+ }
+ if timeDiff > 100*time.Millisecond {
+ t.Errorf("Retry %d: expected delay ~%v, got %v (diff: %v)", i+1, expectedDelay, actualDelay, timeDiff)
+ }
+
+ // Verify retry count incremented
+ if item.RetryCount != i+1 {
+ t.Errorf("Expected RetryCount %d, got %d", i+1, item.RetryCount)
+ }
+
+ // Reset the heap for the next isolated test iteration
+ queue.lock.Lock()
+ queue.heap = retryHeap{}
+ queue.lock.Unlock()
+ }
+
+ t.Logf("Exponential backoff delays: %v", delays)
+}
+
+func TestDeletionRetryQueue_OverflowProtection(t *testing.T) {
+ queue := NewDeletionRetryQueue()
+
+ // Create an item with very high retry count
+ item := &DeletionRetryItem{
+ FileId: "test-file",
+ RetryCount: 60, // High count that would cause overflow without protection
+ NextRetryAt: time.Now(),
+ LastError: "test error",
+ }
+
+ // Should not panic and should cap at MaxRetryDelay
+ queue.RequeueForRetry(item, "error")
+
+ delay := time.Until(item.NextRetryAt)
+ if delay > MaxRetryDelay+time.Second {
+ t.Errorf("Delay exceeded MaxRetryDelay: %v > %v", delay, MaxRetryDelay)
+ }
+}
+
+func TestDeletionRetryQueue_MaxAttemptsReached(t *testing.T) {
+ queue := NewDeletionRetryQueue()
+
+ // Add item
+ queue.AddOrUpdate("file1", "error")
+
+ // Manually set retry count to max
+ queue.lock.Lock()
+ item, exists := queue.itemIndex["file1"]
+ if !exists {
+ queue.lock.Unlock()
+ t.Fatal("Item not found in queue")
+ }
+ item.RetryCount = MaxRetryAttempts
+ item.NextRetryAt = time.Now().Add(-1 * time.Second) // Ready now
+ heap.Fix(&queue.heap, item.heapIndex)
+ queue.lock.Unlock()
+
+ // Get ready items - the item should still be returned for its final retry (attempt #10)
+ readyItems := queue.GetReadyItems(10)
+ if len(readyItems) != 1 {
+ t.Fatalf("Expected 1 item for last retry, got %d", len(readyItems))
+ }
+
+ // Requeue it, which will increment its retry count beyond the max
+ queue.RequeueForRetry(readyItems[0], "final error")
+
+ // Manually make it ready again
+ queue.lock.Lock()
+ item, exists = queue.itemIndex["file1"]
+ if !exists {
+ queue.lock.Unlock()
+ t.Fatal("Item not found in queue after requeue")
+ }
+ item.NextRetryAt = time.Now().Add(-1 * time.Second)
+ heap.Fix(&queue.heap, item.heapIndex)
+ queue.lock.Unlock()
+
+ // Now it should be discarded (retry count is 11, exceeds max of 10)
+ readyItems = queue.GetReadyItems(10)
+ if len(readyItems) != 0 {
+ t.Errorf("Expected 0 items (max attempts exceeded), got %d", len(readyItems))
+ }
+
+ // Should be removed from queue
+ if queue.Size() != 0 {
+ t.Errorf("Expected queue size 0 after max attempts exceeded, got %d", queue.Size())
+ }
+}
+
+func TestCalculateBackoff(t *testing.T) {
+ testCases := []struct {
+ retryCount int
+ expectedDelay time.Duration
+ description string
+ }{
+ {1, InitialRetryDelay, "first retry"},
+ {2, InitialRetryDelay * 2, "second retry"},
+ {3, InitialRetryDelay * 4, "third retry"},
+ {4, InitialRetryDelay * 8, "fourth retry"},
+ {5, InitialRetryDelay * 16, "fifth retry"},
+ {10, MaxRetryDelay, "capped at max delay"},
+ {65, MaxRetryDelay, "overflow protection (shift > 63)"},
+ {100, MaxRetryDelay, "very high retry count"},
+ }
+
+ for _, tc := range testCases {
+ result := calculateBackoff(tc.retryCount)
+ if result != tc.expectedDelay {
+ t.Errorf("%s (retry %d): expected %v, got %v",
+ tc.description, tc.retryCount, tc.expectedDelay, result)
+ }
+ }
+}
+
+func TestIsRetryableError(t *testing.T) {
+ testCases := []struct {
+ error string
+ retryable bool
+ description string
+ }{
+ {"volume 123 is read only", true, "read-only volume"},
+ {"connection reset by peer", true, "connection reset"},
+ {"timeout exceeded", true, "timeout"},
+ {"deadline exceeded", true, "deadline exceeded"},
+ {"context canceled", true, "context canceled"},
+ {"lookup error: volume not found", true, "lookup error"},
+ {"connection refused", true, "connection refused"},
+ {"too many requests", true, "rate limiting"},
+ {"service unavailable", true, "service unavailable"},
+ {"i/o timeout", true, "I/O timeout"},
+ {"broken pipe", true, "broken pipe"},
+ {"not found", false, "not found (not retryable)"},
+ {"invalid file id", false, "invalid input (not retryable)"},
+ {"", false, "empty error"},
+ }
+
+ for _, tc := range testCases {
+ result := isRetryableError(tc.error)
+ if result != tc.retryable {
+ t.Errorf("%s: expected retryable=%v, got %v for error: %q",
+ tc.description, tc.retryable, result, tc.error)
+ }
+ }
+}
+
+func TestDeletionRetryQueue_HeapOrdering(t *testing.T) {
+ queue := NewDeletionRetryQueue()
+
+ now := time.Now()
+
+ // Add items with different retry times (out of order)
+ items := []*DeletionRetryItem{
+ {FileId: "file3", RetryCount: 1, NextRetryAt: now.Add(30 * time.Second), LastError: "error3"},
+ {FileId: "file1", RetryCount: 1, NextRetryAt: now.Add(10 * time.Second), LastError: "error1"},
+ {FileId: "file2", RetryCount: 1, NextRetryAt: now.Add(20 * time.Second), LastError: "error2"},
+ }
+
+ // Add items directly (simulating internal state)
+ for _, item := range items {
+ queue.lock.Lock()
+ queue.itemIndex[item.FileId] = item
+ queue.heap = append(queue.heap, item)
+ queue.lock.Unlock()
+ }
+
+ // Use container/heap.Init to establish heap property
+ queue.lock.Lock()
+ heap.Init(&queue.heap)
+ queue.lock.Unlock()
+
+ // Verify heap maintains min-heap property (earliest time at top)
+ queue.lock.Lock()
+ if queue.heap[0].FileId != "file1" {
+ t.Errorf("Expected file1 at heap top (earliest time), got %s", queue.heap[0].FileId)
+ }
+ queue.lock.Unlock()
+
+ // Set all items to ready while preserving their relative order
+ queue.lock.Lock()
+ for _, item := range queue.itemIndex {
+ // Shift all times back by 40 seconds to make them ready, but preserve order
+ item.NextRetryAt = item.NextRetryAt.Add(-40 * time.Second)
+ }
+ heap.Init(&queue.heap) // Re-establish heap property after modification
+ queue.lock.Unlock()
+
+ // GetReadyItems should return in NextRetryAt order
+ readyItems := queue.GetReadyItems(10)
+ expectedOrder := []string{"file1", "file2", "file3"}
+
+ if len(readyItems) != 3 {
+ t.Fatalf("Expected 3 ready items, got %d", len(readyItems))
+ }
+
+ for i, item := range readyItems {
+ if item.FileId != expectedOrder[i] {
+ t.Errorf("Item %d: expected %s, got %s", i, expectedOrder[i], item.FileId)
+ }
+ }
+}
+
+func TestDeletionRetryQueue_DuplicateFileIds(t *testing.T) {
+ queue := NewDeletionRetryQueue()
+
+ // Add same file ID twice with retryable error - simulates duplicate in batch
+ queue.AddOrUpdate("file1", "timeout error")
+
+ // Verify only one item exists in queue
+ if queue.Size() != 1 {
+ t.Fatalf("Expected queue size 1 after first add, got %d", queue.Size())
+ }
+
+ // Get initial retry count
+ queue.lock.Lock()
+ item1, exists := queue.itemIndex["file1"]
+ if !exists {
+ queue.lock.Unlock()
+ t.Fatal("Item not found in queue after first add")
+ }
+ initialRetryCount := item1.RetryCount
+ queue.lock.Unlock()
+
+ // Add same file ID again - should NOT increment retry count (just update error)
+ queue.AddOrUpdate("file1", "timeout error again")
+
+ // Verify still only one item exists in queue (not duplicated)
+ if queue.Size() != 1 {
+ t.Errorf("Expected queue size 1 after duplicate add, got %d (duplicates detected)", queue.Size())
+ }
+
+ // Verify retry count did NOT increment (AddOrUpdate only updates error, not count)
+ queue.lock.Lock()
+ item2, exists := queue.itemIndex["file1"]
+ queue.lock.Unlock()
+
+ if !exists {
+ t.Fatal("Item not found in queue after second add")
+ }
+ if item2.RetryCount != initialRetryCount {
+ t.Errorf("Expected RetryCount to stay at %d after duplicate add (should not increment), got %d", initialRetryCount, item2.RetryCount)
+ }
+ if item2.LastError != "timeout error again" {
+ t.Errorf("Expected LastError to be updated to 'timeout error again', got %q", item2.LastError)
+ }
+}