aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/shell/command_fs_merge_volumes.go213
1 files changed, 152 insertions, 61 deletions
diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go
index beac6a0dd..9ed50a6cf 100644
--- a/weed/shell/command_fs_merge_volumes.go
+++ b/weed/shell/command_fs_merge_volumes.go
@@ -7,10 +7,13 @@ import (
"fmt"
"io"
"net/http"
+ "sort"
"strings"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
+ "golang.org/x/exp/maps"
+ "golang.org/x/exp/slices"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -28,7 +31,8 @@ func init() {
}
type commandFsMergeVolumes struct {
- volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage
+ volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage
+ volumeSizeLimit uint64
}
func (c *commandFsMergeVolumes) Name() string {
@@ -40,7 +44,7 @@ func (c *commandFsMergeVolumes) Help() string {
This would help clear half-full volumes and let vacuum system to delete them later.
- fs.mergeVolumes -toVolumeId=y [-fromVolumeId=x] [-apply] /dir/
+ fs.mergeVolumes [-toVolumeId=y] [-fromVolumeId=x] [-collection="*"] [-apply] [/dir/]
`
}
@@ -50,10 +54,13 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
if err != nil {
return err
}
- dir = strings.TrimRight(dir, "/")
+ if dir != "/" {
+ dir = strings.TrimRight(dir, "/")
+ }
fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
+ collectionArg := fsMergeVolumesCommand.String("collection", "*", "Name of collection to merge")
apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes")
if err = fsMergeVolumesCommand.Parse(args); err != nil {
return err
@@ -61,21 +68,9 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
fromVolumeId := needle.VolumeId(*fromVolumeArg)
toVolumeId := needle.VolumeId(*toVolumeArg)
- if toVolumeId == 0 {
- return fmt.Errorf("volume id can not be zero")
- }
-
c.reloadVolumesInfo(commandEnv.MasterClient)
- toVolumeInfo, err := c.getVolumeInfoById(toVolumeId)
- if err != nil {
- return err
- }
- if toVolumeInfo.ReadOnly {
- return fmt.Errorf("volume is readonly: %d", toVolumeId)
- }
-
- if fromVolumeId != 0 {
+ if fromVolumeId != 0 && toVolumeId != 0 {
if fromVolumeId == toVolumeId {
return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
}
@@ -86,57 +81,62 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
if !compatible {
return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
}
+ fromSize := c.getVolumeSizeById(fromVolumeId)
+ toSize := c.getVolumeSizeById(toVolumeId)
+ if fromSize+toSize > c.volumeSizeLimit {
+ return fmt.Errorf(
+ "volume %d (%d MB) cannot merge into volume %d (%d MB) due to volume size limit (%d MB)",
+ fromVolumeId, fromSize/1024/1024,
+ toVolumeId, toSize/1024/1024,
+ c.volumeSizeLimit/1024/1024,
+ )
+ }
+ }
+
+ plan, err := c.createMergePlan(*collectionArg, toVolumeId, fromVolumeId)
+
+ if err != nil {
+ return err
+ }
+ c.printPlan(plan)
+
+ if len(plan) == 0 {
+ return nil
}
- defer client.CloseIdleConnections()
- compatibility := make(map[string]bool)
+ defer client.CloseIdleConnections()
return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
- if !entry.IsDirectory {
- for _, chunk := range entry.Chunks {
- if chunk.IsChunkManifest {
- fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
- continue
- }
- chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
- if chunkVolumeId == toVolumeId || (fromVolumeId != 0 && fromVolumeId != chunkVolumeId) {
- continue
- }
- cacheKey := fmt.Sprintf("%d-%d", chunkVolumeId, toVolumeId)
- compatible, cached := compatibility[cacheKey]
- if !cached {
- compatible, err = c.volumesAreCompatible(chunkVolumeId, toVolumeId)
- if err != nil {
- _ = fmt.Errorf("cannot determine volumes are compatible: %d and %d", chunkVolumeId, toVolumeId)
- return
- }
- compatibility[cacheKey] = compatible
- }
- if !compatible {
- if fromVolumeId != 0 {
- _ = fmt.Errorf("volumes are incompatible: %d and %d", fromVolumeId, toVolumeId)
- return
- }
- continue
- }
- path := parentPath.Child(entry.Name)
+ if entry.IsDirectory {
+ return
+ }
+ for _, chunk := range entry.Chunks {
+ if chunk.IsChunkManifest {
+ fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
+ continue
+ }
+ chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
+ toVolumeId, found := plan[chunkVolumeId]
+ if !found {
+ continue
+ }
+ path := parentPath.Child(entry.Name)
- fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
- if !*apply {
- continue
- }
- if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
- fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
- continue
- }
+ fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
+ if !*apply {
+ continue
+ }
+ if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
+ fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
+ continue
+ }
- if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
- Directory: string(parentPath),
- Entry: entry,
- }); err != nil {
- fmt.Printf("failed to update %s: %v\n", path, err)
- }
+ if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
+ Directory: string(parentPath),
+ Entry: entry,
+ }); err != nil {
+ fmt.Printf("failed to update %s: %v\n", path, err)
}
}
})
@@ -174,6 +174,9 @@ func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterC
if err != nil {
return err
}
+
+ c.volumeSizeLimit = volumes.GetVolumeSizeLimitMb() * 1024 * 1024
+
for _, dc := range volumes.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
@@ -192,6 +195,94 @@ func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterC
})
}
+func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) {
+ plan := make(map[needle.VolumeId]needle.VolumeId)
+ volumes := maps.Keys(c.volumes)
+ sort.Slice(volumes, func(a, b int) bool {
+ return c.volumes[volumes[b]].Size < c.volumes[volumes[a]].Size
+ })
+
+ l := len(volumes)
+ for i := 0; i < l; i++ {
+ volume := c.volumes[volumes[i]]
+ if volume.GetReadOnly() || c.getVolumeSize(volume) == 0 || (collection != "*" && collection != volume.GetCollection()) {
+ volumes = slices.Delete(volumes, i, i+1)
+ i--
+ l--
+ }
+ }
+ for i := l - 1; i >= 0; i-- {
+ src := volumes[i]
+ if fromVolumeId != 0 && src != fromVolumeId {
+ continue
+ }
+ for j := 0; j < i; j++ {
+ candidate := volumes[j]
+ if toVolumeId != 0 && candidate != toVolumeId {
+ continue
+ }
+ if _, moving := plan[candidate]; moving {
+ continue
+ }
+ compatible, err := c.volumesAreCompatible(src, candidate)
+ if err != nil {
+ return nil, err
+ }
+ if !compatible {
+ continue
+ }
+ if c.getVolumeSizeBasedOnPlan(plan, candidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit {
+ continue
+ }
+ plan[src] = candidate
+ break
+ }
+ }
+
+ return plan, nil
+}
+
+func (c *commandFsMergeVolumes) getVolumeSizeBasedOnPlan(plan map[needle.VolumeId]needle.VolumeId, vid needle.VolumeId) uint64 {
+ size := c.getVolumeSizeById(vid)
+ for src, dist := range plan {
+ if dist == vid {
+ size += c.getVolumeSizeById(src)
+ }
+ }
+ return size
+}
+
+func (c *commandFsMergeVolumes) getVolumeSize(volume *master_pb.VolumeInformationMessage) uint64 {
+ return volume.Size - volume.DeletedByteCount
+}
+
+func (c *commandFsMergeVolumes) getVolumeSizeById(vid needle.VolumeId) uint64 {
+ return c.getVolumeSize(c.volumes[vid])
+}
+
+func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.VolumeId) {
+ fmt.Printf("max volume size: %d MB\n", c.volumeSizeLimit/1024/1024)
+ reversePlan := make(map[needle.VolumeId][]needle.VolumeId)
+ for src, dist := range plan {
+ reversePlan[dist] = append(reversePlan[dist], src)
+ }
+ for dist, srcs := range reversePlan {
+ currentSize := c.getVolumeSizeById(dist)
+ for _, src := range srcs {
+ srcSize := c.getVolumeSizeById(src)
+ newSize := currentSize + srcSize
+ fmt.Printf(
+ "volume %d (%d MB) merge into volume %d (%d MB => %d MB)\n",
+ src, srcSize/1024/1024,
+ dist, currentSize/1024/1024, newSize/1024/1024,
+ )
+ currentSize = newSize
+
+ }
+ fmt.Println()
+ }
+}
+
func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)
@@ -201,7 +292,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
return err
}
- downloadURL := fmt.Sprintf("http://%s/%s", downloadURLs[0], fromFid.String())
+ downloadURL := fmt.Sprintf("http://%s/%s?readDeleted=true", downloadURLs[0], fromFid.String())
uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
if err != nil {