Diffstat (limited to 'weed')
54 files changed, 3753 insertions, 454 deletions
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 61a7ced6d..9261591c4 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -201,8 +201,11 @@ table = "seaweedfs" [redis2] enabled = false address = "localhost:6379" +username = "" password = "" database = 0 +# prefix for filer redis keys +keyPrefix = "" enable_tls = false ca_cert_path = "" client_cert_path = "" @@ -217,6 +220,8 @@ masterName = "master" username = "" password = "" database = 0 +# prefix for filer redis keys +keyPrefix = "" enable_tls = false ca_cert_path = "" client_cert_path = "" @@ -232,7 +237,10 @@ addresses = [ "localhost:30005", "localhost:30006", ] +username = "" password = "" +# prefix for filer redis keys +keyPrefix = "" enable_tls = false ca_cert_path = "" client_cert_path = "" @@ -248,8 +256,11 @@ superLargeDirectories = [] [redis_lua] enabled = false address = "localhost:6379" +username = "" password = "" database = 0 +# prefix for filer redis keys +keyPrefix = "" enable_tls = false ca_cert_path = "" client_cert_path = "" @@ -264,6 +275,8 @@ masterName = "master" username = "" password = "" database = 0 +# prefix for filer redis keys +keyPrefix = "" enable_tls = false ca_cert_path = "" client_cert_path = "" @@ -279,7 +292,10 @@ addresses = [ "localhost:30005", "localhost:30006", ] +username = "" password = "" +# prefix for filer redis keys +keyPrefix = "" enable_tls = false ca_cert_path = "" client_cert_path = "" @@ -373,8 +389,10 @@ dialTimeOut = 10 enabled = false location = "/tmp/" address = "localhost:6379" +username = "" password = "" database = 1 +keyPrefix = "" [tikv] enabled = false diff --git a/weed/command/server.go b/weed/command/server.go index 52f47ec32..ebd9f359a 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -134,6 +134,7 @@ func init() { serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port") serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") + serverOptions.v.id = cmdServer.Flag.String("volume.id", "", "volume server id. If empty, default to ip:port") serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") diff --git a/weed/command/volume.go b/weed/command/volume.go index e21437e9a..514553172 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -41,6 +41,7 @@ type VolumeServerOptions struct { folderMaxLimits []int32 idxFolder *string ip *string + id *string publicUrl *string bindIp *string mastersString *string @@ -78,6 +79,7 @@ func init() { v.portGrpc = cmdVolume.Flag.Int("port.grpc", 0, "grpc listen port") v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public") v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier") + v.id = cmdVolume.Flag.String("id", "", "volume server id. 
If empty, default to ip:port") v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.") v.mastersString = cmdVolume.Flag.String("master", "localhost:9333", "comma-separated master servers") @@ -253,8 +255,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v volumeNeedleMapKind = storage.NeedleMapLevelDbLarge } + // Determine volume server ID: if not specified, use ip:port + volumeServerId := util.GetVolumeServerId(*v.id, *v.ip, *v.port) + volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, - *v.ip, *v.port, *v.portGrpc, *v.publicUrl, + *v.ip, *v.port, *v.portGrpc, *v.publicUrl, volumeServerId, v.folders, v.folderMaxLimits, minFreeSpaces, diskTypes, *v.idxFolder, volumeNeedleMapKind, diff --git a/weed/filer/empty_folder_cleanup/cleanup_queue.go b/weed/filer/empty_folder_cleanup/cleanup_queue.go new file mode 100644 index 000000000..f92af389d --- /dev/null +++ b/weed/filer/empty_folder_cleanup/cleanup_queue.go @@ -0,0 +1,207 @@ +package empty_folder_cleanup + +import ( + "container/list" + "sync" + "time" +) + +// CleanupQueue manages a deduplicated queue of folders pending cleanup. +// It uses a doubly-linked list ordered by event time (oldest at front) and a map for O(1) deduplication. +// Processing is triggered when: +// - Queue size reaches maxSize, OR +// - Oldest item exceeds maxAge +type CleanupQueue struct { + mu sync.Mutex + items *list.List // Linked list of *queueItem ordered by time (front = oldest) + itemsMap map[string]*list.Element // folder -> list element for O(1) lookup + maxSize int // Max queue size before triggering cleanup + maxAge time.Duration // Max age before triggering cleanup +} + +// queueItem represents an item in the cleanup queue +type queueItem struct { + folder string + queueTime time.Time +} + +// NewCleanupQueue creates a new CleanupQueue with the specified limits +func NewCleanupQueue(maxSize int, maxAge time.Duration) *CleanupQueue { + return &CleanupQueue{ + items: list.New(), + itemsMap: make(map[string]*list.Element), + maxSize: maxSize, + maxAge: maxAge, + } +} + +// Add adds a folder to the queue with the specified event time. +// The item is inserted in time-sorted order (oldest at front) to handle out-of-order events. +// If folder already exists with an older time, the time is updated and position adjusted. +// Returns true if the folder was newly added, false if it was updated. 
+func (q *CleanupQueue) Add(folder string, eventTime time.Time) bool { + q.mu.Lock() + defer q.mu.Unlock() + + // Check if folder already exists + if elem, exists := q.itemsMap[folder]; exists { + existingItem := elem.Value.(*queueItem) + // Only update if new event is later + if eventTime.After(existingItem.queueTime) { + // Remove from current position + q.items.Remove(elem) + // Re-insert with new time in sorted position + newElem := q.insertSorted(folder, eventTime) + q.itemsMap[folder] = newElem + } + return false + } + + // Insert new folder in sorted position + elem := q.insertSorted(folder, eventTime) + q.itemsMap[folder] = elem + return true +} + +// insertSorted inserts an item in the correct position to maintain time ordering (oldest at front) +func (q *CleanupQueue) insertSorted(folder string, eventTime time.Time) *list.Element { + item := &queueItem{ + folder: folder, + queueTime: eventTime, + } + + // Find the correct position (insert before the first item with a later time) + for elem := q.items.Back(); elem != nil; elem = elem.Prev() { + existingItem := elem.Value.(*queueItem) + if !eventTime.Before(existingItem.queueTime) { + // Insert after this element + return q.items.InsertAfter(item, elem) + } + } + + // This item is the oldest, insert at front + return q.items.PushFront(item) +} + +// Remove removes a specific folder from the queue (e.g., when a file is created). +// Returns true if the folder was found and removed. +func (q *CleanupQueue) Remove(folder string) bool { + q.mu.Lock() + defer q.mu.Unlock() + + elem, exists := q.itemsMap[folder] + if !exists { + return false + } + + q.items.Remove(elem) + delete(q.itemsMap, folder) + return true +} + +// ShouldProcess returns true if the queue should be processed. +// This is true when: +// - Queue size >= maxSize, OR +// - Oldest item age > maxAge +func (q *CleanupQueue) ShouldProcess() bool { + q.mu.Lock() + defer q.mu.Unlock() + + return q.shouldProcessLocked() +} + +// shouldProcessLocked checks if processing is needed (caller must hold lock) +func (q *CleanupQueue) shouldProcessLocked() bool { + if q.items.Len() == 0 { + return false + } + + // Check if queue is full + if q.items.Len() >= q.maxSize { + return true + } + + // Check if oldest item exceeds max age + front := q.items.Front() + if front != nil { + item := front.Value.(*queueItem) + if time.Since(item.queueTime) > q.maxAge { + return true + } + } + + return false +} + +// Pop removes and returns the oldest folder from the queue. +// Returns the folder and true if an item was available, or empty string and false if queue is empty. +func (q *CleanupQueue) Pop() (string, bool) { + q.mu.Lock() + defer q.mu.Unlock() + + front := q.items.Front() + if front == nil { + return "", false + } + + item := front.Value.(*queueItem) + q.items.Remove(front) + delete(q.itemsMap, item.folder) + + return item.folder, true +} + +// Peek returns the oldest folder without removing it. +// Returns the folder and queue time if available, or empty values if queue is empty. +func (q *CleanupQueue) Peek() (folder string, queueTime time.Time, ok bool) { + q.mu.Lock() + defer q.mu.Unlock() + + front := q.items.Front() + if front == nil { + return "", time.Time{}, false + } + + item := front.Value.(*queueItem) + return item.folder, item.queueTime, true +} + +// Len returns the current queue size. +func (q *CleanupQueue) Len() int { + q.mu.Lock() + defer q.mu.Unlock() + return q.items.Len() +} + +// Contains checks if a folder is in the queue. 
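// Illustrative behavior (editor's note, not part of the patch):
//
//	q.Add("/buckets/b1/dir", time.Now())
//	q.Contains("/buckets/b1/dir") // true
//	q.Pop()                       // returns "/buckets/b1/dir", true
//	q.Contains("/buckets/b1/dir") // false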
+func (q *CleanupQueue) Contains(folder string) bool { + q.mu.Lock() + defer q.mu.Unlock() + _, exists := q.itemsMap[folder] + return exists +} + +// Clear removes all items from the queue. +func (q *CleanupQueue) Clear() { + q.mu.Lock() + defer q.mu.Unlock() + + q.items.Init() + q.itemsMap = make(map[string]*list.Element) +} + +// OldestAge returns the age of the oldest item in the queue, or 0 if empty. +func (q *CleanupQueue) OldestAge() time.Duration { + q.mu.Lock() + defer q.mu.Unlock() + + front := q.items.Front() + if front == nil { + return 0 + } + + item := front.Value.(*queueItem) + return time.Since(item.queueTime) +} + + diff --git a/weed/filer/empty_folder_cleanup/cleanup_queue_test.go b/weed/filer/empty_folder_cleanup/cleanup_queue_test.go new file mode 100644 index 000000000..2effa3138 --- /dev/null +++ b/weed/filer/empty_folder_cleanup/cleanup_queue_test.go @@ -0,0 +1,371 @@ +package empty_folder_cleanup + +import ( + "testing" + "time" +) + +func TestCleanupQueue_Add(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + now := time.Now() + + // Add first item + if !q.Add("/buckets/b1/folder1", now) { + t.Error("expected Add to return true for new item") + } + if q.Len() != 1 { + t.Errorf("expected len 1, got %d", q.Len()) + } + + // Add second item with later time + if !q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) { + t.Error("expected Add to return true for new item") + } + if q.Len() != 2 { + t.Errorf("expected len 2, got %d", q.Len()) + } + + // Add duplicate with newer time - should update and reposition + if q.Add("/buckets/b1/folder1", now.Add(2*time.Second)) { + t.Error("expected Add to return false for existing item") + } + if q.Len() != 2 { + t.Errorf("expected len 2 after duplicate, got %d", q.Len()) + } + + // folder1 should now be at the back (newer time) - verify by popping + folder1, _ := q.Pop() + folder2, _ := q.Pop() + if folder1 != "/buckets/b1/folder2" || folder2 != "/buckets/b1/folder1" { + t.Errorf("expected folder1 to be moved to back, got %s, %s", folder1, folder2) + } +} + +func TestCleanupQueue_Add_OutOfOrder(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + baseTime := time.Now() + + // Add items out of order + q.Add("/buckets/b1/folder3", baseTime.Add(3*time.Second)) + q.Add("/buckets/b1/folder1", baseTime.Add(1*time.Second)) + q.Add("/buckets/b1/folder2", baseTime.Add(2*time.Second)) + + // Items should be in time order (oldest first) - verify by popping + expected := []string{"/buckets/b1/folder1", "/buckets/b1/folder2", "/buckets/b1/folder3"} + for i, exp := range expected { + folder, ok := q.Pop() + if !ok || folder != exp { + t.Errorf("at index %d: expected %s, got %s", i, exp, folder) + } + } +} + +func TestCleanupQueue_Add_DuplicateWithOlderTime(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + baseTime := time.Now() + + // Add folder at t=5 + q.Add("/buckets/b1/folder1", baseTime.Add(5*time.Second)) + + // Try to add same folder with older time - should NOT update + q.Add("/buckets/b1/folder1", baseTime.Add(2*time.Second)) + + // Time should remain at t=5 + _, queueTime, _ := q.Peek() + if queueTime != baseTime.Add(5*time.Second) { + t.Errorf("expected time to remain unchanged, got %v", queueTime) + } +} + +func TestCleanupQueue_Remove(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + now := time.Now() + + q.Add("/buckets/b1/folder1", now) + q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) + q.Add("/buckets/b1/folder3", now.Add(2*time.Second)) + + // Remove middle item + if 
!q.Remove("/buckets/b1/folder2") { + t.Error("expected Remove to return true for existing item") + } + if q.Len() != 2 { + t.Errorf("expected len 2, got %d", q.Len()) + } + if q.Contains("/buckets/b1/folder2") { + t.Error("removed item should not be in queue") + } + + // Remove non-existent item + if q.Remove("/buckets/b1/nonexistent") { + t.Error("expected Remove to return false for non-existent item") + } + + // Verify order is preserved by popping + folder1, _ := q.Pop() + folder3, _ := q.Pop() + if folder1 != "/buckets/b1/folder1" || folder3 != "/buckets/b1/folder3" { + t.Errorf("unexpected order: %s, %s", folder1, folder3) + } +} + +func TestCleanupQueue_Pop(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + now := time.Now() + + // Pop from empty queue + folder, ok := q.Pop() + if ok { + t.Error("expected Pop to return false for empty queue") + } + if folder != "" { + t.Errorf("expected empty folder, got %s", folder) + } + + // Add items and pop in order + q.Add("/buckets/b1/folder1", now) + q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) + q.Add("/buckets/b1/folder3", now.Add(2*time.Second)) + + folder, ok = q.Pop() + if !ok || folder != "/buckets/b1/folder1" { + t.Errorf("expected folder1, got %s (ok=%v)", folder, ok) + } + + folder, ok = q.Pop() + if !ok || folder != "/buckets/b1/folder2" { + t.Errorf("expected folder2, got %s (ok=%v)", folder, ok) + } + + folder, ok = q.Pop() + if !ok || folder != "/buckets/b1/folder3" { + t.Errorf("expected folder3, got %s (ok=%v)", folder, ok) + } + + // Queue should be empty now + if q.Len() != 0 { + t.Errorf("expected empty queue, got len %d", q.Len()) + } +} + +func TestCleanupQueue_Peek(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + now := time.Now() + + // Peek empty queue + folder, _, ok := q.Peek() + if ok { + t.Error("expected Peek to return false for empty queue") + } + + // Add item and peek + q.Add("/buckets/b1/folder1", now) + folder, queueTime, ok := q.Peek() + if !ok || folder != "/buckets/b1/folder1" { + t.Errorf("expected folder1, got %s (ok=%v)", folder, ok) + } + if queueTime != now { + t.Errorf("expected queue time %v, got %v", now, queueTime) + } + + // Peek should not remove item + if q.Len() != 1 { + t.Errorf("Peek should not remove item, len=%d", q.Len()) + } +} + +func TestCleanupQueue_Contains(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + now := time.Now() + + q.Add("/buckets/b1/folder1", now) + + if !q.Contains("/buckets/b1/folder1") { + t.Error("expected Contains to return true") + } + if q.Contains("/buckets/b1/folder2") { + t.Error("expected Contains to return false for non-existent") + } +} + +func TestCleanupQueue_ShouldProcess_MaxSize(t *testing.T) { + q := NewCleanupQueue(3, 10*time.Minute) + now := time.Now() + + // Empty queue + if q.ShouldProcess() { + t.Error("empty queue should not need processing") + } + + // Add items below max + q.Add("/buckets/b1/folder1", now) + q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) + if q.ShouldProcess() { + t.Error("queue below max should not need processing") + } + + // Add item to reach max + q.Add("/buckets/b1/folder3", now.Add(2*time.Second)) + if !q.ShouldProcess() { + t.Error("queue at max should need processing") + } +} + +func TestCleanupQueue_ShouldProcess_MaxAge(t *testing.T) { + q := NewCleanupQueue(100, 100*time.Millisecond) // Short max age for testing + + // Add item with old event time + oldTime := time.Now().Add(-1 * time.Second) // 1 second ago + q.Add("/buckets/b1/folder1", oldTime) + + // Item is older 
than maxAge, should need processing + if !q.ShouldProcess() { + t.Error("old item should trigger processing") + } + + // Clear and add fresh item + q.Clear() + q.Add("/buckets/b1/folder2", time.Now()) + + // Fresh item should not trigger processing + if q.ShouldProcess() { + t.Error("fresh item should not trigger processing") + } +} + +func TestCleanupQueue_Clear(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + now := time.Now() + + q.Add("/buckets/b1/folder1", now) + q.Add("/buckets/b1/folder2", now.Add(1*time.Second)) + q.Add("/buckets/b1/folder3", now.Add(2*time.Second)) + + q.Clear() + + if q.Len() != 0 { + t.Errorf("expected empty queue after Clear, got len %d", q.Len()) + } + if q.Contains("/buckets/b1/folder1") { + t.Error("queue should not contain items after Clear") + } +} + +func TestCleanupQueue_OldestAge(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + + // Empty queue + if q.OldestAge() != 0 { + t.Error("empty queue should have zero oldest age") + } + + // Add item with time in the past + oldTime := time.Now().Add(-5 * time.Minute) + q.Add("/buckets/b1/folder1", oldTime) + + // Age should be approximately 5 minutes + age := q.OldestAge() + if age < 4*time.Minute || age > 6*time.Minute { + t.Errorf("expected ~5m age, got %v", age) + } +} + +func TestCleanupQueue_TimeOrder(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + baseTime := time.Now() + + // Add items in order + items := []string{ + "/buckets/b1/a", + "/buckets/b1/b", + "/buckets/b1/c", + "/buckets/b1/d", + "/buckets/b1/e", + } + for i, item := range items { + q.Add(item, baseTime.Add(time.Duration(i)*time.Second)) + } + + // Pop should return in time order + for i, expected := range items { + got, ok := q.Pop() + if !ok { + t.Errorf("Pop %d: expected item, got empty", i) + } + if got != expected { + t.Errorf("Pop %d: expected %s, got %s", i, expected, got) + } + } +} + +func TestCleanupQueue_DuplicateWithNewerTime(t *testing.T) { + q := NewCleanupQueue(100, 10*time.Minute) + baseTime := time.Now() + + // Add items + q.Add("/buckets/b1/folder1", baseTime) + q.Add("/buckets/b1/folder2", baseTime.Add(1*time.Second)) + q.Add("/buckets/b1/folder3", baseTime.Add(2*time.Second)) + + // Add duplicate with newer time - should update and reposition + q.Add("/buckets/b1/folder1", baseTime.Add(3*time.Second)) + + // folder1 should now be at the back (newest time) - verify by popping + expected := []string{"/buckets/b1/folder2", "/buckets/b1/folder3", "/buckets/b1/folder1"} + for i, exp := range expected { + folder, ok := q.Pop() + if !ok || folder != exp { + t.Errorf("at index %d: expected %s, got %s", i, exp, folder) + } + } +} + +func TestCleanupQueue_Concurrent(t *testing.T) { + q := NewCleanupQueue(1000, 10*time.Minute) + done := make(chan bool) + now := time.Now() + + // Concurrent adds + go func() { + for i := 0; i < 100; i++ { + q.Add("/buckets/b1/folder"+string(rune('A'+i%26)), now.Add(time.Duration(i)*time.Millisecond)) + } + done <- true + }() + + // Concurrent removes + go func() { + for i := 0; i < 50; i++ { + q.Remove("/buckets/b1/folder" + string(rune('A'+i%26))) + } + done <- true + }() + + // Concurrent pops + go func() { + for i := 0; i < 30; i++ { + q.Pop() + } + done <- true + }() + + // Concurrent reads + go func() { + for i := 0; i < 100; i++ { + q.Len() + q.Contains("/buckets/b1/folderA") + q.ShouldProcess() + } + done <- true + }() + + // Wait for all goroutines + for i := 0; i < 4; i++ { + <-done + } + + // Just verify no panic occurred and queue is in consistent state 
+ _ = q.Len() +} + + diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go new file mode 100644 index 000000000..70856aaf1 --- /dev/null +++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner.go @@ -0,0 +1,436 @@ +package empty_folder_cleanup + +import ( + "context" + "strings" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +const ( + DefaultMaxCountCheck = 1000 + DefaultCacheExpiry = 5 * time.Minute + DefaultQueueMaxSize = 1000 + DefaultQueueMaxAge = 10 * time.Minute + DefaultProcessorSleep = 10 * time.Second // How often to check queue +) + +// FilerOperations defines the filer operations needed by EmptyFolderCleaner +type FilerOperations interface { + CountDirectoryEntries(ctx context.Context, dirPath util.FullPath, limit int) (count int, err error) + DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32, ifNotModifiedAfter int64) error +} + +// folderState tracks the state of a folder for empty folder cleanup +type folderState struct { + roughCount int // Cached rough count (up to maxCountCheck) + lastAddTime time.Time // Last time an item was added + lastDelTime time.Time // Last time an item was deleted + lastCheck time.Time // Last time we checked the actual count +} + +// EmptyFolderCleaner handles asynchronous cleanup of empty folders +// Each filer owns specific folders via consistent hashing based on the peer filer list +type EmptyFolderCleaner struct { + filer FilerOperations + lockRing *lock_manager.LockRing + host pb.ServerAddress + + // Folder state tracking + mu sync.RWMutex + folderCounts map[string]*folderState // Rough count cache + + // Cleanup queue (thread-safe, has its own lock) + cleanupQueue *CleanupQueue + + // Configuration + maxCountCheck int // Max items to count (1000) + cacheExpiry time.Duration // How long to keep cache entries + processorSleep time.Duration // How often processor checks queue + bucketPath string // e.g., "/buckets" + + // Control + enabled bool + stopCh chan struct{} +} + +// NewEmptyFolderCleaner creates a new EmptyFolderCleaner +func NewEmptyFolderCleaner(filer FilerOperations, lockRing *lock_manager.LockRing, host pb.ServerAddress, bucketPath string) *EmptyFolderCleaner { + efc := &EmptyFolderCleaner{ + filer: filer, + lockRing: lockRing, + host: host, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(DefaultQueueMaxSize, DefaultQueueMaxAge), + maxCountCheck: DefaultMaxCountCheck, + cacheExpiry: DefaultCacheExpiry, + processorSleep: DefaultProcessorSleep, + bucketPath: bucketPath, + enabled: true, + stopCh: make(chan struct{}), + } + go efc.cacheEvictionLoop() + go efc.cleanupProcessor() + return efc +} + +// SetEnabled enables or disables the cleaner +func (efc *EmptyFolderCleaner) SetEnabled(enabled bool) { + efc.mu.Lock() + defer efc.mu.Unlock() + efc.enabled = enabled +} + +// IsEnabled returns whether the cleaner is enabled +func (efc *EmptyFolderCleaner) IsEnabled() bool { + efc.mu.RLock() + defer efc.mu.RUnlock() + return efc.enabled +} + +// ownsFolder checks if this filer owns the folder via consistent hashing +func (efc *EmptyFolderCleaner) ownsFolder(folder string) bool { + servers := efc.lockRing.GetSnapshot() + if len(servers) <= 1 { + return true 
// Single filer case + } + return efc.hashKeyToServer(folder, servers) == efc.host +} + +// hashKeyToServer uses consistent hashing to map a folder to a server +func (efc *EmptyFolderCleaner) hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress { + if len(servers) == 0 { + return "" + } + x := util.HashStringToLong(key) + if x < 0 { + x = -x + } + x = x % int64(len(servers)) + return servers[x] +} + +// OnDeleteEvent is called when a file or directory is deleted +// Both file and directory deletions count towards making the parent folder empty +// eventTime is the time when the delete event occurred (for proper ordering) +func (efc *EmptyFolderCleaner) OnDeleteEvent(directory string, entryName string, isDirectory bool, eventTime time.Time) { + // Skip if not under bucket path (must be at least /buckets/<bucket>/...) + if efc.bucketPath != "" && !isUnderBucketPath(directory, efc.bucketPath) { + return + } + + // Check if we own this folder + if !efc.ownsFolder(directory) { + glog.V(4).Infof("EmptyFolderCleaner: not owner of %s, skipping", directory) + return + } + + efc.mu.Lock() + defer efc.mu.Unlock() + + // Check enabled inside lock to avoid race with Stop() + if !efc.enabled { + return + } + + glog.V(3).Infof("EmptyFolderCleaner: delete event in %s/%s (isDir=%v)", directory, entryName, isDirectory) + + // Update cached count (create entry if needed) + state, exists := efc.folderCounts[directory] + if !exists { + state = &folderState{} + efc.folderCounts[directory] = state + } + if state.roughCount > 0 { + state.roughCount-- + } + state.lastDelTime = eventTime + + // Only add to cleanup queue if roughCount suggests folder might be empty + if state.roughCount > 0 { + glog.V(3).Infof("EmptyFolderCleaner: skipping queue for %s, roughCount=%d", directory, state.roughCount) + return + } + + // Add to cleanup queue with event time (handles out-of-order events) + if efc.cleanupQueue.Add(directory, eventTime) { + glog.V(3).Infof("EmptyFolderCleaner: queued %s for cleanup", directory) + } +} + +// OnCreateEvent is called when a file or directory is created +// Both file and directory creations cancel pending cleanup for the parent folder +func (efc *EmptyFolderCleaner) OnCreateEvent(directory string, entryName string, isDirectory bool) { + // Skip if not under bucket path (must be at least /buckets/<bucket>/...) 
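// Worked example of the bucket-path rule (editor's note, not part of the patch),
// assuming bucketPath = "/buckets":
//
//	isUnderBucketPath("/buckets", "/buckets")                 // false: the buckets root itself
//	isUnderBucketPath("/buckets/mybucket", "/buckets")        // false: the bucket directory
//	isUnderBucketPath("/buckets/mybucket/photos", "/buckets") // true: a folder inside a bucket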
+ if efc.bucketPath != "" && !isUnderBucketPath(directory, efc.bucketPath) { + return + } + + efc.mu.Lock() + defer efc.mu.Unlock() + + // Check enabled inside lock to avoid race with Stop() + if !efc.enabled { + return + } + + // Update cached count only if already tracked (no need to track new folders) + if state, exists := efc.folderCounts[directory]; exists { + state.roughCount++ + state.lastAddTime = time.Now() + } + + // Remove from cleanup queue (cancel pending cleanup) + if efc.cleanupQueue.Remove(directory) { + glog.V(3).Infof("EmptyFolderCleaner: cancelled cleanup for %s due to new entry", directory) + } +} + +// cleanupProcessor runs in background and processes the cleanup queue +func (efc *EmptyFolderCleaner) cleanupProcessor() { + ticker := time.NewTicker(efc.processorSleep) + defer ticker.Stop() + + for { + select { + case <-efc.stopCh: + return + case <-ticker.C: + efc.processCleanupQueue() + } + } +} + +// processCleanupQueue processes items from the cleanup queue +func (efc *EmptyFolderCleaner) processCleanupQueue() { + // Check if we should process + if !efc.cleanupQueue.ShouldProcess() { + return + } + + glog.V(3).Infof("EmptyFolderCleaner: processing cleanup queue (len=%d, age=%v)", + efc.cleanupQueue.Len(), efc.cleanupQueue.OldestAge()) + + // Process all items that are ready + for efc.cleanupQueue.Len() > 0 { + // Check if still enabled + if !efc.IsEnabled() { + return + } + + // Pop the oldest item + folder, ok := efc.cleanupQueue.Pop() + if !ok { + break + } + + // Execute cleanup for this folder + efc.executeCleanup(folder) + + // If queue is no longer full and oldest item is not old enough, stop processing + if !efc.cleanupQueue.ShouldProcess() { + break + } + } +} + +// executeCleanup performs the actual cleanup of an empty folder +func (efc *EmptyFolderCleaner) executeCleanup(folder string) { + efc.mu.Lock() + + // Quick check: if we have cached count and it's > 0, skip + if state, exists := efc.folderCounts[folder]; exists { + if state.roughCount > 0 { + glog.V(3).Infof("EmptyFolderCleaner: skipping %s, cached count=%d", folder, state.roughCount) + efc.mu.Unlock() + return + } + // If there was an add after our delete, skip + if !state.lastAddTime.IsZero() && state.lastAddTime.After(state.lastDelTime) { + glog.V(3).Infof("EmptyFolderCleaner: skipping %s, add happened after delete", folder) + efc.mu.Unlock() + return + } + } + efc.mu.Unlock() + + // Re-check ownership (topology might have changed) + if !efc.ownsFolder(folder) { + glog.V(3).Infof("EmptyFolderCleaner: no longer owner of %s, skipping", folder) + return + } + + // Check if folder is actually empty (count up to maxCountCheck) + ctx := context.Background() + count, err := efc.countItems(ctx, folder) + if err != nil { + glog.V(2).Infof("EmptyFolderCleaner: error counting items in %s: %v", folder, err) + return + } + + efc.mu.Lock() + // Update cache + if _, exists := efc.folderCounts[folder]; !exists { + efc.folderCounts[folder] = &folderState{} + } + efc.folderCounts[folder].roughCount = count + efc.folderCounts[folder].lastCheck = time.Now() + efc.mu.Unlock() + + if count > 0 { + glog.V(3).Infof("EmptyFolderCleaner: folder %s has %d items, not empty", folder, count) + return + } + + // Delete the empty folder + glog.V(2).Infof("EmptyFolderCleaner: deleting empty folder %s", folder) + if err := efc.deleteFolder(ctx, folder); err != nil { + glog.V(2).Infof("EmptyFolderCleaner: failed to delete empty folder %s: %v", folder, err) + return + } + + // Clean up cache entry + efc.mu.Lock() + 
delete(efc.folderCounts, folder) + efc.mu.Unlock() + + // Note: No need to recursively check parent folder here. + // The deletion of this folder will generate a metadata event, + // which will trigger OnDeleteEvent for the parent folder. +} + +// countItems counts items in a folder (up to maxCountCheck) +func (efc *EmptyFolderCleaner) countItems(ctx context.Context, folder string) (int, error) { + return efc.filer.CountDirectoryEntries(ctx, util.FullPath(folder), efc.maxCountCheck) +} + +// deleteFolder deletes an empty folder +func (efc *EmptyFolderCleaner) deleteFolder(ctx context.Context, folder string) error { + return efc.filer.DeleteEntryMetaAndData(ctx, util.FullPath(folder), false, false, false, false, nil, 0) +} + +// isUnderPath checks if child is under parent path +func isUnderPath(child, parent string) bool { + if parent == "" || parent == "/" { + return true + } + // Ensure parent ends without slash for proper prefix matching + if len(parent) > 0 && parent[len(parent)-1] == '/' { + parent = parent[:len(parent)-1] + } + // Child must start with parent and then have a / or be exactly parent + if len(child) < len(parent) { + return false + } + if child[:len(parent)] != parent { + return false + } + if len(child) == len(parent) { + return true + } + return child[len(parent)] == '/' +} + +// isUnderBucketPath checks if directory is inside a bucket (under /buckets/<bucket>/...) +// This ensures we only clean up folders inside buckets, not the buckets themselves +func isUnderBucketPath(directory, bucketPath string) bool { + if bucketPath == "" { + return true + } + // Ensure bucketPath ends without slash + if len(bucketPath) > 0 && bucketPath[len(bucketPath)-1] == '/' { + bucketPath = bucketPath[:len(bucketPath)-1] + } + // Directory must be under bucketPath + if !isUnderPath(directory, bucketPath) { + return false + } + // Directory must be at least /buckets/<bucket>/<something> + // i.e., depth must be at least bucketPath depth + 2 + // For /buckets (depth 1), we need at least /buckets/mybucket/folder (depth 3) + bucketPathDepth := strings.Count(bucketPath, "/") + directoryDepth := strings.Count(directory, "/") + return directoryDepth >= bucketPathDepth+2 +} + +// cacheEvictionLoop periodically removes stale entries from folderCounts +func (efc *EmptyFolderCleaner) cacheEvictionLoop() { + ticker := time.NewTicker(efc.cacheExpiry) + defer ticker.Stop() + + for { + select { + case <-efc.stopCh: + return + case <-ticker.C: + efc.evictStaleCacheEntries() + } + } +} + +// evictStaleCacheEntries removes cache entries that haven't been accessed recently +func (efc *EmptyFolderCleaner) evictStaleCacheEntries() { + efc.mu.Lock() + defer efc.mu.Unlock() + + now := time.Now() + expiredCount := 0 + for folder, state := range efc.folderCounts { + // Skip if folder is in cleanup queue + if efc.cleanupQueue.Contains(folder) { + continue + } + + // Find the most recent activity time for this folder + lastActivity := state.lastCheck + if state.lastAddTime.After(lastActivity) { + lastActivity = state.lastAddTime + } + if state.lastDelTime.After(lastActivity) { + lastActivity = state.lastDelTime + } + + // Evict if no activity within cache expiry period + if now.Sub(lastActivity) > efc.cacheExpiry { + delete(efc.folderCounts, folder) + expiredCount++ + } + } + + if expiredCount > 0 { + glog.V(3).Infof("EmptyFolderCleaner: evicted %d stale cache entries", expiredCount) + } +} + +// Stop stops the cleaner and cancels all pending tasks +func (efc *EmptyFolderCleaner) Stop() { + close(efc.stopCh) + + 
efc.mu.Lock() + defer efc.mu.Unlock() + + efc.enabled = false + efc.cleanupQueue.Clear() + efc.folderCounts = make(map[string]*folderState) // Clear cache on stop +} + +// GetPendingCleanupCount returns the number of pending cleanup tasks (for testing) +func (efc *EmptyFolderCleaner) GetPendingCleanupCount() int { + return efc.cleanupQueue.Len() +} + +// GetCachedFolderCount returns the cached count for a folder (for testing) +func (efc *EmptyFolderCleaner) GetCachedFolderCount(folder string) (int, bool) { + efc.mu.RLock() + defer efc.mu.RUnlock() + if state, exists := efc.folderCounts[folder]; exists { + return state.roughCount, true + } + return 0, false +} + diff --git a/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go new file mode 100644 index 000000000..fbc05ccf8 --- /dev/null +++ b/weed/filer/empty_folder_cleanup/empty_folder_cleaner_test.go @@ -0,0 +1,569 @@ +package empty_folder_cleanup + +import ( + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" + "github.com/seaweedfs/seaweedfs/weed/pb" +) + +func Test_isUnderPath(t *testing.T) { + tests := []struct { + name string + child string + parent string + expected bool + }{ + {"child under parent", "/buckets/mybucket/folder/file.txt", "/buckets", true}, + {"child is parent", "/buckets", "/buckets", true}, + {"child not under parent", "/other/path", "/buckets", false}, + {"empty parent", "/any/path", "", true}, + {"root parent", "/any/path", "/", true}, + {"parent with trailing slash", "/buckets/mybucket", "/buckets/", true}, + {"similar prefix but not under", "/buckets-other/file", "/buckets", false}, + {"deeply nested", "/buckets/a/b/c/d/e/f", "/buckets/a/b", true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isUnderPath(tt.child, tt.parent) + if result != tt.expected { + t.Errorf("isUnderPath(%q, %q) = %v, want %v", tt.child, tt.parent, result, tt.expected) + } + }) + } +} + +func Test_isUnderBucketPath(t *testing.T) { + tests := []struct { + name string + directory string + bucketPath string + expected bool + }{ + // Should NOT process - bucket path itself + {"bucket path itself", "/buckets", "/buckets", false}, + // Should NOT process - bucket directory (immediate child) + {"bucket directory", "/buckets/mybucket", "/buckets", false}, + // Should process - folder inside bucket + {"folder in bucket", "/buckets/mybucket/folder", "/buckets", true}, + // Should process - nested folder + {"nested folder", "/buckets/mybucket/a/b/c", "/buckets", true}, + // Should NOT process - outside buckets + {"outside buckets", "/other/path", "/buckets", false}, + // Empty bucket path allows all + {"empty bucket path", "/any/path", "", true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := isUnderBucketPath(tt.directory, tt.bucketPath) + if result != tt.expected { + t.Errorf("isUnderBucketPath(%q, %q) = %v, want %v", tt.directory, tt.bucketPath, result, tt.expected) + } + }) + } +} + +func TestEmptyFolderCleaner_ownsFolder(t *testing.T) { + // Create a LockRing with multiple servers + lockRing := lock_manager.NewLockRing(5 * time.Second) + + servers := []pb.ServerAddress{ + "filer1:8888", + "filer2:8888", + "filer3:8888", + } + lockRing.SetSnapshot(servers) + + // Create cleaner for filer1 + cleaner1 := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + } + + // Create cleaner for filer2 + cleaner2 := &EmptyFolderCleaner{ + lockRing: lockRing, + host: 
"filer2:8888", + } + + // Create cleaner for filer3 + cleaner3 := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer3:8888", + } + + // Test that exactly one filer owns each folder + testFolders := []string{ + "/buckets/mybucket/folder1", + "/buckets/mybucket/folder2", + "/buckets/mybucket/folder3", + "/buckets/mybucket/a/b/c", + "/buckets/otherbucket/x", + } + + for _, folder := range testFolders { + ownCount := 0 + if cleaner1.ownsFolder(folder) { + ownCount++ + } + if cleaner2.ownsFolder(folder) { + ownCount++ + } + if cleaner3.ownsFolder(folder) { + ownCount++ + } + + if ownCount != 1 { + t.Errorf("folder %q owned by %d filers, expected exactly 1", folder, ownCount) + } + } +} + +func TestEmptyFolderCleaner_ownsFolder_singleServer(t *testing.T) { + // Create a LockRing with a single server + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + } + + // Single filer should own all folders + testFolders := []string{ + "/buckets/mybucket/folder1", + "/buckets/mybucket/folder2", + "/buckets/otherbucket/x", + } + + for _, folder := range testFolders { + if !cleaner.ownsFolder(folder) { + t.Errorf("single filer should own folder %q", folder) + } + } +} + +func TestEmptyFolderCleaner_ownsFolder_emptyRing(t *testing.T) { + // Create an empty LockRing + lockRing := lock_manager.NewLockRing(5 * time.Second) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + } + + // With empty ring, should own all folders + if !cleaner.ownsFolder("/buckets/mybucket/folder") { + t.Error("should own folder with empty ring") + } +} + +func TestEmptyFolderCleaner_OnCreateEvent_cancelsCleanup(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + stopCh: make(chan struct{}), + } + + folder := "/buckets/mybucket/testfolder" + now := time.Now() + + // Simulate delete event + cleaner.OnDeleteEvent(folder, "file.txt", false, now) + + // Check that cleanup is queued + if cleaner.GetPendingCleanupCount() != 1 { + t.Errorf("expected 1 pending cleanup, got %d", cleaner.GetPendingCleanupCount()) + } + + // Simulate create event + cleaner.OnCreateEvent(folder, "newfile.txt", false) + + // Check that cleanup is cancelled + if cleaner.GetPendingCleanupCount() != 0 { + t.Errorf("expected 0 pending cleanups after create, got %d", cleaner.GetPendingCleanupCount()) + } + + cleaner.Stop() +} + +func TestEmptyFolderCleaner_OnDeleteEvent_deduplication(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + stopCh: make(chan struct{}), + } + + folder := "/buckets/mybucket/testfolder" + now := time.Now() + + // Simulate multiple delete events for same folder + for i := 0; i < 5; i++ { + cleaner.OnDeleteEvent(folder, "file"+string(rune('0'+i))+".txt", false, now.Add(time.Duration(i)*time.Second)) + } + + // Check that only 1 cleanup is queued (deduplicated) + if 
cleaner.GetPendingCleanupCount() != 1 { + t.Errorf("expected 1 pending cleanup after deduplication, got %d", cleaner.GetPendingCleanupCount()) + } + + cleaner.Stop() +} + +func TestEmptyFolderCleaner_OnDeleteEvent_multipleFolders(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + stopCh: make(chan struct{}), + } + + now := time.Now() + + // Delete files in different folders + cleaner.OnDeleteEvent("/buckets/mybucket/folder1", "file.txt", false, now) + cleaner.OnDeleteEvent("/buckets/mybucket/folder2", "file.txt", false, now.Add(1*time.Second)) + cleaner.OnDeleteEvent("/buckets/mybucket/folder3", "file.txt", false, now.Add(2*time.Second)) + + // Each folder should be queued + if cleaner.GetPendingCleanupCount() != 3 { + t.Errorf("expected 3 pending cleanups, got %d", cleaner.GetPendingCleanupCount()) + } + + cleaner.Stop() +} + +func TestEmptyFolderCleaner_OnDeleteEvent_notOwner(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888", "filer2:8888"}) + + // Create cleaner for filer that doesn't own the folder + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + stopCh: make(chan struct{}), + } + + now := time.Now() + + // Try many folders, looking for one that filer1 doesn't own + foundNonOwned := false + for i := 0; i < 100; i++ { + folder := "/buckets/mybucket/folder" + string(rune('0'+i%10)) + string(rune('0'+i/10)) + if !cleaner.ownsFolder(folder) { + // This folder is not owned by filer1 + cleaner.OnDeleteEvent(folder, "file.txt", false, now) + if cleaner.GetPendingCleanupCount() != 0 { + t.Errorf("non-owner should not queue cleanup for folder %s", folder) + } + foundNonOwned = true + break + } + } + + if !foundNonOwned { + t.Skip("could not find a folder not owned by filer1") + } + + cleaner.Stop() +} + +func TestEmptyFolderCleaner_OnDeleteEvent_disabled(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: false, // Disabled + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + stopCh: make(chan struct{}), + } + + folder := "/buckets/mybucket/testfolder" + now := time.Now() + + // Simulate delete event + cleaner.OnDeleteEvent(folder, "file.txt", false, now) + + // Check that no cleanup is queued when disabled + if cleaner.GetPendingCleanupCount() != 0 { + t.Errorf("disabled cleaner should not queue cleanup, got %d", cleaner.GetPendingCleanupCount()) + } + + cleaner.Stop() +} + +func TestEmptyFolderCleaner_OnDeleteEvent_directoryDeletion(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + stopCh: 
make(chan struct{}), + } + + folder := "/buckets/mybucket/testfolder" + now := time.Now() + + // Simulate directory delete event - should trigger cleanup + // because subdirectory deletion also makes parent potentially empty + cleaner.OnDeleteEvent(folder, "subdir", true, now) + + // Check that cleanup IS queued for directory deletion + if cleaner.GetPendingCleanupCount() != 1 { + t.Errorf("directory deletion should trigger cleanup, got %d", cleaner.GetPendingCleanupCount()) + } + + cleaner.Stop() +} + +func TestEmptyFolderCleaner_cachedCounts(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + stopCh: make(chan struct{}), + } + + folder := "/buckets/mybucket/testfolder" + + // Initialize cached count + cleaner.folderCounts[folder] = &folderState{roughCount: 5} + + // Simulate create events + cleaner.OnCreateEvent(folder, "newfile1.txt", false) + cleaner.OnCreateEvent(folder, "newfile2.txt", false) + + // Check cached count increased + count, exists := cleaner.GetCachedFolderCount(folder) + if !exists { + t.Error("cached folder count should exist") + } + if count != 7 { + t.Errorf("expected cached count 7, got %d", count) + } + + // Simulate delete events + now := time.Now() + cleaner.OnDeleteEvent(folder, "file1.txt", false, now) + cleaner.OnDeleteEvent(folder, "file2.txt", false, now.Add(1*time.Second)) + + // Check cached count decreased + count, exists = cleaner.GetCachedFolderCount(folder) + if !exists { + t.Error("cached folder count should exist") + } + if count != 5 { + t.Errorf("expected cached count 5, got %d", count) + } + + cleaner.Stop() +} + +func TestEmptyFolderCleaner_Stop(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + stopCh: make(chan struct{}), + } + + now := time.Now() + + // Queue some cleanups + cleaner.OnDeleteEvent("/buckets/mybucket/folder1", "file1.txt", false, now) + cleaner.OnDeleteEvent("/buckets/mybucket/folder2", "file2.txt", false, now.Add(1*time.Second)) + cleaner.OnDeleteEvent("/buckets/mybucket/folder3", "file3.txt", false, now.Add(2*time.Second)) + + // Verify cleanups are queued + if cleaner.GetPendingCleanupCount() < 1 { + t.Error("expected at least 1 pending cleanup before stop") + } + + // Stop the cleaner + cleaner.Stop() + + // Verify all cleanups are cancelled + if cleaner.GetPendingCleanupCount() != 0 { + t.Errorf("expected 0 pending cleanups after stop, got %d", cleaner.GetPendingCleanupCount()) + } +} + +func TestEmptyFolderCleaner_cacheEviction(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + cacheExpiry: 100 * time.Millisecond, // Short expiry for testing + stopCh: make(chan struct{}), + } + + folder1 := "/buckets/mybucket/folder1" + 
folder2 := "/buckets/mybucket/folder2" + folder3 := "/buckets/mybucket/folder3" + + // Add some cache entries with old timestamps + oldTime := time.Now().Add(-1 * time.Hour) + cleaner.folderCounts[folder1] = &folderState{roughCount: 5, lastCheck: oldTime} + cleaner.folderCounts[folder2] = &folderState{roughCount: 3, lastCheck: oldTime} + // folder3 has recent activity + cleaner.folderCounts[folder3] = &folderState{roughCount: 2, lastCheck: time.Now()} + + // Verify all entries exist + if len(cleaner.folderCounts) != 3 { + t.Errorf("expected 3 cache entries, got %d", len(cleaner.folderCounts)) + } + + // Run eviction + cleaner.evictStaleCacheEntries() + + // Verify stale entries are evicted + if len(cleaner.folderCounts) != 1 { + t.Errorf("expected 1 cache entry after eviction, got %d", len(cleaner.folderCounts)) + } + + // Verify the recent entry still exists + if _, exists := cleaner.folderCounts[folder3]; !exists { + t.Error("expected folder3 to still exist in cache") + } + + // Verify stale entries are removed + if _, exists := cleaner.folderCounts[folder1]; exists { + t.Error("expected folder1 to be evicted") + } + if _, exists := cleaner.folderCounts[folder2]; exists { + t.Error("expected folder2 to be evicted") + } + + cleaner.Stop() +} + +func TestEmptyFolderCleaner_cacheEviction_skipsEntriesInQueue(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + cacheExpiry: 100 * time.Millisecond, + stopCh: make(chan struct{}), + } + + folder := "/buckets/mybucket/folder" + oldTime := time.Now().Add(-1 * time.Hour) + + // Add a stale cache entry + cleaner.folderCounts[folder] = &folderState{roughCount: 0, lastCheck: oldTime} + // Also add to cleanup queue + cleaner.cleanupQueue.Add(folder, time.Now()) + + // Run eviction + cleaner.evictStaleCacheEntries() + + // Verify entry is NOT evicted because it's in cleanup queue + if _, exists := cleaner.folderCounts[folder]; !exists { + t.Error("expected folder to still exist in cache (is in cleanup queue)") + } + + cleaner.Stop() +} + +func TestEmptyFolderCleaner_queueFIFOOrder(t *testing.T) { + lockRing := lock_manager.NewLockRing(5 * time.Second) + lockRing.SetSnapshot([]pb.ServerAddress{"filer1:8888"}) + + cleaner := &EmptyFolderCleaner{ + lockRing: lockRing, + host: "filer1:8888", + bucketPath: "/buckets", + enabled: true, + folderCounts: make(map[string]*folderState), + cleanupQueue: NewCleanupQueue(1000, 10*time.Minute), + stopCh: make(chan struct{}), + } + + now := time.Now() + + // Add folders in order + folders := []string{ + "/buckets/mybucket/folder1", + "/buckets/mybucket/folder2", + "/buckets/mybucket/folder3", + } + for i, folder := range folders { + cleaner.OnDeleteEvent(folder, "file.txt", false, now.Add(time.Duration(i)*time.Second)) + } + + // Verify queue length + if cleaner.GetPendingCleanupCount() != 3 { + t.Errorf("expected 3 queued folders, got %d", cleaner.GetPendingCleanupCount()) + } + + // Verify time-sorted order by popping + for i, expected := range folders { + folder, ok := cleaner.cleanupQueue.Pop() + if !ok || folder != expected { + t.Errorf("expected folder %s at index %d, got %s", expected, i, folder) + } + } + + cleaner.Stop() +} + diff --git a/weed/filer/filer.go b/weed/filer/filer.go index f9f3d4fb2..382eb644f 
100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -11,6 +11,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket" "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" + "github.com/seaweedfs/seaweedfs/weed/filer/empty_folder_cleanup" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -56,6 +57,7 @@ type Filer struct { MaxFilenameLength uint32 deletionQuit chan struct{} DeletionRetryQueue *DeletionRetryQueue + EmptyFolderCleaner *empty_folder_cleanup.EmptyFolderCleaner } func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, maxFilenameLength uint32, notifyFn func()) *Filer { @@ -116,6 +118,9 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste f.Dlm.LockRing.SetSnapshot(snapshot) glog.V(0).Infof("%s aggregate from peers %+v", self, snapshot) + // Initialize the empty folder cleaner using the same LockRing as Dlm for consistent hashing + f.EmptyFolderCleaner = empty_folder_cleanup.NewEmptyFolderCleaner(f, f.Dlm.LockRing, self, f.DirBucketsPath) + f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) f.MasterClient.SetOnPeerUpdateFn(func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { if update.NodeType != cluster.FilerType { @@ -506,6 +511,9 @@ func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bo func (f *Filer) Shutdown() { close(f.deletionQuit) + if f.EmptyFolderCleaner != nil { + f.EmptyFolderCleaner.Stop() + } f.LocalMetaLogBuffer.ShutdownLogBuffer() f.Store.Shutdown() } diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 845a0678e..45c9b070f 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -66,6 +66,10 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry f.logMetaEvent(ctx, fullpath, eventNotification) + // Trigger empty folder cleanup for local events + // Remote events are handled via MetaAggregator.onMetadataChangeEvent + f.triggerLocalEmptyFolderCleanup(oldEntry, newEntry) + } func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotification *filer_pb.EventNotification) { @@ -89,6 +93,41 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica } +// triggerLocalEmptyFolderCleanup triggers empty folder cleanup for local events +// This is needed because onMetadataChangeEvent is only called for remote peer events +func (f *Filer) triggerLocalEmptyFolderCleanup(oldEntry, newEntry *Entry) { + if f.EmptyFolderCleaner == nil || !f.EmptyFolderCleaner.IsEnabled() { + return + } + + eventTime := time.Now() + + // Handle delete events (oldEntry exists, newEntry is nil) + if oldEntry != nil && newEntry == nil { + dir, name := oldEntry.FullPath.DirAndName() + f.EmptyFolderCleaner.OnDeleteEvent(dir, name, oldEntry.IsDirectory(), eventTime) + } + + // Handle create events (oldEntry is nil, newEntry exists) + if oldEntry == nil && newEntry != nil { + dir, name := newEntry.FullPath.DirAndName() + f.EmptyFolderCleaner.OnCreateEvent(dir, name, newEntry.IsDirectory()) + } + + // Handle rename/move events (both exist but paths differ) + if oldEntry != nil && newEntry != nil { + oldDir, oldName := oldEntry.FullPath.DirAndName() + newDir, newName := newEntry.FullPath.DirAndName() + + if oldDir != newDir || oldName != newName { + // Treat old location as delete + f.EmptyFolderCleaner.OnDeleteEvent(oldDir, 
oldName, oldEntry.IsDirectory(), eventTime) + // Treat new location as create + f.EmptyFolderCleaner.OnCreateEvent(newDir, newName, newEntry.IsDirectory()) + } + } +} + func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { if len(buf) == 0 { diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go index acbf4aa47..4ee80b3a6 100644 --- a/weed/filer/filer_on_meta_event.go +++ b/weed/filer/filer_on_meta_event.go @@ -2,6 +2,7 @@ package filer import ( "bytes" + "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -13,6 +14,7 @@ func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) f.maybeReloadFilerConfiguration(event) f.maybeReloadRemoteStorageConfigurationAndMapping(event) f.onBucketEvents(event) + f.onEmptyFolderCleanupEvents(event) } func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) { @@ -32,6 +34,43 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) { } } +// onEmptyFolderCleanupEvents handles create/delete events for empty folder cleanup +func (f *Filer) onEmptyFolderCleanupEvents(event *filer_pb.SubscribeMetadataResponse) { + if f.EmptyFolderCleaner == nil || !f.EmptyFolderCleaner.IsEnabled() { + return + } + + message := event.EventNotification + directory := event.Directory + eventTime := time.Unix(0, event.TsNs) + + // Handle delete events - trigger folder cleanup check + if filer_pb.IsDelete(event) && message.OldEntry != nil { + f.EmptyFolderCleaner.OnDeleteEvent(directory, message.OldEntry.Name, message.OldEntry.IsDirectory, eventTime) + } + + // Handle create events - cancel pending cleanup for the folder + if filer_pb.IsCreate(event) && message.NewEntry != nil { + f.EmptyFolderCleaner.OnCreateEvent(directory, message.NewEntry.Name, message.NewEntry.IsDirectory) + } + + // Handle rename/move events + if filer_pb.IsRename(event) { + // Treat the old location as a delete + if message.OldEntry != nil { + f.EmptyFolderCleaner.OnDeleteEvent(directory, message.OldEntry.Name, message.OldEntry.IsDirectory, eventTime) + } + // Treat the new location as a create + if message.NewEntry != nil { + newDir := message.NewParentPath + if newDir == "" { + newDir = directory + } + f.EmptyFolderCleaner.OnCreateEvent(newDir, message.NewEntry.Name, message.NewEntry.IsDirectory) + } + } +} + func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataResponse) { if DirectoryEtcSeaweedFS != event.Directory { if DirectoryEtcSeaweedFS != event.EventNotification.NewParentPath { diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go index 294fc0e7f..e6366e82f 100644 --- a/weed/filer/filer_search.go +++ b/weed/filer/filer_search.go @@ -41,6 +41,19 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start return entries, hasMore, err } +// CountDirectoryEntries counts entries in a directory up to limit +func (f *Filer) CountDirectoryEntries(ctx context.Context, p util.FullPath, limit int) (count int, err error) { + entries, hasMore, err := f.ListDirectoryEntries(ctx, p, "", false, int64(limit), "", "", "") + if err != nil { + return 0, err + } + count = len(entries) + if hasMore { + count = limit // At least this many + } + return count, nil +} + // For now, prefix and namePattern are mutually exclusive func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, 
limit int64, prefix string, namePattern string, namePatternExclude string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) { if strings.HasSuffix(string(p), "/") && len(p) > 1 { diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go index 6e4f11d22..5e1593e9e 100644 --- a/weed/filer/redis2/redis_cluster_store.go +++ b/weed/filer/redis2/redis_cluster_store.go @@ -25,20 +25,24 @@ func (store *RedisCluster2Store) Initialize(configuration util.Configuration, pr return store.initialize( configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), + configuration.GetString(prefix+"keyPrefix"), configuration.GetBool(prefix+"useReadOnly"), configuration.GetBool(prefix+"routeByLatency"), configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *RedisCluster2Store) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) { +func (store *RedisCluster2Store) initialize(addresses []string, username string, password string, keyPrefix string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) { store.Client = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: addresses, + Username: username, Password: password, ReadOnly: readOnly, RouteByLatency: routeByLatency, }) + store.keyPrefix = keyPrefix store.loadSuperLargeDirectories(superLargeDirectories) return } diff --git a/weed/filer/redis2/redis_sentinel_store.go b/weed/filer/redis2/redis_sentinel_store.go index 5fc368fc7..dc15285bd 100644 --- a/weed/filer/redis2/redis_sentinel_store.go +++ b/weed/filer/redis2/redis_sentinel_store.go @@ -26,10 +26,11 @@ func (store *Redis2SentinelStore) Initialize(configuration util.Configuration, p configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), configuration.GetInt(prefix+"database"), + configuration.GetString(prefix+"keyPrefix"), ) } -func (store *Redis2SentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) { +func (store *Redis2SentinelStore) initialize(addresses []string, masterName string, username string, password string, database int, keyPrefix string) (err error) { store.Client = redis.NewFailoverClient(&redis.FailoverOptions{ MasterName: masterName, SentinelAddrs: addresses, @@ -41,5 +42,6 @@ func (store *Redis2SentinelStore) initialize(addresses []string, masterName stri ReadTimeout: time.Second * 30, WriteTimeout: time.Second * 5, }) + store.keyPrefix = keyPrefix return } diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go index f9322be42..7193699f9 100644 --- a/weed/filer/redis2/redis_store.go +++ b/weed/filer/redis2/redis_store.go @@ -27,8 +27,10 @@ func (store *Redis2Store) GetName() string { func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), configuration.GetInt(prefix+"database"), + configuration.GetString(prefix+"keyPrefix"), configuration.GetStringSlice(prefix+"superLargeDirectories"), configuration.GetBool(prefix+"enable_mtls"), configuration.GetString(prefix+"ca_cert_path"), @@ -37,7 +39,13 @@ func (store *Redis2Store) Initialize(configuration util.Configuration, prefix st ) } -func (store *Redis2Store) 
initialize(hostPort string, password string, database int, superLargeDirectories []string, enableMtls bool, caCertPath string, clientCertPath string, clientKeyPath string) (err error) { +func (store *Redis2Store) initialize(hostPort string, username string, password string, database int, keyPrefix string, superLargeDirectories []string, enableMtls bool, caCertPath string, clientCertPath string, clientKeyPath string) (err error) { + opt := &redis.Options{ + Addr: hostPort, + Username: username, + Password: password, + DB: database, + } if enableMtls { clientCert, err := tls.LoadX509KeyPair(clientCertPath, clientKeyPath) if err != nil { @@ -59,25 +67,15 @@ func (store *Redis2Store) initialize(hostPort string, password string, database glog.Fatalf("Error parsing redis host and port from %s: %v", hostPort, err) } - tlsConfig := &tls.Config{ + opt.TLSConfig = &tls.Config{ Certificates: []tls.Certificate{clientCert}, RootCAs: caCertPool, ServerName: redisHost, MinVersion: tls.VersionTLS12, } - store.Client = redis.NewClient(&redis.Options{ - Addr: hostPort, - Password: password, - DB: database, - TLSConfig: tlsConfig, - }) - } else { - store.Client = redis.NewClient(&redis.Options{ - Addr: hostPort, - Password: password, - DB: database, - }) } + store.Client = redis.NewClient(opt) + store.keyPrefix = keyPrefix store.loadSuperLargeDirectories(superLargeDirectories) return } diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 0dbf7a72a..a8429a764 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -19,6 +19,7 @@ const ( type UniversalRedis2Store struct { Client redis.UniversalClient + keyPrefix string superLargeDirectoryHash map[string]bool } @@ -35,6 +36,13 @@ func (store *UniversalRedis2Store) loadSuperLargeDirectories(superLargeDirectori } } +func (store *UniversalRedis2Store) getKey(key string) string { + if store.keyPrefix == "" { + return key + } + return store.keyPrefix + key +} + func (store *UniversalRedis2Store) BeginTransaction(ctx context.Context) (context.Context, error) { return ctx, nil } @@ -57,7 +65,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer } if name != "" { - if err = store.Client.ZAddNX(ctx, genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil { + if err = store.Client.ZAddNX(ctx, store.getKey(genDirectoryListKey(dir)), redis.Z{Score: 0, Member: name}).Err(); err != nil { return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err) } } @@ -75,7 +83,7 @@ func (store *UniversalRedis2Store) doInsertEntry(ctx context.Context, entry *fil value = util.MaybeGzipData(value) } - if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { + if err = store.Client.Set(ctx, store.getKey(string(entry.FullPath)), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { return fmt.Errorf("persisting %s : %v", entry.FullPath, err) } return nil @@ -88,7 +96,7 @@ func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { - data, err := store.Client.Get(ctx, string(fullpath)).Result() + data, err := store.Client.Get(ctx, store.getKey(string(fullpath))).Result() if err == redis.Nil { return nil, filer_pb.ErrNotFound } @@ -110,12 +118,12 @@ func (store *UniversalRedis2Store) FindEntry(ctx 
context.Context, fullpath util. func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { - _, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result() + _, err = store.Client.Del(ctx, store.getKey(genDirectoryListKey(string(fullpath)))).Result() if err != nil { return fmt.Errorf("delete dir list %s : %v", fullpath, err) } - _, err = store.Client.Del(ctx, string(fullpath)).Result() + _, err = store.Client.Del(ctx, store.getKey(string(fullpath))).Result() if err != nil { return fmt.Errorf("delete %s : %v", fullpath, err) } @@ -125,7 +133,7 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti return nil } if name != "" { - _, err = store.Client.ZRem(ctx, genDirectoryListKey(dir), name).Result() + _, err = store.Client.ZRem(ctx, store.getKey(genDirectoryListKey(dir)), name).Result() if err != nil { return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err) } @@ -140,7 +148,7 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful return nil } - members, err := store.Client.ZRangeByLex(ctx, genDirectoryListKey(string(fullpath)), &redis.ZRangeBy{ + members, err := store.Client.ZRangeByLex(ctx, store.getKey(genDirectoryListKey(string(fullpath))), &redis.ZRangeBy{ Min: "-", Max: "+", }).Result() @@ -150,12 +158,12 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful for _, fileName := range members { path := util.NewFullPath(string(fullpath), fileName) - _, err = store.Client.Del(ctx, string(path)).Result() + _, err = store.Client.Del(ctx, store.getKey(string(path))).Result() if err != nil { return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err) } // not efficient, but need to remove if it is a directory - store.Client.Del(ctx, genDirectoryListKey(string(path))) + store.Client.Del(ctx, store.getKey(genDirectoryListKey(string(path)))) } return nil @@ -167,7 +175,7 @@ func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Cont func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - dirListKey := genDirectoryListKey(string(dirPath)) + dirListKey := store.getKey(genDirectoryListKey(string(dirPath))) min := "-" if startFileName != "" { @@ -201,7 +209,7 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir } else { if entry.TtlSec > 0 { if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - store.Client.Del(ctx, string(path)).Result() + store.Client.Del(ctx, store.getKey(string(path))).Result() store.Client.ZRem(ctx, dirListKey, fileName).Result() continue } diff --git a/weed/filer/redis_lua/redis_cluster_store.go b/weed/filer/redis_lua/redis_cluster_store.go index 251aadbcd..b64342fc2 100644 --- a/weed/filer/redis_lua/redis_cluster_store.go +++ b/weed/filer/redis_lua/redis_cluster_store.go @@ -25,20 +25,24 @@ func (store *RedisLuaClusterStore) Initialize(configuration util.Configuration, return store.initialize( configuration.GetStringSlice(prefix+"addresses"), + configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), + configuration.GetString(prefix+"keyPrefix"), configuration.GetBool(prefix+"useReadOnly"), configuration.GetBool(prefix+"routeByLatency"), configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } 
-func (store *RedisLuaClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) { +func (store *RedisLuaClusterStore) initialize(addresses []string, username string, password string, keyPrefix string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) { store.Client = redis.NewClusterClient(&redis.ClusterOptions{ Addrs: addresses, + Username: username, Password: password, ReadOnly: readOnly, RouteByLatency: routeByLatency, }) + store.keyPrefix = keyPrefix store.loadSuperLargeDirectories(superLargeDirectories) return } diff --git a/weed/filer/redis_lua/redis_sentinel_store.go b/weed/filer/redis_lua/redis_sentinel_store.go index f22a7fa66..12a582ac3 100644 --- a/weed/filer/redis_lua/redis_sentinel_store.go +++ b/weed/filer/redis_lua/redis_sentinel_store.go @@ -26,10 +26,11 @@ func (store *RedisLuaSentinelStore) Initialize(configuration util.Configuration, configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), configuration.GetInt(prefix+"database"), + configuration.GetString(prefix+"keyPrefix"), ) } -func (store *RedisLuaSentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) { +func (store *RedisLuaSentinelStore) initialize(addresses []string, masterName string, username string, password string, database int, keyPrefix string) (err error) { store.Client = redis.NewFailoverClient(&redis.FailoverOptions{ MasterName: masterName, SentinelAddrs: addresses, @@ -41,5 +42,6 @@ func (store *RedisLuaSentinelStore) initialize(addresses []string, masterName st ReadTimeout: time.Second * 30, WriteTimeout: time.Second * 5, }) + store.keyPrefix = keyPrefix return } diff --git a/weed/filer/redis_lua/redis_store.go b/weed/filer/redis_lua/redis_store.go index 8574baa09..4f6354e96 100644 --- a/weed/filer/redis_lua/redis_store.go +++ b/weed/filer/redis_lua/redis_store.go @@ -21,18 +21,22 @@ func (store *RedisLuaStore) GetName() string { func (store *RedisLuaStore) Initialize(configuration util.Configuration, prefix string) (err error) { return store.initialize( configuration.GetString(prefix+"address"), + configuration.GetString(prefix+"username"), configuration.GetString(prefix+"password"), configuration.GetInt(prefix+"database"), + configuration.GetString(prefix+"keyPrefix"), configuration.GetStringSlice(prefix+"superLargeDirectories"), ) } -func (store *RedisLuaStore) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) { +func (store *RedisLuaStore) initialize(hostPort string, username string, password string, database int, keyPrefix string, superLargeDirectories []string) (err error) { store.Client = redis.NewClient(&redis.Options{ Addr: hostPort, + Username: username, Password: password, DB: database, }) + store.keyPrefix = keyPrefix store.loadSuperLargeDirectories(superLargeDirectories) return } diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go index 35f6d4991..0a02a0730 100644 --- a/weed/filer/redis_lua/universal_redis_store.go +++ b/weed/filer/redis_lua/universal_redis_store.go @@ -20,6 +20,7 @@ const ( type UniversalRedisLuaStore struct { Client redis.UniversalClient + keyPrefix string superLargeDirectoryHash map[string]bool } @@ -36,6 +37,13 @@ func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirecto } } +func (store *UniversalRedisLuaStore) getKey(key string) string { 
+ if store.keyPrefix == "" { + return key + } + return store.keyPrefix + key +} + func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) { return ctx, nil } @@ -60,7 +68,7 @@ func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *fil dir, name := entry.FullPath.DirAndName() err = stored_procedure.InsertEntryScript.Run(ctx, store.Client, - []string{string(entry.FullPath), genDirectoryListKey(dir)}, + []string{store.getKey(string(entry.FullPath)), store.getKey(genDirectoryListKey(dir))}, value, entry.TtlSec, store.isSuperLargeDirectory(dir), 0, name).Err() @@ -78,7 +86,7 @@ func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *fil func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { - data, err := store.Client.Get(ctx, string(fullpath)).Result() + data, err := store.Client.Get(ctx, store.getKey(string(fullpath))).Result() if err == redis.Nil { return nil, filer_pb.ErrNotFound } @@ -103,7 +111,7 @@ func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath u dir, name := fullpath.DirAndName() err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client, - []string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)}, + []string{store.getKey(string(fullpath)), store.getKey(genDirectoryListKey(string(fullpath))), store.getKey(genDirectoryListKey(dir))}, store.isSuperLargeDirectory(dir), name).Err() if err != nil { @@ -120,7 +128,7 @@ func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx context.Context, f } err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client, - []string{string(fullpath)}).Err() + []string{store.getKey(string(fullpath))}).Err() if err != nil { return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err) @@ -135,7 +143,7 @@ func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Co func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { - dirListKey := genDirectoryListKey(string(dirPath)) + dirListKey := store.getKey(genDirectoryListKey(string(dirPath))) min := "-" if startFileName != "" { diff --git a/weed/pb/master.proto b/weed/pb/master.proto index f8049c466..afbf31de9 100644 --- a/weed/pb/master.proto +++ b/weed/pb/master.proto @@ -81,6 +81,7 @@ message Heartbeat { map<string, uint32> max_volume_counts = 4; uint32 grpc_port = 20; repeated string location_uuids = 21; + string id = 22; // volume server id, independent of ip:port for stable identification } message HeartbeatResponse { @@ -289,6 +290,7 @@ message DataNodeInfo { string id = 1; map<string, DiskInfo> diskInfos = 2; uint32 grpc_port = 3; + string address = 4; // ip:port for connecting to the volume server } message RackInfo { string id = 1; diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index 19df43d71..41d46fad1 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -44,6 +44,7 @@ type Heartbeat struct { MaxVolumeCounts map[string]uint32 `protobuf:"bytes,4,rep,name=max_volume_counts,json=maxVolumeCounts,proto3" json:"max_volume_counts,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` GrpcPort uint32 `protobuf:"varint,20,opt,name=grpc_port,json=grpcPort,proto3" 
json:"grpc_port,omitempty"` LocationUuids []string `protobuf:"bytes,21,rep,name=location_uuids,json=locationUuids,proto3" json:"location_uuids,omitempty"` + Id string `protobuf:"bytes,22,opt,name=id,proto3" json:"id,omitempty"` // volume server id, independent of ip:port for stable identification unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -204,6 +205,13 @@ func (x *Heartbeat) GetLocationUuids() []string { return nil } +func (x *Heartbeat) GetId() string { + if x != nil { + return x.Id + } + return "" +} + type HeartbeatResponse struct { state protoimpl.MessageState `protogen:"open.v1"` VolumeSizeLimit uint64 `protobuf:"varint,1,opt,name=volume_size_limit,json=volumeSizeLimit,proto3" json:"volume_size_limit,omitempty"` @@ -2039,6 +2047,7 @@ type DataNodeInfo struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` DiskInfos map[string]*DiskInfo `protobuf:"bytes,2,rep,name=diskInfos,proto3" json:"diskInfos,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` GrpcPort uint32 `protobuf:"varint,3,opt,name=grpc_port,json=grpcPort,proto3" json:"grpc_port,omitempty"` + Address string `protobuf:"bytes,4,opt,name=address,proto3" json:"address,omitempty"` // ip:port for connecting to the volume server unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -2094,6 +2103,13 @@ func (x *DataNodeInfo) GetGrpcPort() uint32 { return 0 } +func (x *DataNodeInfo) GetAddress() string { + if x != nil { + return x.Address + } + return "" +} + type RackInfo struct { state protoimpl.MessageState `protogen:"open.v1"` Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` @@ -4038,7 +4054,7 @@ var File_master_proto protoreflect.FileDescriptor const file_master_proto_rawDesc = "" + "\n" + - "\fmaster.proto\x12\tmaster_pb\"\xc0\a\n" + + "\fmaster.proto\x12\tmaster_pb\"\xd0\a\n" + "\tHeartbeat\x12\x0e\n" + "\x02ip\x18\x01 \x01(\tR\x02ip\x12\x12\n" + "\x04port\x18\x02 \x01(\rR\x04port\x12\x1d\n" + @@ -4063,7 +4079,8 @@ const file_master_proto_rawDesc = "" + "\x10has_no_ec_shards\x18\x13 \x01(\bR\rhasNoEcShards\x12U\n" + "\x11max_volume_counts\x18\x04 \x03(\v2).master_pb.Heartbeat.MaxVolumeCountsEntryR\x0fmaxVolumeCounts\x12\x1b\n" + "\tgrpc_port\x18\x14 \x01(\rR\bgrpcPort\x12%\n" + - "\x0elocation_uuids\x18\x15 \x03(\tR\rlocationUuids\x1aB\n" + + "\x0elocation_uuids\x18\x15 \x03(\tR\rlocationUuids\x12\x0e\n" + + "\x02id\x18\x16 \x01(\tR\x02id\x1aB\n" + "\x14MaxVolumeCountsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + "\x05value\x18\x02 \x01(\rR\x05value:\x028\x01\"\xcd\x02\n" + @@ -4254,11 +4271,12 @@ const file_master_proto_rawDesc = "" + "\fvolume_infos\x18\x06 \x03(\v2#.master_pb.VolumeInformationMessageR\vvolumeInfos\x12P\n" + "\x0eec_shard_infos\x18\a \x03(\v2*.master_pb.VolumeEcShardInformationMessageR\fecShardInfos\x12.\n" + "\x13remote_volume_count\x18\b \x01(\x03R\x11remoteVolumeCount\x12\x17\n" + - "\adisk_id\x18\t \x01(\rR\x06diskId\"\xd4\x01\n" + + "\adisk_id\x18\t \x01(\rR\x06diskId\"\xee\x01\n" + "\fDataNodeInfo\x12\x0e\n" + "\x02id\x18\x01 \x01(\tR\x02id\x12D\n" + "\tdiskInfos\x18\x02 \x03(\v2&.master_pb.DataNodeInfo.DiskInfosEntryR\tdiskInfos\x12\x1b\n" + - "\tgrpc_port\x18\x03 \x01(\rR\bgrpcPort\x1aQ\n" + + "\tgrpc_port\x18\x03 \x01(\rR\bgrpcPort\x12\x18\n" + + "\aaddress\x18\x04 \x01(\tR\aaddress\x1aQ\n" + "\x0eDiskInfosEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12)\n" + "\x05value\x18\x02 \x01(\v2\x13.master_pb.DiskInfoR\x05value:\x028\x01\"\xf0\x01\n" + 
diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go index a0aa79ae4..943b85519 100644 --- a/weed/pb/server_address.go +++ b/weed/pb/server_address.go @@ -2,11 +2,12 @@ package pb import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/util" "net" "strconv" "strings" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/util" ) type ServerAddress string @@ -32,7 +33,12 @@ func NewServerAddressWithGrpcPort(address string, grpcPort int) ServerAddress { } func NewServerAddressFromDataNode(dn *master_pb.DataNodeInfo) ServerAddress { - return NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)) + // Use Address field if available (new behavior), fall back to Id for backward compatibility + addr := dn.Address + if addr == "" { + addr = dn.Id // backward compatibility: old nodes use ip:port as id + } + return NewServerAddressWithGrpcPort(addr, int(dn.GrpcPort)) } func NewServerAddressFromLocation(dn *master_pb.Location) ServerAddress { diff --git a/weed/s3api/chunked_reader_v4.go b/weed/s3api/chunked_reader_v4.go index c21b57009..f841c3e1e 100644 --- a/weed/s3api/chunked_reader_v4.go +++ b/weed/s3api/chunked_reader_v4.go @@ -116,6 +116,7 @@ func (iam *IdentityAccessManagement) newChunkedReader(req *http.Request) (io.Rea } checkSumWriter := getCheckSumWriter(checksumAlgorithm) + hasTrailer := amzTrailerHeader != "" return &s3ChunkedReader{ cred: credential, @@ -129,6 +130,7 @@ func (iam *IdentityAccessManagement) newChunkedReader(req *http.Request) (io.Rea checkSumWriter: checkSumWriter, state: readChunkHeader, iam: iam, + hasTrailer: hasTrailer, }, s3err.ErrNone } @@ -170,6 +172,7 @@ type s3ChunkedReader struct { n uint64 // Unread bytes in chunk err error iam *IdentityAccessManagement + hasTrailer bool } // Read chunk reads the chunk token signature portion. @@ -281,10 +284,10 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) { } // If we're using unsigned streaming upload, there is no signature to verify at each chunk. - if cr.chunkSignature != "" { - cr.state = verifyChunk - } else if cr.lastChunk { + if cr.lastChunk && cr.hasTrailer { cr.state = readTrailerChunk + } else if cr.chunkSignature != "" { + cr.state = verifyChunk } else { cr.state = readChunkHeader } @@ -304,7 +307,11 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) { // This implementation currently only supports the first case. 
// TODO: Implement the second case (signed upload with additional checksum computation for each chunk) - extractedCheckSumAlgorithm, extractedChecksum := parseChunkChecksum(cr.reader) + extractedCheckSumAlgorithm, extractedChecksum, err := parseChunkChecksum(cr.reader) + if err != nil { + cr.err = err + return 0, err + } if extractedCheckSumAlgorithm.String() != cr.checkSumAlgorithm { errorMessage := fmt.Sprintf("checksum algorithm in trailer '%s' does not match the one advertised in the header '%s'", extractedCheckSumAlgorithm.String(), cr.checkSumAlgorithm) @@ -313,6 +320,7 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) { return 0, cr.err } + // Validate checksum for data integrity (required for both signed and unsigned streaming with trailers) computedChecksum := cr.checkSumWriter.Sum(nil) base64Checksum := base64.StdEncoding.EncodeToString(computedChecksum) if string(extractedChecksum) != base64Checksum { @@ -324,11 +332,6 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) { // TODO: Extract signature from trailer chunk and verify it. // For now, we just read the trailer chunk and discard it. - // Reading remaining CRLF. - for i := 0; i < 2; i++ { - cr.err = readCRLF(cr.reader) - } - cr.state = eofChunk case readChunk: @@ -506,41 +509,37 @@ func parseS3ChunkExtension(buf []byte) ([]byte, []byte) { return buf[:semi], parseChunkSignature(buf[semi:]) } -func parseChunkChecksum(b *bufio.Reader) (ChecksumAlgorithm, []byte) { - // When using unsigned upload, this would be the raw contents of the trailer chunk: - // - // x-amz-checksum-crc32:YABb/g==\n\r\n\r\n // Trailer chunk (note optional \n character) - // \r\n // CRLF - // - // When using signed upload with an additional checksum algorithm, this would be the raw contents of the trailer chunk: - // - // x-amz-checksum-crc32:YABb/g==\n\r\n // Trailer chunk (note optional \n character) - // trailer-signature\r\n - // \r\n // CRLF - // - - // x-amz-checksum-crc32:YABb/g==\n - bytesRead, err := readChunkLine(b) - if err != nil { - return ChecksumAlgorithmNone, nil - } +func parseChunkChecksum(b *bufio.Reader) (ChecksumAlgorithm, []byte, error) { + // Read trailer lines until empty line + var checksumAlgorithm ChecksumAlgorithm + var checksum []byte - // Split on ':' - parts := bytes.SplitN(bytesRead, []byte(":"), 2) - checksumKey := string(parts[0]) - checksumValue := parts[1] + for { + bytesRead, err := readChunkLine(b) + if err != nil { + return ChecksumAlgorithmNone, nil, err + } - // Discard all trailing whitespace characters - checksumValue = trimTrailingWhitespace(checksumValue) + if len(bytesRead) == 0 { + break + } - // If the checksum key is not a supported checksum algorithm, return an error. 
- // TODO: Bubble that error up to the caller - extractedAlgorithm, err := extractChecksumAlgorithm(checksumKey) - if err != nil { - return ChecksumAlgorithmNone, nil + parts := bytes.SplitN(bytesRead, []byte(":"), 2) + if len(parts) == 2 { + key := string(bytes.TrimSpace(parts[0])) + value := bytes.TrimSpace(parts[1]) + if alg, err := extractChecksumAlgorithm(key); err == nil { + if checksumAlgorithm != ChecksumAlgorithmNone { + glog.V(3).Infof("multiple checksum headers found in trailer, using last: %s", key) + } + checksumAlgorithm = alg + checksum = value + } + // Ignore other trailer headers like x-amz-trailer-signature + } } - return extractedAlgorithm, checksumValue + return checksumAlgorithm, checksum, nil } func parseChunkSignature(chunk []byte) []byte { diff --git a/weed/s3api/s3api_bucket_config.go b/weed/s3api/s3api_bucket_config.go index 00449d80a..a10374339 100644 --- a/weed/s3api/s3api_bucket_config.go +++ b/weed/s3api/s3api_bucket_config.go @@ -519,7 +519,9 @@ func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) { config, errCode := s3a.getBucketConfig(bucket) if errCode != s3err.ErrNone { if errCode == s3err.ErrNoSuchBucket { - return "", nil + // Signal to callers that the bucket does not exist so they can + // decide whether to auto-create it (e.g., in PUT handlers). + return "", filer_pb.ErrNotFound } glog.Errorf("getVersioningState: failed to get bucket config for %s: %v", bucket, errCode) return "", fmt.Errorf("failed to get bucket config: %v", errCode) diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 4e465919c..66d4ded80 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -199,7 +199,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request } // Process metadata and tags and apply to destination - processedMetadata, tagErr := processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging) + // Use dstEntry.Extended (already filtered) as the source, not entry.Extended, + // to preserve the encryption header filtering. Fixes GitHub #7562. 
+ processedMetadata, tagErr := processMetadataBytes(r.Header, dstEntry.Extended, replaceMeta, replaceTagging) if tagErr != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return @@ -1543,7 +1545,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo } // copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios -// This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain +// This unified function supports: SSE-C↔SSE-KMS↔SSE-S3, and any→Plain func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { var dstChunks []*filer_pb.FileChunk @@ -1552,6 +1554,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h var destKMSKeyID string var destKMSEncryptionContext map[string]string var destKMSBucketKeyEnabled bool + var destSSES3Key *SSES3Key if state.DstSSEC { var err error @@ -1565,7 +1568,13 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h if err != nil { return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err) } - } else { + } else if state.DstSSES3 { + // Generate SSE-S3 key for destination + var err error + destSSES3Key, err = GenerateSSES3Key() + if err != nil { + return nil, nil, fmt.Errorf("failed to generate SSE-S3 key: %w", err) + } } // Parse source encryption parameters @@ -1584,12 +1593,18 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h var err error if chunk.GetSseType() == filer_pb.SSEType_SSE_C { - copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state) + copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state) } else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS { - copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state) + copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state) + } else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 { + copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state) } else { - // Unencrypted chunk, copy directly - copiedChunk, err = s3a.copySingleChunk(chunk, dstPath) + // Unencrypted chunk - may need encryption if destination requires it + if state.DstSSEC || state.DstSSEKMS || state.DstSSES3 { + copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state) + } else { + copiedChunk, err = s3a.copySingleChunk(chunk, dstPath) + } } if err != nil { @@ -1640,6 +1655,40 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h } else { glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr) } + } else if state.DstSSES3 && destSSES3Key != nil { + // For SSE-S3 destination, create object-level metadata + var sses3Metadata *SSES3Key + if len(dstChunks) == 
0 { + // Handle 0-byte files - generate IV for metadata even though there's no content to encrypt + if entry.Attributes.FileSize != 0 { + return nil, nil, fmt.Errorf("internal error: no chunks created for non-empty SSE-S3 destination object") + } + // Generate IV for 0-byte object metadata + iv := make([]byte, s3_constants.AESBlockSize) + if _, err := io.ReadFull(rand.Reader, iv); err != nil { + return nil, nil, fmt.Errorf("generate IV for 0-byte object: %w", err) + } + destSSES3Key.IV = iv + sses3Metadata = destSSES3Key + } else { + // For non-empty objects, use the first chunk's metadata + if dstChunks[0].GetSseType() != filer_pb.SSEType_SSE_S3 || len(dstChunks[0].GetSseMetadata()) == 0 { + return nil, nil, fmt.Errorf("internal error: first chunk is missing expected SSE-S3 metadata for destination object") + } + keyManager := GetSSES3KeyManager() + var err error + sses3Metadata, err = DeserializeSSES3Metadata(dstChunks[0].GetSseMetadata(), keyManager) + if err != nil { + return nil, nil, fmt.Errorf("failed to deserialize SSE-S3 metadata from first chunk: %w", err) + } + } + // Use the derived key with its IV for object-level metadata + keyData, serErr := SerializeSSES3Metadata(sses3Metadata) + if serErr != nil { + return nil, nil, fmt.Errorf("failed to serialize SSE-S3 metadata: %w", serErr) + } + dstMetadata[s3_constants.SeaweedFSSSES3Key] = keyData + dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("AES256") } // For unencrypted destination, no metadata needed (dstMetadata remains empty) @@ -1647,7 +1696,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h } // copyCrossEncryptionChunk handles copying a single chunk with cross-encryption support -func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) { +func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, destSSES3Key *SSES3Key, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) { // Create destination chunk dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) @@ -1747,6 +1796,30 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour previewLen = len(finalData) } + } else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 { + // Decrypt SSE-S3 source + if len(chunk.GetSseMetadata()) == 0 { + return nil, fmt.Errorf("SSE-S3 chunk missing per-chunk metadata") + } + + keyManager := GetSSES3KeyManager() + sourceSSEKey, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err) + } + + decryptedReader, decErr := CreateSSES3DecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey, sourceSSEKey.IV) + if decErr != nil { + return nil, fmt.Errorf("create SSE-S3 decrypted reader: %w", decErr) + } + + decryptedData, readErr := io.ReadAll(decryptedReader) + if readErr != nil { + return nil, fmt.Errorf("decrypt SSE-S3 chunk data: %w", readErr) + } + finalData = decryptedData + glog.V(4).Infof("Decrypted SSE-S3 chunk, size: %d", len(finalData)) + } else { // Source is unencrypted finalData = encryptedData 
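The copyMultipartCrossEncryption path above handles every combination the same way: each chunk is decrypted with whatever the source scheme was and then re-encrypted with the destination scheme (or left as plaintext), with SSE-S3 destinations getting a fresh object key and a chunk-specific IV. The following is a conceptual sketch of that per-chunk flow, not the actual SeaweedFS implementation; the stand-in transforms are only placeholders for real ciphers.

package main

import (
	"bytes"
	"fmt"
)

// copyChunk sketches the unified flow: optionally decrypt with the source
// scheme, then optionally re-encrypt with the destination scheme, so
// SSE-C / SSE-KMS / SSE-S3 / plain all share one code path.
func copyChunk(data []byte, decrypt, encrypt func([]byte) ([]byte, error)) ([]byte, error) {
	if decrypt != nil { // source chunk was encrypted
		var err error
		if data, err = decrypt(data); err != nil {
			return nil, err
		}
	}
	if encrypt == nil { // destination object is plain
		return data, nil
	}
	return encrypt(data) // destination uses its own key and a chunk-specific IV
}

func main() {
	// Placeholder "ciphers": lower-case stands in for the source decryption,
	// upper-case for the destination re-encryption.
	dec := func(b []byte) ([]byte, error) { return bytes.ToLower(b), nil }
	enc := func(b []byte) ([]byte, error) { return bytes.ToUpper(b), nil }
	out, _ := copyChunk([]byte("Chunk-Data"), dec, enc)
	fmt.Println(string(out)) // CHUNK-DATA
}
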
@@ -1808,6 +1881,36 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour dstChunk.SseMetadata = kmsMetadata glog.V(4).Infof("Re-encrypted chunk with SSE-KMS") + + } else if state.DstSSES3 && destSSES3Key != nil { + // Encrypt with SSE-S3 + encryptedReader, iv, encErr := CreateSSES3EncryptedReader(bytes.NewReader(finalData), destSSES3Key) + if encErr != nil { + return nil, fmt.Errorf("create SSE-S3 encrypted reader: %w", encErr) + } + + reencryptedData, readErr := io.ReadAll(encryptedReader) + if readErr != nil { + return nil, fmt.Errorf("re-encrypt with SSE-S3: %w", readErr) + } + finalData = reencryptedData + + // Create per-chunk SSE-S3 metadata with chunk-specific IV + chunkSSEKey := &SSES3Key{ + Key: destSSES3Key.Key, + KeyID: destSSES3Key.KeyID, + Algorithm: destSSES3Key.Algorithm, + IV: iv, + } + sses3Metadata, err := SerializeSSES3Metadata(chunkSSEKey) + if err != nil { + return nil, fmt.Errorf("serialize SSE-S3 metadata: %w", err) + } + + dstChunk.SseType = filer_pb.SSEType_SSE_S3 + dstChunk.SseMetadata = sses3Metadata + + glog.V(4).Infof("Re-encrypted chunk with SSE-S3") } // For unencrypted destination, finalData remains as decrypted plaintext diff --git a/weed/s3api/s3api_object_handlers_copy_unified.go b/weed/s3api/s3api_object_handlers_copy_unified.go index 255c3eb2d..f1b4ff280 100644 --- a/weed/s3api/s3api_object_handlers_copy_unified.go +++ b/weed/s3api/s3api_object_handlers_copy_unified.go @@ -1,7 +1,6 @@ package s3api import ( - "context" "errors" "fmt" "net/http" @@ -133,9 +132,9 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques } if state.DstSSES3 { - // Use streaming copy for SSE-S3 encryption - chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath) - return chunks, nil, err + // Use chunk-by-chunk copy for SSE-S3 encryption (consistent with SSE-C and SSE-KMS) + glog.V(2).Infof("Plain→SSE-S3 copy: using unified multipart encrypt copy") + return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath) } return nil, nil, fmt.Errorf("unknown target encryption type") @@ -143,30 +142,18 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques // executeDecryptCopy handles encrypted → plain copies func (s3a *S3ApiServer) executeDecryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { - // Use unified multipart-aware decrypt copy for all encryption types - if state.SrcSSEC || state.SrcSSEKMS { + // Use unified multipart-aware decrypt copy for all encryption types (consistent chunk-by-chunk) + if state.SrcSSEC || state.SrcSSEKMS || state.SrcSSES3 { glog.V(2).Infof("Encrypted→Plain copy: using unified multipart decrypt copy") return s3a.copyMultipartCrossEncryption(entry, r, state, "", dstPath) } - if state.SrcSSES3 { - // Use streaming copy for SSE-S3 decryption - chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath) - return chunks, nil, err - } - return nil, nil, fmt.Errorf("unknown source encryption type") } // executeReencryptCopy handles encrypted → encrypted copies with different keys/methods func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { - // Check if we should use streaming copy for better performance - if s3a.shouldUseStreamingCopy(entry, state) { - chunks, err := 
s3a.executeStreamingReencryptCopy(entry, r, state, dstPath) - return chunks, nil, err - } - - // Fallback to chunk-by-chunk approach for compatibility + // Use chunk-by-chunk approach for all cross-encryption scenarios (consistent behavior) if state.SrcSSEC && state.DstSSEC { return s3a.copyChunksWithSSEC(entry, r) } @@ -177,83 +164,8 @@ func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Requ return chunks, dstMetadata, err } - if state.SrcSSEC && state.DstSSEKMS { - // SSE-C → SSE-KMS: use unified multipart-aware cross-encryption copy - glog.V(2).Infof("SSE-C→SSE-KMS cross-encryption copy: using unified multipart copy") - return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath) - } - - if state.SrcSSEKMS && state.DstSSEC { - // SSE-KMS → SSE-C: use unified multipart-aware cross-encryption copy - glog.V(2).Infof("SSE-KMS→SSE-C cross-encryption copy: using unified multipart copy") - return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath) - } - - // Handle SSE-S3 cross-encryption scenarios - if state.SrcSSES3 || state.DstSSES3 { - // Any scenario involving SSE-S3 uses streaming copy - chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath) - return chunks, nil, err - } - - return nil, nil, fmt.Errorf("unsupported cross-encryption scenario") -} - -// shouldUseStreamingCopy determines if streaming copy should be used -func (s3a *S3ApiServer) shouldUseStreamingCopy(entry *filer_pb.Entry, state *EncryptionState) bool { - // Use streaming copy for large files or when beneficial - fileSize := entry.Attributes.FileSize - - // Use streaming for files larger than 10MB - if fileSize > 10*1024*1024 { - return true - } - - // Check if this is a multipart encrypted object - isMultipartEncrypted := false - if state.IsSourceEncrypted() { - encryptedChunks := 0 - for _, chunk := range entry.GetChunks() { - if chunk.GetSseType() != filer_pb.SSEType_NONE { - encryptedChunks++ - } - } - isMultipartEncrypted = encryptedChunks > 1 - } - - // For multipart encrypted objects, avoid streaming copy to use per-chunk metadata approach - if isMultipartEncrypted { - glog.V(3).Infof("Multipart encrypted object detected, using chunk-by-chunk approach") - return false - } - - // Use streaming for cross-encryption scenarios (for single-part objects only) - if state.IsSourceEncrypted() && state.IsTargetEncrypted() { - srcType := s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, state.SrcSSES3) - dstType := s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, state.DstSSES3) - if srcType != dstType { - return true - } - } - - // Use streaming for compressed files - if isCompressedEntry(entry) { - return true - } - - // Use streaming for SSE-S3 scenarios (always) - if state.SrcSSES3 || state.DstSSES3 { - return true - } - - return false -} - -// executeStreamingReencryptCopy performs streaming re-encryption copy -func (s3a *S3ApiServer) executeStreamingReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, error) { - // Create streaming copy manager - streamingManager := NewStreamingCopyManager(s3a) - - // Execute streaming copy - return streamingManager.ExecuteStreamingCopy(context.Background(), entry, r, dstPath, state) + // All other cross-encryption scenarios use unified multipart copy + // This includes: SSE-C↔SSE-KMS, SSE-C↔SSE-S3, SSE-KMS↔SSE-S3, SSE-S3↔SSE-S3 + glog.V(2).Infof("Cross-encryption copy: using unified multipart copy") + return 
s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath) } diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go index f779a6edc..6e373bb4e 100644 --- a/weed/s3api/s3api_object_handlers_delete.go +++ b/weed/s3api/s3api_object_handlers_delete.go @@ -1,12 +1,10 @@ package s3api import ( - "context" "encoding/xml" "fmt" "io" "net/http" - "slices" "strings" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -127,22 +125,9 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque dir, name := target.DirAndName() err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - // Use operation context that won't be cancelled if request terminates - // This ensures deletion completes atomically to avoid inconsistent state - opCtx := context.WithoutCancel(r.Context()) - - if err := doDeleteEntry(client, dir, name, true, false); err != nil { - return err - } - - // Cleanup empty directories - if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 { - bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket) - // Recursively delete empty parent directories, stop at bucket path - filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil) - } - - return nil + return doDeleteEntry(client, dir, name, true, false) + // Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner + // which listens to metadata events and uses consistent hashing for coordination }) if err != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) @@ -222,8 +207,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h var deleteErrors []DeleteError var auditLog *s3err.AccessLog - directoriesWithDeletion := make(map[string]bool) - if s3err.Logger != nil { auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone) } @@ -245,10 +228,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h versioningConfigured := (versioningState != "") s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - // Use operation context that won't be cancelled if request terminates - // This ensures batch deletion completes atomically to avoid inconsistent state - opCtx := context.WithoutCancel(r.Context()) - // delete file entries for _, object := range deleteObjects.Objects { if object.Key == "" { @@ -357,10 +336,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) if err == nil { - // Track directory for empty directory cleanup - if !s3a.option.AllowEmptyFolder { - directoriesWithDeletion[parentDirectoryPath] = true - } deletedObjects = append(deletedObjects, object) } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { deletedObjects = append(deletedObjects, object) @@ -380,30 +355,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } } - // Cleanup empty directories - optimize by processing deepest first - if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 { - bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket) - - // Collect and sort directories by depth (deepest first) to avoid redundant checks - var allDirs []string - for dirPath := range directoriesWithDeletion { - allDirs = append(allDirs, dirPath) - } - // Sort by depth (deeper directories first) - 
slices.SortFunc(allDirs, func(a, b string) int { - return strings.Count(b, "/") - strings.Count(a, "/") - }) - - // Track already-checked directories to avoid redundant work - checked := make(map[string]bool) - for _, dirPath := range allDirs { - if !checked[dirPath] { - // Recursively delete empty parent directories, stop at bucket path - // Mark this directory and all its parents as checked during recursion - filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dirPath), util.FullPath(bucketPath), checked) - } - } - } + // Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner + // which listens to metadata events and uses consistent hashing for coordination return nil }) diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 100796b2e..f848790de 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net/http" + "net/url" "path/filepath" "strconv" "strings" @@ -548,6 +549,28 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader } } + // Parse and store object tags from X-Amz-Tagging header + // Fix for GitHub issue #7589: Tags sent during object upload were not being stored + if tagging := r.Header.Get(s3_constants.AmzObjectTagging); tagging != "" { + parsedTags, err := url.ParseQuery(tagging) + if err != nil { + glog.Warningf("putToFiler: Invalid S3 tag format in header '%s': %v", tagging, err) + return "", s3err.ErrInvalidTag, SSEResponseMetadata{} + } + for key, values := range parsedTags { + if len(values) > 1 { + glog.Warningf("putToFiler: Duplicate tag key '%s' in header", key) + return "", s3err.ErrInvalidTag, SSEResponseMetadata{} + } + value := "" + if len(values) > 0 { + value = values[0] + } + entry.Extended[s3_constants.AmzObjectTagging+"-"+key] = []byte(value) + } + glog.V(3).Infof("putToFiler: stored %d tags from X-Amz-Tagging header", len(parsedTags)) + } + // Set SSE-C metadata if customerKey != nil && len(sseIV) > 0 { // Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes) @@ -680,9 +703,9 @@ func filerErrorToS3Error(err error) s3err.ErrorCode { if err == nil { return s3err.ErrNone } - + errString := err.Error() - + switch { case errString == constants.ErrMsgBadDigest: return s3err.ErrBadDigest diff --git a/weed/s3api/s3api_streaming_copy.go b/weed/s3api/s3api_streaming_copy.go index 457986858..94729c003 100644 --- a/weed/s3api/s3api_streaming_copy.go +++ b/weed/s3api/s3api_streaming_copy.go @@ -59,18 +59,19 @@ func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager { } } -// ExecuteStreamingCopy performs a streaming copy operation -func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) { +// ExecuteStreamingCopy performs a streaming copy operation and returns the encryption spec +// The encryption spec is needed for SSE-S3 to properly set destination metadata (fixes GitHub #7562) +func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, *EncryptionSpec, error) { // Create streaming copy specification spec, err := scm.createStreamingSpec(entry, r, state) if err != nil { - return nil, fmt.Errorf("create streaming spec: %w", err) + return nil, nil, fmt.Errorf("create 
streaming spec: %w", err) } // Create source reader from entry sourceReader, err := scm.createSourceReader(entry) if err != nil { - return nil, fmt.Errorf("create source reader: %w", err) + return nil, nil, fmt.Errorf("create source reader: %w", err) } defer sourceReader.Close() @@ -79,11 +80,16 @@ func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry // Create processing pipeline processedReader, err := scm.createProcessingPipeline(spec) if err != nil { - return nil, fmt.Errorf("create processing pipeline: %w", err) + return nil, nil, fmt.Errorf("create processing pipeline: %w", err) } // Stream to destination - return scm.streamToDestination(ctx, processedReader, spec, dstPath) + chunks, err := scm.streamToDestination(ctx, processedReader, spec, dstPath) + if err != nil { + return nil, nil, err + } + + return chunks, spec.EncryptionSpec, nil } // createStreamingSpec creates a streaming specification based on copy parameters @@ -453,8 +459,8 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R for { n, err := reader.Read(buffer) if n > 0 { - // Create chunk for this data - chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath) + // Create chunk for this data, setting SSE type and per-chunk metadata (including chunk-specific IVs for SSE-S3) + chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath, spec.EncryptionSpec) if chunkErr != nil { return nil, fmt.Errorf("create chunk from data: %w", chunkErr) } @@ -474,7 +480,7 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R } // createChunkFromData creates a chunk from streaming data -func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) { +func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string, encSpec *EncryptionSpec) (*filer_pb.FileChunk, error) { // Assign new volume assignResult, err := scm.s3a.assignNewVolume(dstPath) if err != nil { @@ -487,6 +493,42 @@ func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, Size: uint64(len(data)), } + // Set SSE type and metadata on chunk if destination is encrypted + // This is critical for GetObject to know to decrypt the data - fixes GitHub #7562 + if encSpec != nil && encSpec.NeedsEncryption { + switch encSpec.DestinationType { + case EncryptionTypeSSEC: + chunk.SseType = filer_pb.SSEType_SSE_C + // SSE-C metadata is handled at object level, not per-chunk for streaming copy + case EncryptionTypeSSEKMS: + chunk.SseType = filer_pb.SSEType_SSE_KMS + // SSE-KMS metadata is handled at object level, not per-chunk for streaming copy + case EncryptionTypeSSES3: + chunk.SseType = filer_pb.SSEType_SSE_S3 + // Create per-chunk SSE-S3 metadata with chunk-specific IV + if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok { + // Calculate chunk-specific IV using base IV and chunk offset + baseIV := encSpec.DestinationIV + if len(baseIV) == 0 { + return nil, fmt.Errorf("SSE-S3 encryption requires DestinationIV to be set for chunk at offset %d", offset) + } + chunkIV, _ := calculateIVWithOffset(baseIV, offset) + // Create chunk key with the chunk-specific IV + chunkSSEKey := &SSES3Key{ + Key: sseKey.Key, + KeyID: sseKey.KeyID, + Algorithm: sseKey.Algorithm, + IV: chunkIV, + } + chunkMetadata, serErr := SerializeSSES3Metadata(chunkSSEKey) + if serErr != nil { + return nil, fmt.Errorf("failed to serialize chunk SSE-S3 metadata: %w", serErr) + } + 
chunk.SseMetadata = chunkMetadata + } + } + } + // Set file ID if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil { return nil, err diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index dcf279e1d..e053d9ea7 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -137,8 +137,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) dc := ms.Topo.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) - dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts) - glog.V(0).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids) + dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.Id, heartbeat.MaxVolumeCounts) + glog.V(0).Infof("added volume server %d: %v (id=%s, ip=%v:%d) %v", dn.Counter, dn.Id(), heartbeat.Id, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids) uuidlist, err := ms.RegisterUuids(heartbeat) if err != nil { if stream_err := stream.Send(&master_pb.HeartbeatResponse{ diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index a7ef8e7e9..d00cb5df4 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -253,7 +253,7 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku var locations []*master_pb.Location for _, dn := range shardLocations { locations = append(locations, &master_pb.Location{ - Url: string(dn.Id()), + Url: dn.Url(), PublicUrl: dn.PublicUrl, DataCenter: dn.GetDataCenterId(), }) diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 4f8a7fb0d..65909996a 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -55,7 +55,7 @@ type VolumeServer struct { } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, - port int, grpcPort int, publicUrl string, + port int, grpcPort int, publicUrl string, id string, folders []string, maxCounts []int32, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType, idxFolder string, needleMapKind storage.NeedleMapKind, @@ -114,7 +114,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, vs.checkWithMaster() - vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, ldbTimeout) + vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, id, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, ldbTimeout) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources(adminMux) diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go index ec6490662..a54369277 100644 --- a/weed/server/volume_server_handlers_admin.go +++ b/weed/server/volume_server_handlers_admin.go @@ -4,28 +4,33 @@ import ( "net/http" "path/filepath" - "github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/util/version" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/stats" ) +// healthzHandler checks the local 
health of the volume server. +// It only checks local conditions to avoid cascading failures when remote +// volume servers go down. Previously, this handler checked if all replicated +// volumes could reach their remote replicas, which caused healthy volume +// servers to fail health checks when a peer went down. +// See https://github.com/seaweedfs/seaweedfs/issues/6823 func (vs *VolumeServer) healthzHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SeaweedFS Volume "+version.VERSION) - volumeInfos := vs.store.VolumeInfos() - for _, vinfo := range volumeInfos { - if len(vinfo.Collection) == 0 { - continue - } - if vinfo.ReplicaPlacement.GetCopyCount() > 1 { - _, err := topology.GetWritableRemoteReplications(vs.store, vs.grpcDialOption, vinfo.Id, vs.GetMaster) - if err != nil { - w.WriteHeader(http.StatusServiceUnavailable) - return - } - } + + // Check if the server is shutting down + if vs.store.IsStopping() { + w.WriteHeader(http.StatusServiceUnavailable) + return } + + // Check if we can communicate with master + if !vs.isHeartbeating { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) } diff --git a/weed/sftpd/sftp_file_writer.go b/weed/sftpd/sftp_file_writer.go index 0a662d021..fed60eec0 100644 --- a/weed/sftpd/sftp_file_writer.go +++ b/weed/sftpd/sftp_file_writer.go @@ -72,6 +72,7 @@ func (l listerat) ListAt(ls []os.FileInfo, offset int64) (int, error) { type SeaweedSftpFileWriter struct { fs SftpServer req *sftp.Request + absPath string // Absolute path after HomeDir translation mu sync.Mutex tmpFile *os.File permissions os.FileMode @@ -105,6 +106,6 @@ func (w *SeaweedSftpFileWriter) Close() error { return err } - // Stream the file instead of loading it - return w.fs.putFile(w.req.Filepath, w.tmpFile, w.fs.user) + // Stream the file to the absolute path (after HomeDir translation) + return w.fs.putFile(w.absPath, w.tmpFile, w.fs.user) } diff --git a/weed/sftpd/sftp_filer.go b/weed/sftpd/sftp_filer.go index 9baaf41d7..eb196cc28 100644 --- a/weed/sftpd/sftp_filer.go +++ b/weed/sftpd/sftp_filer.go @@ -100,18 +100,26 @@ func (fs *SftpServer) withTimeoutContext(fn func(ctx context.Context) error) err // ==================== Command Dispatcher ==================== func (fs *SftpServer) dispatchCmd(r *sftp.Request) error { - glog.V(0).Infof("Dispatch: %s %s", r.Method, r.Filepath) + absPath, err := fs.toAbsolutePath(r.Filepath) + if err != nil { + return err + } + glog.V(1).Infof("Dispatch: %s %s (absolute: %s)", r.Method, r.Filepath, absPath) switch r.Method { case "Remove": - return fs.removeEntry(r) + return fs.removeEntry(absPath) case "Rename": - return fs.renameEntry(r) + absTarget, err := fs.toAbsolutePath(r.Target) + if err != nil { + return err + } + return fs.renameEntry(absPath, absTarget) case "Mkdir": - return fs.makeDir(r) + return fs.makeDir(absPath) case "Rmdir": - return fs.removeDir(r) + return fs.removeDir(absPath) case "Setstat": - return fs.setFileStat(r) + return fs.setFileStatWithRequest(absPath, r) default: return fmt.Errorf("unsupported: %s", r.Method) } @@ -120,10 +128,14 @@ func (fs *SftpServer) dispatchCmd(r *sftp.Request) error { // ==================== File Operations ==================== func (fs *SftpServer) readFile(r *sftp.Request) (io.ReaderAt, error) { - if err := fs.checkFilePermission(r.Filepath, "read"); err != nil { + absPath, err := fs.toAbsolutePath(r.Filepath) + if err != nil { + return nil, err + } + if err := fs.checkFilePermission(absPath, "read"); err != nil { return nil, 
err } - entry, err := fs.getEntry(r.Filepath) + entry, err := fs.getEntry(absPath) if err != nil { return nil, err } @@ -131,7 +143,11 @@ func (fs *SftpServer) readFile(r *sftp.Request) (io.ReaderAt, error) { } func (fs *SftpServer) newFileWriter(r *sftp.Request) (io.WriterAt, error) { - dir, _ := util.FullPath(r.Filepath).DirAndName() + absPath, err := fs.toAbsolutePath(r.Filepath) + if err != nil { + return nil, err + } + dir, _ := util.FullPath(absPath).DirAndName() if err := fs.checkFilePermission(dir, "write"); err != nil { glog.Errorf("Permission denied for %s", dir) return nil, err @@ -145,6 +161,7 @@ func (fs *SftpServer) newFileWriter(r *sftp.Request) (io.WriterAt, error) { return &SeaweedSftpFileWriter{ fs: *fs, req: r, + absPath: absPath, tmpFile: tmpFile, permissions: 0644, uid: fs.user.Uid, @@ -153,16 +170,20 @@ func (fs *SftpServer) newFileWriter(r *sftp.Request) (io.WriterAt, error) { }, nil } -func (fs *SftpServer) removeEntry(r *sftp.Request) error { - return fs.deleteEntry(r.Filepath, false) +func (fs *SftpServer) removeEntry(absPath string) error { + return fs.deleteEntry(absPath, false) } -func (fs *SftpServer) renameEntry(r *sftp.Request) error { - if err := fs.checkFilePermission(r.Filepath, "rename"); err != nil { +func (fs *SftpServer) renameEntry(absPath, absTarget string) error { + if err := fs.checkFilePermission(absPath, "rename"); err != nil { + return err + } + targetDir, _ := util.FullPath(absTarget).DirAndName() + if err := fs.checkFilePermission(targetDir, "write"); err != nil { return err } - oldDir, oldName := util.FullPath(r.Filepath).DirAndName() - newDir, newName := util.FullPath(r.Target).DirAndName() + oldDir, oldName := util.FullPath(absPath).DirAndName() + newDir, newName := util.FullPath(absTarget).DirAndName() return fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { _, err := client.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{ OldDirectory: oldDir, OldName: oldName, @@ -172,15 +193,15 @@ func (fs *SftpServer) renameEntry(r *sftp.Request) error { }) } -func (fs *SftpServer) setFileStat(r *sftp.Request) error { - if err := fs.checkFilePermission(r.Filepath, "write"); err != nil { +func (fs *SftpServer) setFileStatWithRequest(absPath string, r *sftp.Request) error { + if err := fs.checkFilePermission(absPath, "write"); err != nil { return err } - entry, err := fs.getEntry(r.Filepath) + entry, err := fs.getEntry(absPath) if err != nil { return err } - dir, _ := util.FullPath(r.Filepath).DirAndName() + dir, _ := util.FullPath(absPath).DirAndName() // apply attrs if r.AttrFlags().Permissions { entry.Attributes.FileMode = uint32(r.Attributes().FileMode()) @@ -201,18 +222,22 @@ func (fs *SftpServer) setFileStat(r *sftp.Request) error { // ==================== Directory Operations ==================== func (fs *SftpServer) listDir(r *sftp.Request) (sftp.ListerAt, error) { - if err := fs.checkFilePermission(r.Filepath, "list"); err != nil { + absPath, err := fs.toAbsolutePath(r.Filepath) + if err != nil { + return nil, err + } + if err := fs.checkFilePermission(absPath, "list"); err != nil { return nil, err } if r.Method == "Stat" || r.Method == "Lstat" { - entry, err := fs.getEntry(r.Filepath) + entry, err := fs.getEntry(absPath) if err != nil { return nil, err } fi := &EnhancedFileInfo{FileInfo: FileInfoFromEntry(entry), uid: entry.Attributes.Uid, gid: entry.Attributes.Gid} return listerat([]os.FileInfo{fi}), nil } - return fs.listAllPages(r.Filepath) + return fs.listAllPages(absPath) } func (fs 
*SftpServer) listAllPages(dirPath string) (sftp.ListerAt, error) { @@ -259,18 +284,19 @@ func (fs *SftpServer) fetchDirectoryPage(dirPath, start string) ([]os.FileInfo, } // makeDir creates a new directory with proper permissions. -func (fs *SftpServer) makeDir(r *sftp.Request) error { +func (fs *SftpServer) makeDir(absPath string) error { if fs.user == nil { return fmt.Errorf("cannot create directory: no user info") } - dir, name := util.FullPath(r.Filepath).DirAndName() - if err := fs.checkFilePermission(r.Filepath, "mkdir"); err != nil { + dir, name := util.FullPath(absPath).DirAndName() + if err := fs.checkFilePermission(dir, "write"); err != nil { return err } // default mode and ownership err := filer_pb.Mkdir(context.Background(), fs, string(dir), name, func(entry *filer_pb.Entry) { mode := uint32(0755 | os.ModeDir) - if strings.HasPrefix(r.Filepath, fs.user.HomeDir) { + // Defensive check: all paths should be under HomeDir after toAbsolutePath translation + if absPath == fs.user.HomeDir || strings.HasPrefix(absPath, fs.user.HomeDir+"/") { mode = uint32(0700 | os.ModeDir) } entry.Attributes.FileMode = mode @@ -288,8 +314,8 @@ func (fs *SftpServer) makeDir(r *sftp.Request) error { } // removeDir deletes a directory. -func (fs *SftpServer) removeDir(r *sftp.Request) error { - return fs.deleteEntry(r.Filepath, false) +func (fs *SftpServer) removeDir(absPath string) error { + return fs.deleteEntry(absPath, false) } func (fs *SftpServer) putFile(filepath string, reader io.Reader, user *user.User) error { diff --git a/weed/sftpd/sftp_server.go b/weed/sftpd/sftp_server.go index f158aeb64..e53098e6b 100644 --- a/weed/sftpd/sftp_server.go +++ b/weed/sftpd/sftp_server.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "os" + "path" + "strings" "time" "github.com/pkg/sftp" @@ -37,6 +39,28 @@ func NewSftpServer(filerAddr pb.ServerAddress, grpcDialOption grpc.DialOption, d } } +// toAbsolutePath translates a user-relative path to an absolute filer path. +// When a user has HomeDir="/sftp/user", their view of "/" maps to "/sftp/user". +// This implements chroot-like behavior where the user's home directory +// becomes their root. +func (fs *SftpServer) toAbsolutePath(userPath string) (string, error) { + // If user has root as home directory, no translation needed + if fs.user.HomeDir == "" || fs.user.HomeDir == "/" { + return path.Clean(userPath), nil + } + + // Concatenate home directory with user path, then clean to resolve any ".." components + p := path.Join(fs.user.HomeDir, strings.TrimPrefix(userPath, "/")) + + // Security check: ensure the final path is within the home directory. + // This prevents path traversal attacks like `../..` that could escape the chroot jail. + if !strings.HasPrefix(p, fs.user.HomeDir+"/") && p != fs.user.HomeDir { + return "", fmt.Errorf("path traversal attempt: %s resolves to %s which is outside home dir %s", userPath, p, fs.user.HomeDir) + } + + return p, nil +} + // Fileread is invoked for “get” requests. 
func (fs *SftpServer) Fileread(req *sftp.Request) (io.ReaderAt, error) { return fs.readFile(req) diff --git a/weed/sftpd/sftp_server_test.go b/weed/sftpd/sftp_server_test.go new file mode 100644 index 000000000..0af94ca14 --- /dev/null +++ b/weed/sftpd/sftp_server_test.go @@ -0,0 +1,103 @@ +package sftpd + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/sftpd/user" + "github.com/stretchr/testify/assert" +) + +func stringPtr(s string) *string { + return &s +} + +func TestToAbsolutePath(t *testing.T) { + tests := []struct { + name string + homeDir *string // Use pointer to distinguish between unset and empty + userPath string + expected string + expectError bool + }{ + { + name: "normal path", + userPath: "/foo.txt", + expected: "/sftp/testuser/foo.txt", + }, + { + name: "root path", + userPath: "/", + expected: "/sftp/testuser", + }, + { + name: "path with dot", + userPath: "/./foo.txt", + expected: "/sftp/testuser/foo.txt", + }, + { + name: "path traversal attempts", + userPath: "/../foo.txt", + expectError: true, + }, + { + name: "path traversal attempts 2", + userPath: "../../foo.txt", + expectError: true, + }, + { + name: "path traversal attempts 3", + userPath: "/subdir/../../foo.txt", + expectError: true, + }, + { + name: "empty path", + userPath: "", + expected: "/sftp/testuser", + }, + { + name: "multiple slashes", + userPath: "//foo.txt", + expected: "/sftp/testuser/foo.txt", + }, + { + name: "trailing slash", + userPath: "/foo/", + expected: "/sftp/testuser/foo", + }, + { + name: "empty HomeDir passthrough", + homeDir: stringPtr(""), + userPath: "/foo.txt", + expected: "/foo.txt", + }, + { + name: "root HomeDir passthrough", + homeDir: stringPtr("/"), + userPath: "/foo.txt", + expected: "/foo.txt", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + homeDir := "/sftp/testuser" // default + if tt.homeDir != nil { + homeDir = *tt.homeDir + } + + fs := &SftpServer{ + user: &user.User{ + HomeDir: homeDir, + }, + } + + got, err := fs.toAbsolutePath(tt.userPath) + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expected, got) + } + }) + } +} diff --git a/weed/sftpd/sftp_service.go b/weed/sftpd/sftp_service.go index e50bd87ba..4d21815a9 100644 --- a/weed/sftpd/sftp_service.go +++ b/weed/sftpd/sftp_service.go @@ -284,8 +284,8 @@ func (s *SFTPService) handleChannel(newChannel ssh.NewChannel, fs *SftpServer) { // handleSFTP starts the SFTP server on the SSH channel. 
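The TestToAbsolutePath table above pins the expected translations for the new chroot-style path handling. As a compact, standalone restatement of the same join-then-verify idiom (a sketch only; chrootJoin and the sample paths are illustrative names, not part of the SFTP server code):

package main

import (
	"fmt"
	"path"
	"strings"
)

// chrootJoin maps a user-visible path onto homeDir and rejects anything that
// resolves outside it, mirroring the translation added in sftp_server.go.
func chrootJoin(homeDir, userPath string) (string, error) {
	if homeDir == "" || homeDir == "/" {
		return path.Clean(userPath), nil
	}
	// path.Join cleans "." and ".." segments before the prefix check runs.
	p := path.Join(homeDir, strings.TrimPrefix(userPath, "/"))
	if p != homeDir && !strings.HasPrefix(p, homeDir+"/") {
		return "", fmt.Errorf("path traversal attempt: %s escapes %s", userPath, homeDir)
	}
	return p, nil
}

func main() {
	fmt.Println(chrootJoin("/sftp/alice", "/docs/report.txt")) // /sftp/alice/docs/report.txt <nil>
	fmt.Println(chrootJoin("/sftp/alice", "/../etc/passwd"))   // "" plus a traversal error
}

The cleaning step has to happen before the prefix check; checking the raw user path would let "/subdir/../../foo" slip through.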
func (s *SFTPService) handleSFTP(channel ssh.Channel, fs *SftpServer) { - // Create server options with initial working directory set to user's home - serverOptions := sftp.WithStartDirectory(fs.user.HomeDir) + // Start at virtual root "/" - toAbsolutePath translates this to the user's HomeDir + serverOptions := sftp.WithStartDirectory("/") server := sftp.NewRequestServer(channel, sftp.Handlers{ FileGet: fs, FilePut: fs, diff --git a/weed/sftpd/user/filestore.go b/weed/sftpd/user/filestore.go index c522a388a..4c372aa76 100644 --- a/weed/sftpd/user/filestore.go +++ b/weed/sftpd/user/filestore.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "os" + "path" "sync" "golang.org/x/crypto/ssh" @@ -99,6 +100,10 @@ func (s *FileStore) loadUsers() error { user.PublicKeys[i] = string(pubKey.Marshal()) } } + // Clean HomeDir to handle trailing slashes and normalize path + if user.HomeDir != "" { + user.HomeDir = path.Clean(user.HomeDir) + } s.users[user.Username] = user } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index f059b4e74..f2cc581da 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -26,12 +26,25 @@ type DataCenterId string type EcNodeId string type RackId string +// EcDisk represents a single disk on a volume server +type EcDisk struct { + diskId uint32 + diskType string + freeEcSlots int + ecShardCount int // Total EC shards on this disk + // Map of volumeId -> shardBits for shards on this disk + ecShards map[needle.VolumeId]erasure_coding.ShardBits +} + type EcNode struct { info *master_pb.DataNodeInfo dc DataCenterId rack RackId freeEcSlot int + // disks maps diskId -> EcDisk for disk-level balancing + disks map[uint32]*EcDisk } + type CandidateEcNode struct { ecNode *EcNode shardCount int @@ -229,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol return collections } -func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { +func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) { if !commandEnv.isLocked() { return fmt.Errorf("lock is lost") @@ -242,7 +255,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info) // ask destination node to copy shard and the ecx file from source node, and mount it - copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress) + copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId) if err != nil { return err } @@ -259,7 +272,11 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, return err } - fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id) + if destDiskId > 0 { + fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId) + } else { + fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, 
shardId, existingLocation.info.Id, destinationEcNode.info.Id) + } } @@ -272,7 +289,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer *EcNode, shardIdsToCopy []uint32, - volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) { + volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) { fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) @@ -289,6 +306,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, CopyEcjFile: true, CopyVifFile: true, SourceDataNode: string(existingLocation), + DiskId: destDiskId, }) if copyErr != nil { return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr) @@ -410,12 +428,74 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter } freeEcSlots := countFreeShardSlots(dn, types.HardDriveType) - ecNodes = append(ecNodes, &EcNode{ + ecNode := &EcNode{ info: dn, dc: dc, rack: rack, freeEcSlot: int(freeEcSlots), - }) + disks: make(map[uint32]*EcDisk), + } + + // Build disk-level information from volumes and EC shards + // First, discover all unique disk IDs from VolumeInfos (includes empty disks) + allDiskIds := make(map[uint32]string) // diskId -> diskType + for diskType, diskInfo := range dn.DiskInfos { + if diskInfo == nil { + continue + } + // Get all disk IDs from volumes + for _, vi := range diskInfo.VolumeInfos { + allDiskIds[vi.DiskId] = diskType + } + // Also get disk IDs from EC shards + for _, ecShardInfo := range diskInfo.EcShardInfos { + allDiskIds[ecShardInfo.DiskId] = diskType + } + } + + // Group EC shards by disk_id + diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits) + for _, diskInfo := range dn.DiskInfos { + if diskInfo == nil { + continue + } + for _, ecShardInfo := range diskInfo.EcShardInfos { + diskId := ecShardInfo.DiskId + if diskShards[diskId] == nil { + diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits) + } + vid := needle.VolumeId(ecShardInfo.Id) + diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + } + } + + // Create EcDisk for each discovered disk + diskCount := len(allDiskIds) + if diskCount == 0 { + diskCount = 1 + } + freePerDisk := int(freeEcSlots) / diskCount + + for diskId, diskType := range allDiskIds { + shards := diskShards[diskId] + if shards == nil { + shards = make(map[needle.VolumeId]erasure_coding.ShardBits) + } + totalShardCount := 0 + for _, shardBits := range shards { + totalShardCount += shardBits.ShardIdCount() + } + + ecNode.disks[diskId] = &EcDisk{ + diskId: diskId, + diskType: diskType, + freeEcSlots: freePerDisk, + ecShardCount: totalShardCount, + ecShards: shards, + } + } + + ecNodes = append(ecNodes, ecNode) totalFreeEcSlots += freeEcSlots }) return @@ -884,10 +964,16 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error { for _, shards := range fullDiskInfo.EcShardInfos { if _, found := emptyNodeIds[shards.Id]; !found { for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() { + vid := needle.VolumeId(shards.Id) + destDiskId := pickBestDiskOnNode(emptyNode, vid) - fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) + if 
destDiskId > 0 { + fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId) + } else { + fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) + } - err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing) + err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing) if err != nil { return err } @@ -957,18 +1043,98 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi if len(targets) == 0 { return nil, errors.New(details) } + + // When multiple nodes have the same shard count, prefer nodes with better disk distribution + // (i.e., nodes with more disks that have fewer shards of this volume) + if len(targets) > 1 { + slices.SortFunc(targets, func(a, b *EcNode) int { + aScore := diskDistributionScore(a, vid) + bScore := diskDistributionScore(b, vid) + return aScore - bScore // Lower score is better + }) + return targets[0], nil + } + return targets[rand.IntN(len(targets))], nil } +// diskDistributionScore calculates a score for how well-distributed shards are on the node's disks +// Lower score is better (means more room for balanced distribution) +func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int { + if len(ecNode.disks) == 0 { + return 0 + } + + // Sum the existing shard count for this volume on each disk + // Lower total means more room for new shards + score := 0 + for _, disk := range ecNode.disks { + if shardBits, ok := disk.ecShards[vid]; ok { + score += shardBits.ShardIdCount() * 10 // Weight shards of this volume heavily + } + score += disk.ecShardCount // Also consider total shards on disk + } + return score +} + +// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard +// It prefers disks with fewer shards and more free slots +func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 { + if len(ecNode.disks) == 0 { + return 0 // No disk info available, let the server decide + } + + var bestDiskId uint32 + bestScore := -1 + + for diskId, disk := range ecNode.disks { + if disk.freeEcSlots <= 0 { + continue + } + + // Check if this volume already has shards on this disk + existingShards := 0 + if shardBits, ok := disk.ecShards[vid]; ok { + existingShards = shardBits.ShardIdCount() + } + + // Score: prefer disks with fewer total shards and fewer shards of this volume + // Lower score is better + score := disk.ecShardCount*10 + existingShards*100 + + if bestScore == -1 || score < bestScore { + bestScore = score + bestDiskId = diskId + } + } + + return bestDiskId +} + +// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk +func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) { + node, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinations) + if err != nil { + return nil, 0, err + } + + diskId := pickBestDiskOnNode(node, vid) + return node, diskId, nil +} + func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error { - destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes) + 
destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes) if err != nil { - fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error()) + fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error()) return nil } - fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id) - return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing) + if destDiskId > 0 { + fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", existingLocation.info.Id, vid, shardId, destNode.info.Id, destDiskId) + } else { + fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id) + } + return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing) } func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode { diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index d7b015979..4d775000f 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -10,6 +10,8 @@ import ( "math" "math/rand/v2" "net/http" + "strings" + "sync" "time" "slices" @@ -32,6 +34,7 @@ type commandVolumeCheckDisk struct{} type volumeCheckDisk struct { commandEnv *CommandEnv writer io.Writer + writerMu sync.Mutex now time.Time slowMode bool @@ -40,6 +43,8 @@ type volumeCheckDisk struct { syncDeletions bool fixReadOnly bool nonRepairThreshold float64 + + ewg *ErrorWaitGroup } func (c *commandVolumeCheckDisk) Name() string { @@ -59,9 +64,9 @@ func (c *commandVolumeCheckDisk) Help() string { append entries in B and not in A to A optionally, for each non-writable volume replica A - if volume is not full + select a writable volume replica B + if entries in A don't match B prune late volume entries not matching its index file - select a writable volume replica B append missing entries from B into A mark the volume as writable (healthy) @@ -92,6 +97,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)") fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)") syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix") + maxParallelization := fsckCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible") nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit") if err = fsckCommand.Parse(args); err != nil { return nil @@ -115,6 +121,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write syncDeletions: *syncDeletions, fixReadOnly: *fixReadOnly, nonRepairThreshold: *nonRepairThreshold, + + ewg: NewErrorWaitGroup(*maxParallelization), } // collect topology information @@ -137,23 +145,21 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if err := vcd.checkWritableVolumes(volumeReplicas); err != nil { return err } - if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil { - return err - } + vcd.checkReadOnlyVolumes(volumeReplicas) - return nil + return vcd.ewg.Wait() } // 
checkWritableVolumes fixes volume replicas which are not read-only. func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error { - vcd.write("Pass #1 (writable volumes)\n") + vcd.write("Pass #1 (writable volumes)") for _, replicas := range volumeReplicas { // filter readonly replica var writableReplicas []*VolumeReplica for _, replica := range replicas { if replica.info.ReadOnly { - vcd.write("skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) + vcd.write("skipping readonly volume %d on %s", replica.info.Id, replica.location.dataNode.Id) } else { writableReplicas = append(writableReplicas, replica) } @@ -166,16 +172,23 @@ func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*Vo a, b := writableReplicas[0], writableReplicas[1] shouldSkip, err := vcd.shouldSkipVolume(a, b) if err != nil { - vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err) + vcd.write("error checking if volume %d should be skipped: %v", a.info.Id, err) // Continue with sync despite error to be safe } else if shouldSkip { // always choose the larger volume to be the source writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...) continue } - if err := vcd.syncTwoReplicas(a, b, true); err != nil { - vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) + + modified, err := vcd.syncTwoReplicas(a, b, true) + if err != nil { + vcd.write("failed to sync volumes %d on %s and %s: %v", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) + } else { + if modified { + vcd.write("synced %s and %s for volume %d", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id) + } } + // always choose the larger volume to be the source if a.info.FileCount > b.info.FileCount { writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...) 
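The command_volume_check_disk.go changes in this hunk and the next move read-only volume repair onto an ErrorWaitGroup, so repairs run up to -maxParallelization at a time and their errors are collected by a single Wait call. A minimal sketch of that fan-out pattern, assuming it sits in package shell next to common.go (repairOne is a hypothetical stand-in for the makeVolumeWritable/syncTwoReplicas/makeVolumeReadonly sequence, not a real helper):

package shell

import "fmt"

func repairAllVolumes(vids []uint32, maxParallelization int) error {
	ewg := NewErrorWaitGroup(maxParallelization) // bounds the number of concurrent tasks

	for _, vid := range vids {
		vid := vid // capture the loop variable for the closure
		if vid == 0 {
			// Record a failure without scheduling a goroutine,
			// as checkReadOnlyVolumes does via AddErrorf.
			ewg.AddErrorf("invalid volume id %d", vid)
			continue
		}
		ewg.Add(func() error {
			// Hypothetical per-volume repair step.
			return repairOne(vid)
		})
	}

	// Wait blocks until every queued task finishes and returns their errors together.
	return ewg.Wait()
}

func repairOne(vid uint32) error {
	fmt.Printf("repairing volume %d\n", vid)
	return nil
}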
@@ -204,7 +217,7 @@ func (vcd *volumeCheckDisk) makeVolumeWritable(vid uint32, vr *VolumeReplica) er return err } - vcd.write("volume %d on %s is now writable\n", vid, vr.location.dataNode.Id) + vcd.write("volume %d on %s is now writable", vid, vr.location.dataNode.Id) return nil } @@ -224,15 +237,15 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er return err } - vcd.write("volume %d on %s is now read-only\n", vid, vr.location.dataNode.Id) + vcd.write("volume %d on %s is now read-only", vid, vr.location.dataNode.Id) return nil } -func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error { +func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) { if !vcd.fixReadOnly { - return nil + return } - vcd.write("Pass #2 (read-only volumes)\n") + vcd.write("Pass #2 (read-only volumes)") for vid, replicas := range volumeReplicas { roReplicas := []*VolumeReplica{} @@ -246,11 +259,11 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo } } if len(roReplicas) == 0 { - vcd.write("no read-only replicas for volume %d\n", vid) + vcd.write("no read-only replicas for volume %d", vid) continue } if len(rwReplicas) == 0 { - vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from\n", len(roReplicas), vid) + vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from", len(roReplicas), vid) continue } @@ -261,35 +274,44 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo skip, err := vcd.shouldSkipVolume(r, source) if err != nil { - vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err) + vcd.ewg.AddErrorf("failed to check if volume %d should be skipped: %v\n", r.info.Id, err) continue } if skip { continue } - // make volume writable... - if err := vcd.makeVolumeWritable(vid, r); err != nil { - return err - } + vcd.ewg.Add(func() error { + // make volume writable... + if err := vcd.makeVolumeWritable(vid, r); err != nil { + return err + } - // ...fix it... - // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes. - if err := vcd.syncTwoReplicas(source, r, false); err != nil { - vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err) + // ...try to fix it... + // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes... + modified, err := vcd.syncTwoReplicas(source, r, false) + if err != nil { + vcd.write("sync read-only volume %d on %s from %s: %v", vid, r.location.dataNode.Id, source.location.dataNode.Id, err) - // ...or revert it back to read-only, if something went wrong. - if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil { - return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr) + if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil { + return fmt.Errorf("failed to revert volume %d on %s to readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr) + } + return err + } else { + if modified { + vcd.write("volume %d on %s is now synced to %d and writable", vid, r.location.dataNode.Id, source.location.dataNode.Id) + } else { + // ...or restore back to read-only, if no changes were made. 
+ if err := vcd.makeVolumeReadonly(vid, r); err != nil { + return fmt.Errorf("failed to revert volume %d on %s to readonly: %v", vid, r.location.dataNode.Id, err) + } + } } - vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id) - return err - } + return nil + }) } } - - return nil } func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { @@ -297,12 +319,15 @@ func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { } func (vcd *volumeCheckDisk) write(format string, a ...any) { - fmt.Fprintf(vcd.writer, format, a...) + vcd.writerMu.Lock() + defer vcd.writerMu.Unlock() + fmt.Fprintf(vcd.writer, strings.TrimRight(format, "\r\n "), a...) + fmt.Fprint(vcd.writer, "\n") } func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) { if vcd.verbose { - fmt.Fprintf(vcd.writer, format, a...) + vcd.write(format, a...) } } @@ -388,7 +413,7 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) if doSyncDeletedCount && !eqDeletedFileCount { return false, nil } - vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s\n", + vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) } else { return false, nil @@ -399,35 +424,39 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) // syncTwoReplicas attempts to sync all entries from a source volume replica into a target. If bi-directional mode // is enabled, changes from target are also synced back into the source. -func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi bool) (err error) { +// Returns true if source and/or target were modified, false otherwise. +func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi bool) (modified bool, err error) { sourceHasChanges, targetHasChanges := true, true const maxIterations = 5 iteration := 0 + modified = false + for (sourceHasChanges || targetHasChanges) && iteration < maxIterations { iteration++ - vcd.writeVerbose("sync iteration %d/%d for volume %d\n", iteration, maxIterations, source.info.Id) + vcd.writeVerbose("sync iteration %d/%d for volume %d", iteration, maxIterations, source.info.Id) prevSourceHasChanges, prevTargetHasChanges := sourceHasChanges, targetHasChanges if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil { - return err + return modified, err } + modified = modified || sourceHasChanges || targetHasChanges // Detect if we're stuck in a loop with no progress if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) { - vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", + vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop", source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration) - return fmt.Errorf("sync not making progress after %d iterations", iteration) + return modified, fmt.Errorf("sync not making progress after %d iterations", iteration) } } if iteration >= maxIterations && (sourceHasChanges || targetHasChanges) { - vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n", + vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention", 
source.info.Id, maxIterations, source.location.dataNode.Id, target.location.dataNode.Id) - return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations) + return modified, fmt.Errorf("reached maximum sync iterations (%d)", maxIterations) } - return nil + return modified, nil } // checkBoth performs a sync between source and target volume replicas. If bi-directional mode is enabled, changes from target are also synced back into the source. @@ -512,7 +541,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me return nil }) - vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", + vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles)) if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) { @@ -536,7 +565,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me continue } - vcd.writeVerbose("read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) + vcd.writeVerbose("read %s %s => %s", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) hasChanges = true if err = vcd.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { @@ -549,7 +578,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me var fidList []string for _, needleValue := range partiallyDeletedNeedles { fidList = append(fidList, needleValue.Key.FileId(source.info.Id)) - vcd.writeVerbose("delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) + vcd.writeVerbose("delete %s %s => %s", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) } deleteResults := operation.DeleteFileIdsAtOneVolumeServer( pb.NewServerAddressFromDataNode(target.location.dataNode), @@ -604,7 +633,7 @@ func (vcd *volumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection s return err } - vcd.writeVerbose("load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) + vcd.writeVerbose("load collection %s volume %d index size %d from %s ...", collection, volumeId, buf.Len(), volumeServer) return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) } @@ -616,7 +645,7 @@ func (vcd *volumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: volumeId, - Ext: ".idx", + Ext: ext, CompactionRevision: math.MaxUint32, StopOffset: math.MaxInt64, Collection: collection, diff --git a/weed/shell/command_volume_check_disk_test.go b/weed/shell/command_volume_check_disk_test.go index eee9103a8..ec958fbc4 100644 --- a/weed/shell/command_volume_check_disk_test.go +++ b/weed/shell/command_volume_check_disk_test.go @@ -278,14 +278,14 @@ func TestVolumeCheckDiskHelperMethods(t *testing.T) { } // Test write method - vcd.write("test %s\n", "message") + vcd.write("test %s", "message") if buf.String() != "test message\n" { t.Errorf("write() output = %q, want %q", buf.String(), "test message\n") } // Test writeVerbose with verbose=true buf.Reset() - 
vcd.writeVerbose("verbose %d\n", 123) + vcd.writeVerbose("verbose %d", 123) if buf.String() != "verbose 123\n" { t.Errorf("writeVerbose() with verbose=true output = %q, want %q", buf.String(), "verbose 123\n") } @@ -293,7 +293,7 @@ func TestVolumeCheckDiskHelperMethods(t *testing.T) { // Test writeVerbose with verbose=false buf.Reset() vcd.verbose = false - vcd.writeVerbose("should not appear\n") + vcd.writeVerbose("should not appear") if buf.String() != "" { t.Errorf("writeVerbose() with verbose=false output = %q, want empty", buf.String()) } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 5c1805c89..6135eb3eb 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -197,8 +197,14 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv if ecShardInfo.Collection != "" { collectionPrefix = ecShardInfo.Collection + "_" } - fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id) - err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange) + vid := needle.VolumeId(ecShardInfo.Id) + destDiskId := pickBestDiskOnNode(emptyNode, vid) + if destDiskId > 0 { + fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId) + } else { + fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id) + } + err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange) if err != nil { return } else { diff --git a/weed/shell/common.go b/weed/shell/common.go index 43571176e..cb2df5828 100644 --- a/weed/shell/common.go +++ b/weed/shell/common.go @@ -2,6 +2,7 @@ package shell import ( "errors" + "fmt" "sync" ) @@ -64,6 +65,13 @@ func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) { }() } +// AddErrorf adds an error to an ErrorWaitGroupTask result, without queueing any goroutines. +func (ewg *ErrorWaitGroup) AddErrorf(format string, a ...interface{}) { + ewg.errorsMu.Lock() + ewg.errors = append(ewg.errors, fmt.Errorf(format, a...)) + ewg.errorsMu.Unlock() +} + // Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them. func (ewg *ErrorWaitGroup) Wait() error { ewg.wg.Wait() diff --git a/weed/storage/erasure_coding/placement/placement.go b/weed/storage/erasure_coding/placement/placement.go new file mode 100644 index 000000000..67e21c1f8 --- /dev/null +++ b/weed/storage/erasure_coding/placement/placement.go @@ -0,0 +1,420 @@ +// Package placement provides consolidated EC shard placement logic used by +// both shell commands and worker tasks. 
+// +// This package encapsulates the algorithms for: +// - Selecting destination nodes/disks for EC shards +// - Ensuring proper spread across racks, servers, and disks +// - Balancing shards across the cluster +package placement + +import ( + "fmt" + "sort" +) + +// DiskCandidate represents a disk that can receive EC shards +type DiskCandidate struct { + NodeID string + DiskID uint32 + DataCenter string + Rack string + + // Capacity information + VolumeCount int64 + MaxVolumeCount int64 + ShardCount int // Current number of EC shards on this disk + FreeSlots int // Available slots for new shards + + // Load information + LoadCount int // Number of active tasks on this disk +} + +// NodeCandidate represents a server node that can receive EC shards +type NodeCandidate struct { + NodeID string + DataCenter string + Rack string + FreeSlots int + ShardCount int // Total shards across all disks + Disks []*DiskCandidate // All disks on this node +} + +// PlacementRequest configures EC shard placement behavior +type PlacementRequest struct { + // ShardsNeeded is the total number of shards to place + ShardsNeeded int + + // MaxShardsPerServer limits how many shards can be placed on a single server + // 0 means no limit (but prefer spreading when possible) + MaxShardsPerServer int + + // MaxShardsPerRack limits how many shards can be placed in a single rack + // 0 means no limit + MaxShardsPerRack int + + // MaxTaskLoad is the maximum task load count for a disk to be considered + MaxTaskLoad int + + // PreferDifferentServers when true, spreads shards across different servers + // before using multiple disks on the same server + PreferDifferentServers bool + + // PreferDifferentRacks when true, spreads shards across different racks + // before using multiple servers in the same rack + PreferDifferentRacks bool +} + +// DefaultPlacementRequest returns the default placement configuration +func DefaultPlacementRequest() PlacementRequest { + return PlacementRequest{ + ShardsNeeded: 14, + MaxShardsPerServer: 0, + MaxShardsPerRack: 0, + MaxTaskLoad: 5, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } +} + +// PlacementResult contains the selected destinations for EC shards +type PlacementResult struct { + SelectedDisks []*DiskCandidate + + // Statistics + ServersUsed int + RacksUsed int + DCsUsed int + + // Distribution maps + ShardsPerServer map[string]int + ShardsPerRack map[string]int + ShardsPerDC map[string]int +} + +// SelectDestinations selects the best disks for EC shard placement. +// This is the main entry point for EC placement logic. +// +// The algorithm works in multiple passes: +// 1. First pass: Select one disk from each rack (maximize rack diversity) +// 2. Second pass: Select one disk from each unused server in used racks (maximize server diversity) +// 3. 
Third pass: Select additional disks from servers already used (maximize disk diversity) +func SelectDestinations(disks []*DiskCandidate, config PlacementRequest) (*PlacementResult, error) { + if len(disks) == 0 { + return nil, fmt.Errorf("no disk candidates provided") + } + if config.ShardsNeeded <= 0 { + return nil, fmt.Errorf("shardsNeeded must be positive, got %d", config.ShardsNeeded) + } + + // Filter suitable disks + suitable := filterSuitableDisks(disks, config) + if len(suitable) == 0 { + return nil, fmt.Errorf("no suitable disks found after filtering") + } + + // Build indexes for efficient lookup + rackToDisks := groupDisksByRack(suitable) + + result := &PlacementResult{ + SelectedDisks: make([]*DiskCandidate, 0, config.ShardsNeeded), + ShardsPerServer: make(map[string]int), + ShardsPerRack: make(map[string]int), + ShardsPerDC: make(map[string]int), + } + + usedDisks := make(map[string]bool) // "nodeID:diskID" -> bool + usedServers := make(map[string]bool) // nodeID -> bool + usedRacks := make(map[string]bool) // "dc:rack" -> bool + + // Pass 1: Select one disk from each rack (maximize rack diversity) + if config.PreferDifferentRacks { + // Sort racks by number of available servers (descending) to prioritize racks with more options + sortedRacks := sortRacksByServerCount(rackToDisks) + for _, rackKey := range sortedRacks { + if len(result.SelectedDisks) >= config.ShardsNeeded { + break + } + rackDisks := rackToDisks[rackKey] + // Select best disk from this rack, preferring a new server + disk := selectBestDiskFromRack(rackDisks, usedServers, usedDisks, config) + if disk != nil { + addDiskToResult(result, disk, usedDisks, usedServers, usedRacks) + } + } + } + + // Pass 2: Select disks from unused servers in already-used racks + if config.PreferDifferentServers && len(result.SelectedDisks) < config.ShardsNeeded { + for _, rackKey := range getSortedRackKeys(rackToDisks) { + if len(result.SelectedDisks) >= config.ShardsNeeded { + break + } + rackDisks := rackToDisks[rackKey] + for _, disk := range sortDisksByScore(rackDisks) { + if len(result.SelectedDisks) >= config.ShardsNeeded { + break + } + diskKey := getDiskKey(disk) + if usedDisks[diskKey] { + continue + } + // Skip if server already used (we want different servers in this pass) + if usedServers[disk.NodeID] { + continue + } + // Check server limit + if config.MaxShardsPerServer > 0 && result.ShardsPerServer[disk.NodeID] >= config.MaxShardsPerServer { + continue + } + // Check rack limit + if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack { + continue + } + addDiskToResult(result, disk, usedDisks, usedServers, usedRacks) + } + } + } + + // Pass 3: Fill remaining slots from already-used servers (different disks) + // Use round-robin across servers to balance shards evenly + if len(result.SelectedDisks) < config.ShardsNeeded { + // Group remaining disks by server + serverToRemainingDisks := make(map[string][]*DiskCandidate) + for _, disk := range suitable { + if !usedDisks[getDiskKey(disk)] { + serverToRemainingDisks[disk.NodeID] = append(serverToRemainingDisks[disk.NodeID], disk) + } + } + + // Sort each server's disks by score + for serverID := range serverToRemainingDisks { + serverToRemainingDisks[serverID] = sortDisksByScore(serverToRemainingDisks[serverID]) + } + + // Round-robin: repeatedly select from the server with the fewest shards + for len(result.SelectedDisks) < config.ShardsNeeded { + // Find server with fewest shards that still has available disks + var bestServer 
string + minShards := -1 + for serverID, disks := range serverToRemainingDisks { + if len(disks) == 0 { + continue + } + // Check server limit + if config.MaxShardsPerServer > 0 && result.ShardsPerServer[serverID] >= config.MaxShardsPerServer { + continue + } + shardCount := result.ShardsPerServer[serverID] + if minShards == -1 || shardCount < minShards { + minShards = shardCount + bestServer = serverID + } else if shardCount == minShards && serverID < bestServer { + // Tie-break by server name for determinism + bestServer = serverID + } + } + + if bestServer == "" { + // No more servers with available disks + break + } + + // Pop the best disk from this server + disks := serverToRemainingDisks[bestServer] + disk := disks[0] + serverToRemainingDisks[bestServer] = disks[1:] + + // Check rack limit + if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack { + continue + } + + addDiskToResult(result, disk, usedDisks, usedServers, usedRacks) + } + } + + // Calculate final statistics + result.ServersUsed = len(usedServers) + result.RacksUsed = len(usedRacks) + dcSet := make(map[string]bool) + for _, disk := range result.SelectedDisks { + dcSet[disk.DataCenter] = true + } + result.DCsUsed = len(dcSet) + + return result, nil +} + +// filterSuitableDisks filters disks that are suitable for EC placement +func filterSuitableDisks(disks []*DiskCandidate, config PlacementRequest) []*DiskCandidate { + var suitable []*DiskCandidate + for _, disk := range disks { + if disk.FreeSlots <= 0 { + continue + } + if config.MaxTaskLoad > 0 && disk.LoadCount > config.MaxTaskLoad { + continue + } + suitable = append(suitable, disk) + } + return suitable +} + +// groupDisksByRack groups disks by their rack (dc:rack key) +func groupDisksByRack(disks []*DiskCandidate) map[string][]*DiskCandidate { + result := make(map[string][]*DiskCandidate) + for _, disk := range disks { + key := getRackKey(disk) + result[key] = append(result[key], disk) + } + return result +} + +// groupDisksByServer groups disks by their server +func groupDisksByServer(disks []*DiskCandidate) map[string][]*DiskCandidate { + result := make(map[string][]*DiskCandidate) + for _, disk := range disks { + result[disk.NodeID] = append(result[disk.NodeID], disk) + } + return result +} + +// getRackKey returns the unique key for a rack (dc:rack) +func getRackKey(disk *DiskCandidate) string { + return fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack) +} + +// getDiskKey returns the unique key for a disk (nodeID:diskID) +func getDiskKey(disk *DiskCandidate) string { + return fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID) +} + +// sortRacksByServerCount returns rack keys sorted by number of servers (ascending) +func sortRacksByServerCount(rackToDisks map[string][]*DiskCandidate) []string { + // Count unique servers per rack + rackServerCount := make(map[string]int) + for rackKey, disks := range rackToDisks { + servers := make(map[string]bool) + for _, disk := range disks { + servers[disk.NodeID] = true + } + rackServerCount[rackKey] = len(servers) + } + + keys := getSortedRackKeys(rackToDisks) + sort.Slice(keys, func(i, j int) bool { + // Sort by server count (descending) to pick from racks with more options first + return rackServerCount[keys[i]] > rackServerCount[keys[j]] + }) + return keys +} + +// getSortedRackKeys returns rack keys in a deterministic order +func getSortedRackKeys(rackToDisks map[string][]*DiskCandidate) []string { + keys := make([]string, 0, len(rackToDisks)) + for k := range rackToDisks { + 
keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + +// selectBestDiskFromRack selects the best disk from a rack for EC placement +// It prefers servers that haven't been used yet +func selectBestDiskFromRack(disks []*DiskCandidate, usedServers, usedDisks map[string]bool, config PlacementRequest) *DiskCandidate { + var bestDisk *DiskCandidate + bestScore := -1.0 + bestIsFromUnusedServer := false + + for _, disk := range disks { + if usedDisks[getDiskKey(disk)] { + continue + } + isFromUnusedServer := !usedServers[disk.NodeID] + score := calculateDiskScore(disk) + + // Prefer unused servers + if isFromUnusedServer && !bestIsFromUnusedServer { + bestDisk = disk + bestScore = score + bestIsFromUnusedServer = true + } else if isFromUnusedServer == bestIsFromUnusedServer && score > bestScore { + bestDisk = disk + bestScore = score + } + } + + return bestDisk +} + +// sortDisksByScore returns disks sorted by score (best first) +func sortDisksByScore(disks []*DiskCandidate) []*DiskCandidate { + sorted := make([]*DiskCandidate, len(disks)) + copy(sorted, disks) + sort.Slice(sorted, func(i, j int) bool { + return calculateDiskScore(sorted[i]) > calculateDiskScore(sorted[j]) + }) + return sorted +} + +// calculateDiskScore calculates a score for a disk candidate +// Higher score is better +func calculateDiskScore(disk *DiskCandidate) float64 { + score := 0.0 + + // Primary factor: available capacity (lower utilization is better) + if disk.MaxVolumeCount > 0 { + utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount) + score += (1.0 - utilization) * 60.0 // Up to 60 points + } else { + score += 30.0 // Default if no max count + } + + // Secondary factor: fewer shards already on this disk is better + score += float64(10-disk.ShardCount) * 2.0 // Up to 20 points + + // Tertiary factor: lower load is better + score += float64(10 - disk.LoadCount) // Up to 10 points + + return score +} + +// addDiskToResult adds a disk to the result and updates tracking maps +func addDiskToResult(result *PlacementResult, disk *DiskCandidate, + usedDisks, usedServers, usedRacks map[string]bool) { + diskKey := getDiskKey(disk) + rackKey := getRackKey(disk) + + result.SelectedDisks = append(result.SelectedDisks, disk) + usedDisks[diskKey] = true + usedServers[disk.NodeID] = true + usedRacks[rackKey] = true + result.ShardsPerServer[disk.NodeID]++ + result.ShardsPerRack[rackKey]++ + result.ShardsPerDC[disk.DataCenter]++ +} + +// VerifySpread checks if the placement result meets diversity requirements +func VerifySpread(result *PlacementResult, minServers, minRacks int) error { + if result.ServersUsed < minServers { + return fmt.Errorf("only %d servers used, need at least %d", result.ServersUsed, minServers) + } + if result.RacksUsed < minRacks { + return fmt.Errorf("only %d racks used, need at least %d", result.RacksUsed, minRacks) + } + return nil +} + +// CalculateIdealDistribution returns the ideal number of shards per server +// when we have a certain number of shards and servers +func CalculateIdealDistribution(totalShards, numServers int) (min, max int) { + if numServers <= 0 { + return 0, totalShards + } + min = totalShards / numServers + max = min + if totalShards%numServers != 0 { + max = min + 1 + } + return +} diff --git a/weed/storage/erasure_coding/placement/placement_test.go b/weed/storage/erasure_coding/placement/placement_test.go new file mode 100644 index 000000000..6cb94a4da --- /dev/null +++ b/weed/storage/erasure_coding/placement/placement_test.go @@ -0,0 +1,517 @@ +package 
placement + +import ( +"testing" +) + +// Helper function to create disk candidates for testing +func makeDisk(nodeID string, diskID uint32, dc, rack string, freeSlots int) *DiskCandidate { + return &DiskCandidate{ + NodeID: nodeID, + DiskID: diskID, + DataCenter: dc, + Rack: rack, + VolumeCount: 0, + MaxVolumeCount: 100, + ShardCount: 0, + FreeSlots: freeSlots, + LoadCount: 0, + } +} + +func TestSelectDestinations_SingleRack(t *testing.T) { + // Test: 3 servers in same rack, each with 2 disks, need 6 shards + // Expected: Should spread across all 6 disks (one per disk) + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server3", 0, "dc1", "rack1", 10), + makeDisk("server3", 1, "dc1", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 6, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 6 { + t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks)) + } + + // Verify all 3 servers are used + if result.ServersUsed != 3 { + t.Errorf("expected 3 servers used, got %d", result.ServersUsed) + } + + // Verify each disk is unique + diskSet := make(map[string]bool) + for _, disk := range result.SelectedDisks { + key := getDiskKey(disk) + if diskSet[key] { + t.Errorf("disk %s selected multiple times", key) + } + diskSet[key] = true + } +} + +func TestSelectDestinations_MultipleRacks(t *testing.T) { + // Test: 2 racks with 2 servers each, each server has 2 disks + // Need 8 shards + // Expected: Should spread across all 8 disks + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server3", 0, "dc1", "rack2", 10), + makeDisk("server3", 1, "dc1", "rack2", 10), + makeDisk("server4", 0, "dc1", "rack2", 10), + makeDisk("server4", 1, "dc1", "rack2", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 8, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 8 { + t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks)) + } + + // Verify all 4 servers are used + if result.ServersUsed != 4 { + t.Errorf("expected 4 servers used, got %d", result.ServersUsed) + } + + // Verify both racks are used + if result.RacksUsed != 2 { + t.Errorf("expected 2 racks used, got %d", result.RacksUsed) + } +} + +func TestSelectDestinations_PrefersDifferentServers(t *testing.T) { + // Test: 4 servers with 4 disks each, need 4 shards + // Expected: Should use one disk from each server + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server1", 2, "dc1", "rack1", 10), + makeDisk("server1", 3, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server2", 2, "dc1", "rack1", 10), + makeDisk("server2", 3, "dc1", "rack1", 10), + makeDisk("server3", 0, "dc1", "rack1", 10), + makeDisk("server3", 1, "dc1", "rack1", 10), + makeDisk("server3", 2, "dc1", "rack1", 10), + 
makeDisk("server3", 3, "dc1", "rack1", 10), + makeDisk("server4", 0, "dc1", "rack1", 10), + makeDisk("server4", 1, "dc1", "rack1", 10), + makeDisk("server4", 2, "dc1", "rack1", 10), + makeDisk("server4", 3, "dc1", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 4, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 4 { + t.Errorf("expected 4 selected disks, got %d", len(result.SelectedDisks)) + } + + // Verify all 4 servers are used (one shard per server) + if result.ServersUsed != 4 { + t.Errorf("expected 4 servers used, got %d", result.ServersUsed) + } + + // Each server should have exactly 1 shard + for server, count := range result.ShardsPerServer { + if count != 1 { + t.Errorf("server %s has %d shards, expected 1", server, count) + } + } +} + +func TestSelectDestinations_SpilloverToMultipleDisksPerServer(t *testing.T) { + // Test: 2 servers with 4 disks each, need 6 shards + // Expected: First pick one from each server (2 shards), then one more from each (4 shards), + // then fill remaining from any server (6 shards) + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server1", 2, "dc1", "rack1", 10), + makeDisk("server1", 3, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server2", 2, "dc1", "rack1", 10), + makeDisk("server2", 3, "dc1", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 6, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 6 { + t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks)) + } + + // Both servers should be used + if result.ServersUsed != 2 { + t.Errorf("expected 2 servers used, got %d", result.ServersUsed) + } + + // Each server should have exactly 3 shards (balanced) + for server, count := range result.ShardsPerServer { + if count != 3 { + t.Errorf("server %s has %d shards, expected 3", server, count) + } + } +} + +func TestSelectDestinations_MaxShardsPerServer(t *testing.T) { + // Test: 2 servers with 4 disks each, need 6 shards, max 2 per server + // Expected: Should only select 4 shards (2 per server limit) + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server1", 2, "dc1", "rack1", 10), + makeDisk("server1", 3, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server2", 2, "dc1", "rack1", 10), + makeDisk("server2", 3, "dc1", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 6, + MaxShardsPerServer: 2, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should only get 4 shards due to server limit + if len(result.SelectedDisks) != 4 { + t.Errorf("expected 4 selected disks (limit 2 per server), got %d", len(result.SelectedDisks)) + } + + // No server should exceed the limit + for server, count := range result.ShardsPerServer { + if count > 2 { + t.Errorf("server %s has %d shards, exceeds limit of 2", server, count) + } 
+ } +} + +func TestSelectDestinations_14ShardsAcross7Servers(t *testing.T) { + // Test: Real-world EC scenario - 14 shards across 7 servers with 2 disks each + // Expected: Should spread evenly (2 shards per server) + var disks []*DiskCandidate + for i := 1; i <= 7; i++ { + serverID := "server" + string(rune('0'+i)) + disks = append(disks, makeDisk(serverID, 0, "dc1", "rack1", 10)) + disks = append(disks, makeDisk(serverID, 1, "dc1", "rack1", 10)) + } + + config := PlacementRequest{ + ShardsNeeded: 14, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 14 { + t.Errorf("expected 14 selected disks, got %d", len(result.SelectedDisks)) + } + + // All 7 servers should be used + if result.ServersUsed != 7 { + t.Errorf("expected 7 servers used, got %d", result.ServersUsed) + } + + // Each server should have exactly 2 shards + for server, count := range result.ShardsPerServer { + if count != 2 { + t.Errorf("server %s has %d shards, expected 2", server, count) + } + } +} + +func TestSelectDestinations_FewerServersThanShards(t *testing.T) { + // Test: Only 3 servers but need 6 shards + // Expected: Should distribute evenly (2 per server) + disks := []*DiskCandidate{ + makeDisk("server1", 0, "dc1", "rack1", 10), + makeDisk("server1", 1, "dc1", "rack1", 10), + makeDisk("server1", 2, "dc1", "rack1", 10), + makeDisk("server2", 0, "dc1", "rack1", 10), + makeDisk("server2", 1, "dc1", "rack1", 10), + makeDisk("server2", 2, "dc1", "rack1", 10), + makeDisk("server3", 0, "dc1", "rack1", 10), + makeDisk("server3", 1, "dc1", "rack1", 10), + makeDisk("server3", 2, "dc1", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 6, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 6 { + t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks)) + } + + // All 3 servers should be used + if result.ServersUsed != 3 { + t.Errorf("expected 3 servers used, got %d", result.ServersUsed) + } + + // Each server should have exactly 2 shards + for server, count := range result.ShardsPerServer { + if count != 2 { + t.Errorf("server %s has %d shards, expected 2", server, count) + } + } +} + +func TestSelectDestinations_NoSuitableDisks(t *testing.T) { + // Test: All disks have no free slots + disks := []*DiskCandidate{ + {NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0}, + {NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0}, + } + + config := PlacementRequest{ + ShardsNeeded: 4, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + _, err := SelectDestinations(disks, config) + if err == nil { + t.Error("expected error for no suitable disks, got nil") + } +} + +func TestSelectDestinations_EmptyInput(t *testing.T) { + config := DefaultPlacementRequest() + _, err := SelectDestinations([]*DiskCandidate{}, config) + if err == nil { + t.Error("expected error for empty input, got nil") + } +} + +func TestSelectDestinations_FiltersByLoad(t *testing.T) { + // Test: Some disks have too high load + disks := []*DiskCandidate{ + {NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 10}, + {NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 2}, + 
{NodeID: "server3", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 1}, + } + + config := PlacementRequest{ + ShardsNeeded: 2, + MaxTaskLoad: 5, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should only select from server2 and server3 (server1 has too high load) + for _, disk := range result.SelectedDisks { + if disk.NodeID == "server1" { + t.Errorf("disk from server1 should not be selected (load too high)") + } + } +} + +func TestCalculateDiskScore(t *testing.T) { + // Test that score calculation works as expected + lowUtilDisk := &DiskCandidate{ + VolumeCount: 10, + MaxVolumeCount: 100, + ShardCount: 0, + LoadCount: 0, + } + + highUtilDisk := &DiskCandidate{ + VolumeCount: 90, + MaxVolumeCount: 100, + ShardCount: 5, + LoadCount: 5, + } + + lowScore := calculateDiskScore(lowUtilDisk) + highScore := calculateDiskScore(highUtilDisk) + + if lowScore <= highScore { + t.Errorf("low utilization disk should have higher score: low=%f, high=%f", lowScore, highScore) + } +} + +func TestCalculateIdealDistribution(t *testing.T) { + tests := []struct { + totalShards int + numServers int + expectedMin int + expectedMax int + }{ + {14, 7, 2, 2}, // Even distribution + {14, 4, 3, 4}, // Uneven: 14/4 = 3 remainder 2 + {6, 3, 2, 2}, // Even distribution + {7, 3, 2, 3}, // Uneven: 7/3 = 2 remainder 1 + {10, 0, 0, 10}, // Edge case: no servers + {0, 5, 0, 0}, // Edge case: no shards + } + + for _, tt := range tests { + min, max := CalculateIdealDistribution(tt.totalShards, tt.numServers) + if min != tt.expectedMin || max != tt.expectedMax { + t.Errorf("CalculateIdealDistribution(%d, %d) = (%d, %d), want (%d, %d)", +tt.totalShards, tt.numServers, min, max, tt.expectedMin, tt.expectedMax) + } + } +} + +func TestVerifySpread(t *testing.T) { + result := &PlacementResult{ + ServersUsed: 3, + RacksUsed: 2, + } + + // Should pass + if err := VerifySpread(result, 3, 2); err != nil { + t.Errorf("unexpected error: %v", err) + } + + // Should fail - not enough servers + if err := VerifySpread(result, 4, 2); err == nil { + t.Error("expected error for insufficient servers") + } + + // Should fail - not enough racks + if err := VerifySpread(result, 3, 3); err == nil { + t.Error("expected error for insufficient racks") + } +} + +func TestSelectDestinations_MultiDC(t *testing.T) { + // Test: 2 DCs, each with 2 racks, each rack has 2 servers + disks := []*DiskCandidate{ + // DC1, Rack1 + makeDisk("dc1-r1-s1", 0, "dc1", "rack1", 10), + makeDisk("dc1-r1-s1", 1, "dc1", "rack1", 10), + makeDisk("dc1-r1-s2", 0, "dc1", "rack1", 10), + makeDisk("dc1-r1-s2", 1, "dc1", "rack1", 10), + // DC1, Rack2 + makeDisk("dc1-r2-s1", 0, "dc1", "rack2", 10), + makeDisk("dc1-r2-s1", 1, "dc1", "rack2", 10), + makeDisk("dc1-r2-s2", 0, "dc1", "rack2", 10), + makeDisk("dc1-r2-s2", 1, "dc1", "rack2", 10), + // DC2, Rack1 + makeDisk("dc2-r1-s1", 0, "dc2", "rack1", 10), + makeDisk("dc2-r1-s1", 1, "dc2", "rack1", 10), + makeDisk("dc2-r1-s2", 0, "dc2", "rack1", 10), + makeDisk("dc2-r1-s2", 1, "dc2", "rack1", 10), + // DC2, Rack2 + makeDisk("dc2-r2-s1", 0, "dc2", "rack2", 10), + makeDisk("dc2-r2-s1", 1, "dc2", "rack2", 10), + makeDisk("dc2-r2-s2", 0, "dc2", "rack2", 10), + makeDisk("dc2-r2-s2", 1, "dc2", "rack2", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 8, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if 
err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.SelectedDisks) != 8 { + t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks)) + } + + // Should use all 4 racks + if result.RacksUsed != 4 { + t.Errorf("expected 4 racks used, got %d", result.RacksUsed) + } + + // Should use both DCs + if result.DCsUsed != 2 { + t.Errorf("expected 2 DCs used, got %d", result.DCsUsed) + } +} + +func TestSelectDestinations_SameRackDifferentDC(t *testing.T) { + // Test: Same rack name in different DCs should be treated as different racks + disks := []*DiskCandidate{ + makeDisk("dc1-s1", 0, "dc1", "rack1", 10), + makeDisk("dc2-s1", 0, "dc2", "rack1", 10), + } + + config := PlacementRequest{ + ShardsNeeded: 2, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } + + result, err := SelectDestinations(disks, config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Should use 2 racks (dc1:rack1 and dc2:rack1 are different) + if result.RacksUsed != 2 { + t.Errorf("expected 2 racks used (different DCs), got %d", result.RacksUsed) + } +} diff --git a/weed/storage/store.go b/weed/storage/store.go index cc07f8702..7a336d1ff 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -63,6 +63,7 @@ type Store struct { Port int GrpcPort int PublicUrl string + Id string // volume server id, independent of ip:port for stable identification Locations []*DiskLocation dataCenter string // optional information, overwriting master setting if exists rack string // optional information, overwriting master setting if exists @@ -76,13 +77,13 @@ type Store struct { } func (s *Store) String() (str string) { - str = fmt.Sprintf("Ip:%s, Port:%d, GrpcPort:%d PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.GrpcPort, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit()) + str = fmt.Sprintf("Id:%s, Ip:%s, Port:%d, GrpcPort:%d PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Id, s.Ip, s.Port, s.GrpcPort, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit()) return } -func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, dirnames []string, maxVolumeCounts []int32, +func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, id string, dirnames []string, maxVolumeCounts []int32, minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType, ldbTimeout int64) (s *Store) { - s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, NeedleMapKind: needleMapKind} + s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, Id: id, NeedleMapKind: needleMapKind} s.Locations = make([]*DiskLocation, 0) var wg sync.WaitGroup @@ -414,6 +415,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { Port: uint32(s.Port), GrpcPort: uint32(s.GrpcPort), PublicUrl: s.PublicUrl, + Id: s.Id, MaxVolumeCounts: maxVolumeCounts, MaxFileKey: NeedleIdToUint64(maxFileKey), DataCenter: s.dataCenter, @@ -467,6 +469,10 @@ func (s *Store) SetStopping() { } } +func (s *Store) IsStopping() bool { + return s.isStopping +} + func (s *Store) LoadNewVolumes() { for _, location := range s.Locations { location.loadExistingVolumes(s.NeedleMapKind, 0) diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index a3e028bbb..9fcb092a2 100644 --- 
a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -21,7 +22,8 @@ func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle return 0, err } - if cookie != n.Cookie { + // cookie == 0 indicates SkipCookieCheck was requested (e.g., orphan cleanup) + if cookie != 0 && cookie != n.Cookie { return 0, fmt.Errorf("unexpected cookie %x", cookie) } @@ -45,22 +47,17 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_cod shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) - hasDeletionSuccess := false err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId) if err == nil { - hasDeletionSuccess = true + return nil } for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ { if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil { - hasDeletionSuccess = true + return nil } } - if hasDeletionSuccess { - return nil - } - return err } @@ -77,11 +74,9 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.Sh for _, sourceDataNode := range sourceDataNodes { glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode) - err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId) - if err != nil { + if err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId); err != nil { return err } - glog.V(1).Infof("delete from remote ec shard %d.%d from %s: %v", ecVolume.VolumeId, shardId, sourceDataNode, err) } return nil diff --git a/weed/storage/store_load_balancing_test.go b/weed/storage/store_load_balancing_test.go index 15e709d53..35475a6ae 100644 --- a/weed/storage/store_load_balancing_test.go +++ b/weed/storage/store_load_balancing_test.go @@ -31,7 +31,7 @@ func newTestStore(t *testing.T, numDirs int) *Store { diskTypes = append(diskTypes, types.HardDriveType) } - store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", + store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "", dirs, maxCounts, minFreeSpaces, "", NeedleMapInMemory, diskTypes, 3) // Consume channel messages to prevent blocking diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 4f2dbe464..07e00ac0a 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -269,6 +269,7 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { Id: string(dn.Id()), DiskInfos: make(map[string]*master_pb.DiskInfo), GrpcPort: uint32(dn.GrpcPort), + Address: dn.Url(), // ip:port for connecting to the volume server } for _, c := range dn.Children() { disk := c.(*Disk) diff --git a/weed/topology/rack.go b/weed/topology/rack.go index f526cd84d..1e5c8b632 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -5,6 +5,7 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" @@ -34,17 +35,73 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode { } return nil } -func (r *Rack) 
GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode { + +// FindDataNodeById finds a DataNode by its ID using O(1) map lookup +func (r *Rack) FindDataNodeById(id string) *DataNode { + r.RLock() + defer r.RUnlock() + if c, ok := r.children[NodeId(id)]; ok { + return c.(*DataNode) + } + return nil +} + +func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, id string, maxVolumeCounts map[string]uint32) *DataNode { r.Lock() defer r.Unlock() - for _, c := range r.children { + + // Normalize the id parameter (trim whitespace) + id = strings.TrimSpace(id) + + // Determine the node ID: use provided id, or fall back to ip:port for backward compatibility + nodeId := util.GetVolumeServerId(id, ip, port) + + // First, try to find by node ID using O(1) map lookup (stable identity) + if c, ok := r.children[NodeId(nodeId)]; ok { dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - dn.LastSeen = time.Now().Unix() - return dn + // Log if IP or Port changed (e.g., pod rescheduled in K8s) + if dn.Ip != ip || dn.Port != port { + glog.V(0).Infof("DataNode %s address changed from %s:%d to %s:%d", nodeId, dn.Ip, dn.Port, ip, port) } + // Update the IP/Port in case they changed + dn.Ip = ip + dn.Port = port + dn.GrpcPort = grpcPort + dn.PublicUrl = publicUrl + dn.LastSeen = time.Now().Unix() + return dn } - dn := NewDataNode(util.JoinHostPort(ip, port)) + + // For backward compatibility: if explicit id was provided, also check by ip:port + // to handle transition from old (ip:port) to new (explicit id) behavior + ipPortId := util.JoinHostPort(ip, port) + if nodeId != ipPortId { + for oldId, c := range r.children { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + // Only transition if the oldId exactly matches ip:port (legacy identification). + // If oldId is different, this is a node with an explicit id that happens to + // reuse the same ip:port - don't incorrectly merge them. 
+ if string(oldId) != ipPortId { + glog.Warningf("Volume server with id %s has ip:port %s which is used by node %s", nodeId, ipPortId, oldId) + continue + } + // Found a legacy node identified by ip:port, transition it to use the new explicit id + glog.V(0).Infof("Volume server %s transitioning id from %s to %s", dn.Url(), oldId, nodeId) + // Re-key the node in the children map with the new id + delete(r.children, oldId) + dn.id = NodeId(nodeId) + r.children[NodeId(nodeId)] = dn + // Update connection info in case they changed + dn.GrpcPort = grpcPort + dn.PublicUrl = publicUrl + dn.LastSeen = time.Now().Unix() + return dn + } + } + } + + dn := NewDataNode(nodeId) dn.Ip = ip dn.Port = port dn.GrpcPort = grpcPort diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index 8515d2f81..e5a8969fc 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -34,7 +34,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 maxVolumeCounts["ssd"] = 12 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) { volumeCount := 7 @@ -180,7 +180,7 @@ func TestAddRemoveVolume(t *testing.T) { maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 maxVolumeCounts["ssd"] = 12 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) v := storage.VolumeInfo{ Id: needle.VolumeId(1), @@ -218,7 +218,7 @@ func TestVolumeReadOnlyStatusChange(t *testing.T) { rack := dc.GetOrCreateRack("rack1") maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) // Create a writable volume v := storage.VolumeInfo{ @@ -267,7 +267,7 @@ func TestVolumeReadOnlyAndRemoteStatusChange(t *testing.T) { rack := dc.GetOrCreateRack("rack1") maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) // Create a writable, local volume v := storage.VolumeInfo{ @@ -331,7 +331,7 @@ func TestListCollections(t *testing.T) { topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", nil) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", nil) topo.RegisterVolumeLayout(storage.VolumeInfo{ Id: needle.VolumeId(1111), @@ -396,3 +396,112 @@ func TestListCollections(t *testing.T) { }) } } + +func TestDataNodeIdBasedIdentification(t *testing.T) { + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) + dc := topo.GetOrCreateDataCenter("dc1") + rack := dc.GetOrCreateRack("rack1") + + maxVolumeCounts := make(map[string]uint32) + maxVolumeCounts[""] = 10 + + // Test 1: Create a DataNode with explicit id + dn1 := rack.GetOrCreateDataNode("10.0.0.1", 8080, 18080, "10.0.0.1:8080", "node-1", maxVolumeCounts) + if string(dn1.Id()) != "node-1" { + t.Errorf("expected node id 'node-1', got '%s'", dn1.Id()) + } + if dn1.Ip != 
"10.0.0.1" { + t.Errorf("expected ip '10.0.0.1', got '%s'", dn1.Ip) + } + + // Test 2: Same id with different IP should return the same DataNode (K8s pod reschedule scenario) + dn2 := rack.GetOrCreateDataNode("10.0.0.2", 8080, 18080, "10.0.0.2:8080", "node-1", maxVolumeCounts) + if dn1 != dn2 { + t.Errorf("expected same DataNode for same id, got different nodes") + } + // IP should be updated to the new value + if dn2.Ip != "10.0.0.2" { + t.Errorf("expected ip to be updated to '10.0.0.2', got '%s'", dn2.Ip) + } + if dn2.PublicUrl != "10.0.0.2:8080" { + t.Errorf("expected publicUrl to be updated to '10.0.0.2:8080', got '%s'", dn2.PublicUrl) + } + + // Test 3: Different id should create a new DataNode + dn3 := rack.GetOrCreateDataNode("10.0.0.3", 8080, 18080, "10.0.0.3:8080", "node-2", maxVolumeCounts) + if string(dn3.Id()) != "node-2" { + t.Errorf("expected node id 'node-2', got '%s'", dn3.Id()) + } + if dn1 == dn3 { + t.Errorf("expected different DataNode for different id") + } + + // Test 4: Empty id should fall back to ip:port (backward compatibility) + dn4 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "", maxVolumeCounts) + if string(dn4.Id()) != "10.0.0.4:8080" { + t.Errorf("expected node id '10.0.0.4:8080' for empty id, got '%s'", dn4.Id()) + } + + // Test 5: Same ip:port with empty id should return the same DataNode + dn5 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "", maxVolumeCounts) + if dn4 != dn5 { + t.Errorf("expected same DataNode for same ip:port with empty id") + } + + // Verify we have 3 unique DataNodes total: + // - node-1 (dn1/dn2 share the same id) + // - node-2 (dn3) + // - 10.0.0.4:8080 (dn4/dn5 share the same ip:port) + children := rack.Children() + if len(children) != 3 { + t.Errorf("expected 3 DataNodes, got %d", len(children)) + } + + // Test 6: Transition from ip:port to explicit id + // First, the node exists with ip:port as id (dn4/dn5) + // Now the same volume server starts sending an explicit id + dn6 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "node-4-explicit", maxVolumeCounts) + // Should return the same DataNode instance + if dn6 != dn4 { + t.Errorf("expected same DataNode instance during transition") + } + // But the id should now be updated to the explicit id + if string(dn6.Id()) != "node-4-explicit" { + t.Errorf("expected node id to transition to 'node-4-explicit', got '%s'", dn6.Id()) + } + // The node should be re-keyed in the children map + if rack.FindDataNodeById("node-4-explicit") != dn6 { + t.Errorf("expected to find DataNode by new explicit id") + } + // Old ip:port key should no longer work + if rack.FindDataNodeById("10.0.0.4:8080") != nil { + t.Errorf("expected old ip:port id to be removed from children map") + } + + // Still 3 unique DataNodes (node-1, node-2, node-4-explicit) + children = rack.Children() + if len(children) != 3 { + t.Errorf("expected 3 DataNodes after transition, got %d", len(children)) + } + + // Test 7: Prevent incorrect transition when a new node reuses ip:port of a node with explicit id + // Scenario: node-1 runs at 10.0.0.1:8080, dies, new node-99 starts at same ip:port + // The transition should NOT happen because node-1 already has an explicit id + dn7 := rack.GetOrCreateDataNode("10.0.0.1", 8080, 18080, "10.0.0.1:8080", "node-99", maxVolumeCounts) + // Should create a NEW DataNode, not reuse node-1 + if dn7 == dn1 { + t.Errorf("expected new DataNode for node-99, got reused node-1") + } + if string(dn7.Id()) != "node-99" { + 
t.Errorf("expected node id 'node-99', got '%s'", dn7.Id()) + } + // node-1 should still exist with its original id + if rack.FindDataNodeById("node-1") == nil { + t.Errorf("node-1 should still exist") + } + // Now we have 4 DataNodes + children = rack.Children() + if len(children) != 4 { + t.Errorf("expected 4 DataNodes, got %d", len(children)) + } +} diff --git a/weed/util/network.go b/weed/util/network.go index 328808dbc..f7dbeebb7 100644 --- a/weed/util/network.go +++ b/weed/util/network.go @@ -64,3 +64,14 @@ func JoinHostPort(host string, port int) string { } return net.JoinHostPort(host, portStr) } + +// GetVolumeServerId returns the volume server ID. +// If id is provided (non-empty after trimming), use it as the identifier. +// Otherwise, fall back to ip:port for backward compatibility. +func GetVolumeServerId(id, ip string, port int) string { + volumeServerId := strings.TrimSpace(id) + if volumeServerId == "" { + volumeServerId = JoinHostPort(ip, port) + } + return volumeServerId +} diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index cd74bed33..c5568fe26 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -9,6 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) @@ -429,85 +430,100 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era } // selectBestECDestinations selects multiple disks for EC shard placement with diversity +// Uses the consolidated placement package for proper rack/server/disk spreading func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo { if len(disks) == 0 { return nil } - // Group disks by rack and DC for diversity - rackGroups := make(map[string][]*topology.DiskInfo) - for _, disk := range disks { - rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack) - rackGroups[rackKey] = append(rackGroups[rackKey], disk) + // Convert topology.DiskInfo to placement.DiskCandidate + candidates := diskInfosToCandidates(disks) + if len(candidates) == 0 { + return nil } - var selected []*topology.DiskInfo - usedRacks := make(map[string]bool) + // Configure placement for EC shards + config := placement.PlacementRequest{ + ShardsNeeded: shardsNeeded, + MaxShardsPerServer: 0, // No hard limit, but prefer spreading + MaxShardsPerRack: 0, // No hard limit, but prefer spreading + MaxTaskLoad: topology.MaxTaskLoadForECPlacement, + PreferDifferentServers: true, + PreferDifferentRacks: true, + } - // First pass: select one disk from each rack for maximum diversity - for rackKey, rackDisks := range rackGroups { - if len(selected) >= shardsNeeded { - break - } + // Use the shared placement algorithm + result, err := placement.SelectDestinations(candidates, config) + if err != nil { + glog.V(2).Infof("EC placement failed: %v", err) + return nil + } - // Select best disk from this rack - bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC) - if bestDisk != nil { - selected = append(selected, bestDisk) - usedRacks[rackKey] = true + // Convert back to topology.DiskInfo + return candidatesToDiskInfos(result.SelectedDisks, disks) +} + +// diskInfosToCandidates 
converts topology.DiskInfo slice to placement.DiskCandidate slice +func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate { + var candidates []*placement.DiskCandidate + for _, disk := range disks { + if disk.DiskInfo == nil { + continue } - } - // Second pass: if we need more disks, select from racks we've already used - if len(selected) < shardsNeeded { - for _, disk := range disks { - if len(selected) >= shardsNeeded { - break - } + // Calculate free slots (using default max if not set) + freeSlots := int(disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount) + if freeSlots < 0 { + freeSlots = 0 + } - // Skip if already selected - alreadySelected := false - for _, sel := range selected { - if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID { - alreadySelected = true - break + // Calculate EC shard count for this specific disk + // EcShardInfos contains all shards, so we need to filter by DiskId and sum actual shard counts + ecShardCount := 0 + if disk.DiskInfo.EcShardInfos != nil { + for _, shardInfo := range disk.DiskInfo.EcShardInfos { + if shardInfo.DiskId == disk.DiskID { + ecShardCount += erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIdCount() } } - - if !alreadySelected && isDiskSuitableForEC(disk) { - selected = append(selected, disk) - } } - } - return selected + candidates = append(candidates, &placement.DiskCandidate{ + NodeID: disk.NodeID, + DiskID: disk.DiskID, + DataCenter: disk.DataCenter, + Rack: disk.Rack, + VolumeCount: disk.DiskInfo.VolumeCount, + MaxVolumeCount: disk.DiskInfo.MaxVolumeCount, + ShardCount: ecShardCount, + FreeSlots: freeSlots, + LoadCount: disk.LoadCount, + }) + } + return candidates } -// selectBestFromRack selects the best disk from a rack for EC placement -func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo { - if len(disks) == 0 { - return nil +// candidatesToDiskInfos converts placement results back to topology.DiskInfo +func candidatesToDiskInfos(candidates []*placement.DiskCandidate, originalDisks []*topology.DiskInfo) []*topology.DiskInfo { + // Create a map for quick lookup + diskMap := make(map[string]*topology.DiskInfo) + for _, disk := range originalDisks { + key := fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID) + diskMap[key] = disk } - var bestDisk *topology.DiskInfo - bestScore := -1.0 - - for _, disk := range disks { - if !isDiskSuitableForEC(disk) { - continue - } - - score := calculateECScore(disk, sourceRack, sourceDC) - if score > bestScore { - bestScore = score - bestDisk = disk + var result []*topology.DiskInfo + for _, candidate := range candidates { + key := fmt.Sprintf("%s:%d", candidate.NodeID, candidate.DiskID) + if disk, ok := diskMap[key]; ok { + result = append(result, disk) } } - - return bestDisk + return result } // calculateECScore calculates placement score for EC operations +// Used for logging and plan metadata func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 { if disk.DiskInfo == nil { return 0.0 @@ -524,14 +540,12 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa // Consider current load (secondary factor) score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load - // Note: We don't penalize placing shards on the same rack/DC as source - // since the original volume will be deleted after EC conversion. - // This allows for better network efficiency and storage utilization. 
- return score } // isDiskSuitableForEC checks if a disk is suitable for EC placement +// Note: This is kept for backward compatibility but the placement package +// handles filtering internally func isDiskSuitableForEC(disk *topology.DiskInfo) bool { if disk.DiskInfo == nil { return false
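The new weed/storage/erasure_coding/placement package consolidates the rack/server/disk spreading that detection.go previously open-coded. Below is a minimal sketch of how a caller might drive it, assuming only the exported names visible in this diff (DiskCandidate, PlacementRequest, SelectDestinations, VerifySpread, CalculateIdealDistribution) and assuming SelectDestinations returns the *PlacementResult that VerifySpread accepts; the disk values are illustrative, not taken from a real topology:

package main

import (
	"fmt"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
)

func main() {
	// Two servers in one rack, two disks each; capacities are made up for illustration.
	disks := []*placement.DiskCandidate{
		{NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", MaxVolumeCount: 100, FreeSlots: 10},
		{NodeID: "server1", DiskID: 1, DataCenter: "dc1", Rack: "rack1", MaxVolumeCount: 100, FreeSlots: 10},
		{NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", MaxVolumeCount: 100, FreeSlots: 10},
		{NodeID: "server2", DiskID: 1, DataCenter: "dc1", Rack: "rack1", MaxVolumeCount: 100, FreeSlots: 10},
	}

	req := placement.PlacementRequest{
		ShardsNeeded:           4,
		PreferDifferentServers: true,
		PreferDifferentRacks:   true,
	}

	result, err := placement.SelectDestinations(disks, req)
	if err != nil {
		log.Fatal(err)
	}

	// With 4 shards and 2 servers the ideal spread is 2 shards per server.
	minPer, maxPer := placement.CalculateIdealDistribution(4, 2)
	fmt.Printf("selected %d disks across %d servers (ideal %d-%d shards per server)\n",
		len(result.SelectedDisks), result.ServersUsed, minPer, maxPer)

	// Reject the plan if it does not reach the required server/rack diversity.
	if err := placement.VerifySpread(result, 2, 1); err != nil {
		log.Fatal(err)
	}
}

The diversity tiers do the heavy lifting here; the weighting in calculateDiskScore (free capacity up to 60 points, shard count up to 20, load up to 10) mainly serves as a tie-breaker among disks that are equally acceptable from a server/rack standpoint.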

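The volume-server id plumbing in weed/util/network.go and weed/topology/rack.go reduces to one fallback rule: a non-empty id, after trimming whitespace, becomes the stable identity; otherwise ip:port is used, which keeps existing deployments unchanged. A small sketch of that behavior using only util.GetVolumeServerId as defined above; the addresses and the pod-style name are hypothetical:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/util"
)

func main() {
	// No explicit id: fall back to ip:port, matching the pre-change behavior.
	fmt.Println(util.GetVolumeServerId("", "10.0.0.1", 8080)) // 10.0.0.1:8080

	// Explicit id (e.g. a Kubernetes pod name): used as the stable identity,
	// so the master keeps the same DataNode even if the server comes back on a new IP.
	fmt.Println(util.GetVolumeServerId("volume-0", "10.0.0.2", 8080)) // volume-0

	// Whitespace-only ids are treated as empty.
	fmt.Println(util.GetVolumeServerId("   ", "10.0.0.3", 8080)) // 10.0.0.3:8080
}

GetOrCreateDataNode applies the same rule on the master side: it looks the node up by this id first, updates ip/port/publicUrl if they changed, and only falls back to an ip:port match in order to migrate nodes that were registered before an explicit id was configured.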