aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_fs_meta_change_volume_id.go
blob: ec7cba72907dce1e4b582ace73487097fcb0c7de (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

func init() {
	Commands = append(Commands, &commandFsMetaChangeVolumeId{})
}

// commandFsMetaChangeVolumeId implements the "fs.meta.changeVolumeId" shell
// command. It carries no state; all inputs arrive through Do's arguments.
type commandFsMetaChangeVolumeId struct{}

// Name returns the identifier under which this command is invoked.
func (c *commandFsMetaChangeVolumeId) Name() string {
	const commandName = "fs.meta.changeVolumeId"
	return commandName
}

// Help returns the usage text for the command: the two invocation forms
// (single from/to pair, or a mapping file) and the mapping file line format.
func (c *commandFsMetaChangeVolumeId) Help() string {
	// Whitespace is spelled out with escapes so the exact layout of the help
	// text does not depend on how this source file is indented.
	return "change volume id in existing metadata.\n" +
		"\n" +
		"\tfs.meta.changeVolumeId -dir=/path/to/a/dir -fromVolumeId=x -toVolumeId=y -apply\n" +
		"\tfs.meta.changeVolumeId -dir=/path/to/a/dir -mapping=/path/to/mapping/file -apply\n" +
		"\n" +
		"\tThe mapping file should have these lines, each line is: [fromVolumeId]=>[toVolumeId]\n" +
		"\te.g.\n" +
		"\t\t1 => 2\n" +
		"\t\t3 => 4\n" +
		"\n"
}

// HasTag reports whether the command carries the given tag. This command
// has no tags, so the answer is always false regardless of the argument.
func (c *commandFsMetaChangeVolumeId) HasTag(_ CommandTag) bool {
	const tagged = false
	return tagged
}

// Do runs the fs.meta.changeVolumeId command.
//
// It parses args, builds a from=>to volume id mapping (either from the
// -mapping file or from the -fromVolumeId/-toVolumeId pair), then walks every
// entry under -dir and rewrites the volume id of each chunk whose current
// volume id is a key in the mapping. Modified entries are sent to the filer
// with UpdateEntry only when -apply is set; otherwise they are only reported.
//
// It returns an error when flag parsing fails, when the mapping file cannot
// be loaded, when the from/to pair is equal or contains zero, or when the
// filer client / traversal returns one. Failures to update an individual
// entry are printed and do not stop the traversal.
func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	fsMetaChangeVolumeIdCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	dir := fsMetaChangeVolumeIdCommand.String("dir", "/", "fix all metadata under this folder")
	mappingFileName := fsMetaChangeVolumeIdCommand.String("mapping", "", "a file with multiple volume id changes, with each line as x=>y")
	fromVolumeId := fsMetaChangeVolumeIdCommand.Uint("fromVolumeId", 0, "change metadata with this volume id")
	toVolumeId := fsMetaChangeVolumeIdCommand.Uint("toVolumeId", 0, "change metadata to this volume id")
	applyChanges := fsMetaChangeVolumeIdCommand.Bool("apply", false, "apply the metadata changes")
	// TODO: remove this alias
	applyChangesAlias := fsMetaChangeVolumeIdCommand.Bool("force", false, "apply the metadata changes (alias for -apply)")
	if err = fsMetaChangeVolumeIdCommand.Parse(args); err != nil {
		return err
	}

	// NOTE(review): presumably folds the deprecated -force flag into
	// *applyChanges and tells the user when nothing will be applied; both
	// helpers are defined outside this file.
	handleDeprecatedForceFlag(writer, fsMetaChangeVolumeIdCommand, applyChangesAlias, applyChanges)
	infoAboutSimulationMode(writer, *applyChanges, "-apply")

	// load the mapping
	mapping := make(map[needle.VolumeId]needle.VolumeId)
	if *mappingFileName != "" {
		// A mapping file that cannot be opened or parsed must abort the
		// command: continuing would traverse (and possibly apply) with an
		// empty or partially loaded mapping.
		if err = readMappingFromFile(*mappingFileName, mapping); err != nil {
			return err
		}
	} else {
		if *fromVolumeId == *toVolumeId {
			return fmt.Errorf("no volume id changes")
		}
		if *fromVolumeId == 0 || *toVolumeId == 0 {
			return fmt.Errorf("volume id can not be zero")
		}
		mapping[needle.VolumeId(*fromVolumeId)] = needle.VolumeId(*toVolumeId)
	}

	return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		return filer_pb.TraverseBfs(commandEnv, util.FullPath(*dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
			if !entry.IsDirectory {
				var hasChanges bool
				for _, chunk := range entry.Chunks {
					if chunk.IsChunkManifest {
						// Manifest chunks are not rewritten; the whole
						// entry is skipped without being updated.
						fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
						return
					}
					chunkVolumeId := chunk.Fid.VolumeId
					if newVolumeId, found := mapping[needle.VolumeId(chunkVolumeId)]; found {
						hasChanges = true
						chunk.Fid.VolumeId = uint32(newVolumeId)
						// Clear the string form of the file id alongside the
						// structured Fid that was just changed.
						chunk.FileId = ""
					}
				}
				if hasChanges {
					println("Updating", parentPath, entry.Name)
					if *applyChanges {
						if updateErr := filer_pb.UpdateEntry(context.Background(), client, &filer_pb.UpdateEntryRequest{
							Directory: string(parentPath),
							Entry:     entry,
						}); updateErr != nil {
							fmt.Printf("failed to update %s/%s: %v\n", parentPath, entry.Name, updateErr)
						}
					}
				}
			}
		})
	})
}

// readMappingFromFile loads volume id changes from the file at filename into
// mapping.
//
// Each non-blank line is expected to have the form "x => y", where x is the
// current volume id and y is the replacement; whitespace around either number
// is ignored. Blank lines are skipped silently. Lines that do not contain
// exactly one "=>" are reported with println and skipped.
//
// It returns an error if the file cannot be opened or read, or if either side
// of a line is not an unsigned integer that fits in 32 bits. Entries parsed
// before a failing line remain in mapping.
func readMappingFromFile(filename string, mapping map[needle.VolumeId]needle.VolumeId) error {
	mappingFile, openErr := os.Open(filename)
	if openErr != nil {
		return fmt.Errorf("failed to open file %s: %w", filename, openErr)
	}
	defer mappingFile.Close()
	mappingContent, readErr := io.ReadAll(mappingFile)
	if readErr != nil {
		return fmt.Errorf("failed to read file %s: %w", filename, readErr)
	}
	for i, line := range strings.Split(string(mappingContent), "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			// Typically the empty tail after the final newline; not worth a warning.
			continue
		}
		parts := strings.Split(line, "=>")
		if len(parts) != 2 {
			println("unrecognized line:", line)
			continue
		}
		// Parse as 32-bit: chunk volume ids are stored as uint32, so a larger
		// value could never match and would otherwise wrap silently when
		// converted below.
		x, errX := strconv.ParseUint(strings.TrimSpace(parts[0]), 10, 32)
		if errX != nil {
			return fmt.Errorf("%s line %d: invalid source volume id: %w", filename, i+1, errX)
		}
		y, errY := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 32)
		if errY != nil {
			return fmt.Errorf("%s line %d: invalid target volume id: %w", filename, i+1, errY)
		}
		mapping[needle.VolumeId(x)] = needle.VolumeId(y)
	}
	return nil
}