Diffstat (limited to 'weed/shell')
-rw-r--r--  weed/shell/command_bucket_create.go | 83
-rw-r--r--  weed/shell/command_bucket_delete.go | 54
-rw-r--r--  weed/shell/command_bucket_list.go | 78
-rw-r--r--  weed/shell/command_collection_delete.go | 50
-rw-r--r--  weed/shell/command_collection_list.go | 58
-rw-r--r--  weed/shell/command_ec_balance.go | 519
-rw-r--r--  weed/shell/command_ec_common.go | 337
-rw-r--r--  weed/shell/command_ec_decode.go | 268
-rw-r--r--  weed/shell/command_ec_encode.go | 298
-rw-r--r--  weed/shell/command_ec_rebuild.go | 271
-rw-r--r--  weed/shell/command_ec_test.go | 139
-rw-r--r--  weed/shell/command_fs_cat.go | 59
-rw-r--r--  weed/shell/command_fs_cd.go | 50
-rw-r--r--  weed/shell/command_fs_du.go | 84
-rw-r--r--  weed/shell/command_fs_lock_unlock.go | 54
-rw-r--r--  weed/shell/command_fs_ls.go | 111
-rw-r--r--  weed/shell/command_fs_meta_cat.go | 68
-rw-r--r--  weed/shell/command_fs_meta_load.go | 100
-rw-r--r--  weed/shell/command_fs_meta_notify.go | 73
-rw-r--r--  weed/shell/command_fs_meta_save.go | 143
-rw-r--r--  weed/shell/command_fs_mv.go | 90
-rw-r--r--  weed/shell/command_fs_pwd.go | 28
-rw-r--r--  weed/shell/command_fs_tree.go | 113
-rw-r--r--  weed/shell/command_volume_balance.go | 257
-rw-r--r--  weed/shell/command_volume_configure_replication.go | 108
-rw-r--r--  weed/shell/command_volume_copy.go | 55
-rw-r--r--  weed/shell/command_volume_delete.go | 50
-rw-r--r--  weed/shell/command_volume_fix_replication.go | 307
-rw-r--r--  weed/shell/command_volume_fix_replication_test.go | 207
-rw-r--r--  weed/shell/command_volume_fsck.go | 361
-rw-r--r--  weed/shell/command_volume_list.go | 133
-rw-r--r--  weed/shell/command_volume_mount.go | 63
-rw-r--r--  weed/shell/command_volume_move.go | 129
-rw-r--r--  weed/shell/command_volume_tier_download.go | 170
-rw-r--r--  weed/shell/command_volume_tier_upload.go | 151
-rw-r--r--  weed/shell/command_volume_unmount.go | 63
-rw-r--r--  weed/shell/commands.go | 137
-rw-r--r--  weed/shell/shell_liner.go | 154
38 files changed, 5473 insertions, 0 deletions
diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_bucket_create.go
new file mode 100644
index 000000000..52d96e4c3
--- /dev/null
+++ b/weed/shell/command_bucket_create.go
@@ -0,0 +1,83 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandBucketCreate{})
+}
+
+type commandBucketCreate struct {
+}
+
+func (c *commandBucketCreate) Name() string {
+ return "bucket.create"
+}
+
+func (c *commandBucketCreate) Help() string {
+ return `create a bucket with a given name
+
+ Example:
+ bucket.create -name <bucket_name> -replication 001
+`
+}
+
+func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ bucketName := bucketCommand.String("name", "", "bucket name")
+ replication := bucketCommand.String("replication", "", "replication setting for the bucket")
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *bucketName == "" {
+ return fmt.Errorf("empty bucket name")
+ }
+
+ err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer configuration: %v", err)
+ }
+ filerBucketsPath := resp.DirBuckets
+
+ println("create bucket under", filerBucketsPath)
+
+ entry := &filer_pb.Entry{
+ Name: *bucketName,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(0777 | os.ModeDir),
+ Collection: *bucketName,
+ Replication: *replication,
+ },
+ }
+
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: filerBucketsPath,
+ Entry: entry,
+ }); err != nil {
+ return err
+ }
+
+ println("created bucket", *bucketName)
+
+ return nil
+
+ })
+
+ return err
+
+}
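
Every command in this package follows the registration pattern shown above: an init() that appends an instance to the package-level Commands slice, plus Name, Help, and Do methods, with Do parsing its own flag.FlagSet and returning nil on a parse error so the shell just prints the usage text. The following is a minimal sketch of that pattern for a hypothetical bucket.touch command (not part of this change); it assumes only the Commands registry and the CommandEnv type defined in commands.go within this same changeset.

    package shell

    import (
        "flag"
        "fmt"
        "io"
    )

    func init() {
        // every command registers itself; the shell looks it up by Name()
        Commands = append(Commands, &commandBucketTouch{})
    }

    // commandBucketTouch is a hypothetical example, not part of this diff.
    type commandBucketTouch struct {
    }

    func (c *commandBucketTouch) Name() string {
        return "bucket.touch"
    }

    func (c *commandBucketTouch) Help() string {
        return `example command showing the registration pattern

        bucket.touch -name <bucket_name>
    `
    }

    func (c *commandBucketTouch) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
        cmd := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
        bucketName := cmd.String("name", "", "bucket name")
        if err = cmd.Parse(args); err != nil {
            return nil // same convention as above: flag errors fall back to the usage output
        }
        if *bucketName == "" {
            return fmt.Errorf("empty bucket name")
        }
        fmt.Fprintf(writer, "touched bucket %s\n", *bucketName)
        return nil
    }
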
diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go
new file mode 100644
index 000000000..8f5f63b46
--- /dev/null
+++ b/weed/shell/command_bucket_delete.go
@@ -0,0 +1,54 @@
+package shell
+
+import (
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandBucketDelete{})
+}
+
+type commandBucketDelete struct {
+}
+
+func (c *commandBucketDelete) Name() string {
+ return "bucket.delete"
+}
+
+func (c *commandBucketDelete) Help() string {
+ return `delete a bucket by a given name
+
+ bucket.delete -name <bucket_name>
+`
+}
+
+func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ bucketName := bucketCommand.String("name", "", "bucket name")
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *bucketName == "" {
+ return fmt.Errorf("empty bucket name")
+ }
+
+ _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args()))
+ if parseErr != nil {
+ return parseErr
+ }
+
+ var filerBucketsPath string
+ filerBucketsPath, err = readFilerBucketsPath(commandEnv)
+ if err != nil {
+ return fmt.Errorf("read buckets: %v", err)
+ }
+
+ return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false)
+
+}
diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go
new file mode 100644
index 000000000..2e446b6b2
--- /dev/null
+++ b/weed/shell/command_bucket_list.go
@@ -0,0 +1,78 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "math"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandBucketList{})
+}
+
+type commandBucketList struct {
+}
+
+func (c *commandBucketList) Name() string {
+ return "bucket.list"
+}
+
+func (c *commandBucketList) Help() string {
+ return `list all buckets
+
+`
+}
+
+func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args()))
+ if parseErr != nil {
+ return parseErr
+ }
+
+ var filerBucketsPath string
+ filerBucketsPath, err = readFilerBucketsPath(commandEnv)
+ if err != nil {
+ return fmt.Errorf("read buckets: %v", err)
+ }
+
+ err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
+ if entry.Attributes.Replication == "" || entry.Attributes.Replication == "000" {
+ fmt.Fprintf(writer, " %s\n", entry.Name)
+ } else {
+ fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", entry.Name, entry.Attributes.Replication)
+ }
+ return nil
+ }, "", false, math.MaxUint32)
+ if err != nil {
+ return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
+ }
+
+ return err
+
+}
+
+func readFilerBucketsPath(filerClient filer_pb.FilerClient) (filerBucketsPath string, err error) {
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer configuration: %v", err)
+ }
+ filerBucketsPath = resp.DirBuckets
+
+ return nil
+
+ })
+
+ return filerBucketsPath, err
+}
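
readFilerBucketsPath, introduced here, is the same helper that bucket.delete above uses to locate the filer's bucket directory, and filer_pb.List drives the per-entry callback. A small sketch, assuming the same package and imports as this file, of how another command could reuse the pair to count buckets instead of printing them; bucketCount is a hypothetical helper, not part of this change.

    // hypothetical helper reusing the calls shown in this file
    func bucketCount(commandEnv *CommandEnv) (count int, err error) {
        filerBucketsPath, err := readFilerBucketsPath(commandEnv)
        if err != nil {
            return 0, fmt.Errorf("read buckets: %v", err)
        }
        // same filer_pb.List call shape as bucket.list uses above
        err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
            if entry.IsDirectory {
                count++
            }
            return nil
        }, "", false, math.MaxUint32)
        return count, err
    }
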
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
new file mode 100644
index 000000000..4b3d7f0be
--- /dev/null
+++ b/weed/shell/command_collection_delete.go
@@ -0,0 +1,50 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandCollectionDelete{})
+}
+
+type commandCollectionDelete struct {
+}
+
+func (c *commandCollectionDelete) Name() string {
+ return "collection.delete"
+}
+
+func (c *commandCollectionDelete) Help() string {
+ return `delete specified collection
+
+ collection.delete <collection_name>
+
+`
+}
+
+func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if len(args) == 0 {
+ return nil
+ }
+
+ collectionName := args[0]
+
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
+ Name: collectionName,
+ })
+ return err
+ })
+ if err != nil {
+ return
+ }
+
+ fmt.Fprintf(writer, "collection %s is deleted.\n", collectionName)
+
+ return nil
+}
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
new file mode 100644
index 000000000..2a114e61b
--- /dev/null
+++ b/weed/shell/command_collection_list.go
@@ -0,0 +1,58 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandCollectionList{})
+}
+
+type commandCollectionList struct {
+}
+
+func (c *commandCollectionList) Name() string {
+ return "collection.list"
+}
+
+func (c *commandCollectionList) Help() string {
+ return `list all collections`
+}
+
+func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ collections, err := ListCollectionNames(commandEnv, true, true)
+
+ if err != nil {
+ return err
+ }
+
+ for _, c := range collections {
+ fmt.Fprintf(writer, "collection:\"%s\"\n", c)
+ }
+
+ fmt.Fprintf(writer, "Total %d collections.\n", len(collections))
+
+ return nil
+}
+
+func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
+ var resp *master_pb.CollectionListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
+ IncludeNormalVolumes: includeNormalVolumes,
+ IncludeEcVolumes: includeEcVolumes,
+ })
+ return err
+ })
+ if err != nil {
+ return
+ }
+ for _, c := range resp.Collections {
+ collections = append(collections, c.Name)
+ }
+ return
+}
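
ListCollectionNames is exported here because the EC commands later in this change (ec.balance, ec.rebuild) iterate per collection; they pass includeNormalVolumes=false, includeEcVolumes=true to get only the collections that currently hold ec volumes. A minimal sketch of that call pattern, assuming the same package; printEcCollections is a hypothetical helper, not part of this change.

    // hypothetical helper: print only collections that hold ec volumes,
    // using the same call that ec.balance and ec.rebuild make below
    func printEcCollections(commandEnv *CommandEnv, writer io.Writer) error {
        ecCollections, err := ListCollectionNames(commandEnv, false, true)
        if err != nil {
            return err
        }
        for _, name := range ecCollections {
            fmt.Fprintf(writer, "ec collection: %s\n", name)
        }
        return nil
    }
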
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
new file mode 100644
index 000000000..1ddb6a490
--- /dev/null
+++ b/weed/shell/command_ec_balance.go
@@ -0,0 +1,519 @@
+package shell
+
+import (
+ "flag"
+ "fmt"
+ "io"
+ "sort"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandEcBalance{})
+}
+
+type commandEcBalance struct {
+}
+
+func (c *commandEcBalance) Name() string {
+ return "ec.balance"
+}
+
+func (c *commandEcBalance) Help() string {
+ return `balance all ec shards among all racks and volume servers
+
+ ec.balance [-c EACH_COLLECTION|<collection_name>] [-force] [-dataCenter <data_center>]
+
+ Algorithm:
+
+ For each type of volume server (different max volume count limit){
+ for each collection:
+ balanceEcVolumes(collectionName)
+ for each rack:
+ balanceEcRack(rack)
+ }
+
+ func balanceEcVolumes(collectionName){
+ for each volume:
+ doDeduplicateEcShards(volumeId)
+
+ tracks rack~shardCount mapping
+ for each volume:
+ doBalanceEcShardsAcrossRacks(volumeId)
+
+ for each volume:
+ doBalanceEcShardsWithinRacks(volumeId)
+ }
+
+ // spread ec shards into more racks
+ func doBalanceEcShardsAcrossRacks(volumeId){
+ tracks rack~volumeIdShardCount mapping
+ averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could vary for each dc
+ ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
+ for each ecShardsToMove {
+ destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, averageShardsPerEcRack)
+ destVolumeServers = volume servers on the destRack
+ pickOneEcNodeAndMoveOneShard(destVolumeServers)
+ }
+ }
+
+ func doBalanceEcShardsWithinRacks(volumeId){
+ racks = collect all racks that the volume id is on
+ for rack, shards := range racks
+ doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
+ }
+
+ // move ec shards
+ func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
+ tracks volumeServer~volumeIdShardCount mapping
+ averageShardCount = len(shards) / numVolumeServers
+ volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
+ ecShardsToMove = select overflown ec shards from volumeServersOverAverage
+ for each ecShardsToMove {
+ destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, averageShardCount)
+ pickOneEcNodeAndMoveOneShard(destVolumeServers)
+ }
+ }
+
+ // move ec shards while keeping shard distribution for the same volume unchanged or more even
+ func balanceEcRack(rack){
+ averageShardCount = total shards / numVolumeServers
+ for hasMovedOneEcShard {
+ sort all volume servers ordered by the number of local ec shards
+ pick the volume server A with the lowest number of ec shards x
+ pick the volume server B with the highest number of ec shards y
+ if y > averageShardCount and x +1 <= averageShardCount {
+ if B has a ec shard with volume id v that A does not have {
+ move one ec shard v from B to A
+ hasMovedOneEcShard = true
+ }
+ }
+ }
+ }
+
+`
+}
+
+func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
+ dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
+ applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan")
+ if err = balanceCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ // collect all ec nodes
+ allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, *dc)
+ if err != nil {
+ return err
+ }
+ if totalFreeEcSlots < 1 {
+ return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
+ }
+
+ racks := collectRacks(allEcNodes)
+
+ if *collection == "EACH_COLLECTION" {
+ collections, err := ListCollectionNames(commandEnv, false, true)
+ if err != nil {
+ return err
+ }
+ fmt.Printf("balanceEcVolumes collections %+v\n", len(collections))
+ for _, c := range collections {
+ fmt.Printf("balanceEcVolumes collection %+v\n", c)
+ if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, *applyBalancing); err != nil {
+ return err
+ }
+ }
+ } else {
+ if err = balanceEcVolumes(commandEnv, *collection, allEcNodes, racks, *applyBalancing); err != nil {
+ return err
+ }
+ }
+
+ if err := balanceEcRacks(commandEnv, racks, *applyBalancing); err != nil {
+ return fmt.Errorf("balance ec racks: %v", err)
+ }
+
+ return nil
+}
+
+func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
+ // collect racks info
+ racks := make(map[RackId]*EcRack)
+ for _, ecNode := range allEcNodes {
+ if racks[ecNode.rack] == nil {
+ racks[ecNode.rack] = &EcRack{
+ ecNodes: make(map[EcNodeId]*EcNode),
+ }
+ }
+ racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
+ racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
+ }
+ return racks
+}
+
+func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
+
+ fmt.Printf("balanceEcVolumes %s\n", collection)
+
+ if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
+ return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
+ }
+
+ if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
+ return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
+ }
+
+ if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
+ return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
+ }
+
+ return nil
+}
+
+func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
+ // vid => []ecNode
+ vidLocations := collectVolumeIdToEcNodes(allEcNodes)
+ // deduplicate ec shards
+ for vid, locations := range vidLocations {
+ if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
+
+ // map each shard id of this volume to the nodes holding a copy, so duplicates can be removed
+ shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
+ for _, ecNode := range locations {
+ shardBits := findEcVolumeShards(ecNode, vid)
+ for _, shardId := range shardBits.ShardIds() {
+ shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
+ }
+ }
+ for shardId, ecNodes := range shardToLocations {
+ if len(ecNodes) <= 1 {
+ continue
+ }
+ sortEcNodesByFreeslotsAscending(ecNodes)
+ fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
+ if !applyBalancing {
+ continue
+ }
+
+ duplicatedShardIds := []uint32{uint32(shardId)}
+ for _, ecNode := range ecNodes[1:] {
+ if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ return err
+ }
+ if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ return err
+ }
+ ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
+ }
+ }
+ return nil
+}
+
+func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
+ // collect vid => []ecNode, since previous steps can change the locations
+ vidLocations := collectVolumeIdToEcNodes(allEcNodes)
+ // spread the ec shards evenly
+ for vid, locations := range vidLocations {
+ if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
+
+ // calculate average number of shards an ec rack should have for one volume
+ averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
+
+ // count how many racks hold this volume's shards, and how many shards each rack has
+ rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
+ shardBits := findEcVolumeShards(ecNode, vid)
+ return string(ecNode.rack), shardBits.ShardIdCount()
+ })
+ rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
+ return string(ecNode.rack)
+ })
+
+ // ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
+ ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
+ for rackId, count := range rackToShardCount {
+ if count > averageShardsPerEcRack {
+ possibleEcNodes := rackEcNodesWithVid[rackId]
+ for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
+ ecShardsToMove[shardId] = ecNode
+ }
+ }
+ }
+
+ for shardId, ecNode := range ecShardsToMove {
+ rackId := pickOneRack(racks, rackToShardCount, averageShardsPerEcRack)
+ if rackId == "" {
+ fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id)
+ continue
+ }
+ var possibleDestinationEcNodes []*EcNode
+ for _, n := range racks[rackId].ecNodes {
+ possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
+ }
+ err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+ if err != nil {
+ return err
+ }
+ rackToShardCount[string(rackId)] += 1
+ rackToShardCount[string(ecNode.rack)] -= 1
+ racks[rackId].freeEcSlot -= 1
+ racks[ecNode.rack].freeEcSlot += 1
+ }
+
+ return nil
+}
+
+func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) RackId {
+
+ // TODO later may need to add some randomness
+
+ for rackId, rack := range rackToEcNodes {
+ if rackToShardCount[string(rackId)] >= averageShardsPerEcRack {
+ continue
+ }
+
+ if rack.freeEcSlot <= 0 {
+ continue
+ }
+
+ return rackId
+ }
+
+ return ""
+}
+
+func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
+ // collect vid => []ecNode, since previous steps can change the locations
+ vidLocations := collectVolumeIdToEcNodes(allEcNodes)
+
+ // spread the ec shards evenly
+ for vid, locations := range vidLocations {
+
+ // count how many racks hold this volume's shards, and how many shards each rack has
+ rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
+ shardBits := findEcVolumeShards(ecNode, vid)
+ return string(ecNode.rack), shardBits.ShardIdCount()
+ })
+ rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
+ return string(ecNode.rack)
+ })
+
+ for rackId, _ := range rackToShardCount {
+
+ var possibleDestinationEcNodes []*EcNode
+ for _, n := range racks[RackId(rackId)].ecNodes {
+ possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
+ }
+ sourceEcNodes := rackEcNodesWithVid[rackId]
+ averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
+ if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+
+ for _, ecNode := range existingLocations {
+
+ shardBits := findEcVolumeShards(ecNode, vid)
+ overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
+
+ for _, shardId := range shardBits.ShardIds() {
+
+ if overLimitCount <= 0 {
+ break
+ }
+
+ fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
+
+ err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+ if err != nil {
+ return err
+ }
+
+ overLimitCount--
+ }
+ }
+
+ return nil
+}
+
+func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
+
+ // balance one rack for all ec shards
+ for _, ecRack := range racks {
+ if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
+
+ if len(ecRack.ecNodes) <= 1 {
+ return nil
+ }
+
+ var rackEcNodes []*EcNode
+ for _, node := range ecRack.ecNodes {
+ rackEcNodes = append(rackEcNodes, node)
+ }
+
+ ecNodeIdToShardCount := groupByCount(rackEcNodes, func(node *EcNode) (id string, count int) {
+ for _, ecShardInfo := range node.info.EcShardInfos {
+ count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
+ }
+ return node.info.Id, count
+ })
+
+ var totalShardCount int
+ for _, count := range ecNodeIdToShardCount {
+ totalShardCount += count
+ }
+
+ averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))
+
+ hasMove := true
+ for hasMove {
+ hasMove = false
+ sort.Slice(rackEcNodes, func(i, j int) bool {
+ return rackEcNodes[i].freeEcSlot > rackEcNodes[j].freeEcSlot
+ })
+ emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
+ emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
+ if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
+
+ emptyNodeIds := make(map[uint32]bool)
+ for _, shards := range emptyNode.info.EcShardInfos {
+ emptyNodeIds[shards.Id] = true
+ }
+ for _, shards := range fullNode.info.EcShardInfos {
+ if _, found := emptyNodeIds[shards.Id]; !found {
+ for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
+
+ fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+
+ err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
+ if err != nil {
+ return err
+ }
+
+ ecNodeIdToShardCount[emptyNode.info.Id]++
+ ecNodeIdToShardCount[fullNode.info.Id]--
+ hasMove = true
+ break
+ }
+ break
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
+func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+
+ sortEcNodesByFreeslotsDecending(possibleDestinationEcNodes)
+
+ for _, destEcNode := range possibleDestinationEcNodes {
+ if destEcNode.info.Id == existingLocation.info.Id {
+ continue
+ }
+
+ if destEcNode.freeEcSlot <= 0 {
+ continue
+ }
+ if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode {
+ continue
+ }
+
+ fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
+
+ err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
+ if err != nil {
+ return err
+ }
+
+ return nil
+ }
+
+ return nil
+}
+
+func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
+ picked := make(map[erasure_coding.ShardId]*EcNode)
+ var candidateEcNodes []*CandidateEcNode
+ for _, ecNode := range ecNodes {
+ shardBits := findEcVolumeShards(ecNode, vid)
+ if shardBits.ShardIdCount() > 0 {
+ candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
+ ecNode: ecNode,
+ shardCount: shardBits.ShardIdCount(),
+ })
+ }
+ }
+ sort.Slice(candidateEcNodes, func(i, j int) bool {
+ return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
+ })
+ for i := 0; i < n; i++ {
+ selectedEcNodeIndex := -1
+ for i, candidateEcNode := range candidateEcNodes {
+ shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
+ if shardBits > 0 {
+ selectedEcNodeIndex = i
+ for _, shardId := range shardBits.ShardIds() {
+ candidateEcNode.shardCount--
+ picked[shardId] = candidateEcNode.ecNode
+ candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
+ break
+ }
+ break
+ }
+ }
+ if selectedEcNodeIndex >= 0 {
+ ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
+ return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
+ })
+ }
+
+ }
+ return picked
+}
+
+func collectVolumeIdToEcNodes(allEcNodes []*EcNode) map[needle.VolumeId][]*EcNode {
+ vidLocations := make(map[needle.VolumeId][]*EcNode)
+ for _, ecNode := range allEcNodes {
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
+ }
+ }
+ return vidLocations
+}
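
The across-rack step above keys everything off averageShardsPerEcRack = ceilDivide(TotalShardsCount, len(racks)), and pickOneRack only accepts racks that are still below that average and have free slots. A standalone worked example of the arithmetic, assuming the 10+4 layout (TotalShardsCount = 14) and reproducing ceilDivide from command_ec_common.go in this change:

    package main

    import (
        "fmt"
        "math"
    )

    // same ceiling division as ceilDivide in command_ec_common.go
    func ceilDivide(total, n int) int {
        return int(math.Ceil(float64(total) / float64(n)))
    }

    func main() {
        totalShards := 14 // 10 data shards + 4 parity shards
        for _, racks := range []int{2, 3, 5} {
            avg := ceilDivide(totalShards, racks)
            fmt.Printf("%d racks -> at most %d shards of one volume per rack\n", racks, avg)
        }
        // with 3 racks the average is 5, so a rack holding 7 shards of a volume
        // sheds 7-5 = 2 shards into racks that are still under 5 and have free slots
    }
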
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
new file mode 100644
index 000000000..0db119d3c
--- /dev/null
+++ b/weed/shell/command_ec_common.go
@@ -0,0 +1,337 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "sort"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+)
+
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+
+ copiedShardIds := []uint32{uint32(shardId)}
+
+ if applyBalancing {
+
+ // ask destination node to copy shard and the ecx file from source node, and mount it
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
+ if err != nil {
+ return err
+ }
+
+ // unmount the to be deleted shards
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+ if err != nil {
+ return err
+ }
+
+ // ask source node to delete the shard, and maybe the ecx file
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+
+ }
+
+ destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
+ existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
+
+ return nil
+
+}
+
+func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
+ targetServer *EcNode, shardIdsToCopy []uint32,
+ volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
+
+ fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
+
+ err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+
+ if targetServer.info.Id != existingLocation {
+
+ fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ ShardIds: shardIdsToCopy,
+ CopyEcxFile: true,
+ CopyEcjFile: true,
+ CopyVifFile: true,
+ SourceDataNode: existingLocation,
+ })
+ if copyErr != nil {
+ return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
+ }
+ }
+
+ fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ ShardIds: shardIdsToCopy,
+ })
+ if mountErr != nil {
+ return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
+ }
+
+ if targetServer.info.Id != existingLocation {
+ copiedShardIds = shardIdsToCopy
+ glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
+ for _, dc := range topo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, dn := range rack.DataNodeInfos {
+ fn(dc.Id, RackId(rack.Id), dn)
+ }
+ }
+ }
+}
+
+func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) {
+ sort.Slice(ecNodes, func(i, j int) bool {
+ return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
+ })
+}
+
+func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
+ sort.Slice(ecNodes, func(i, j int) bool {
+ return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot
+ })
+}
+
+type CandidateEcNode struct {
+ ecNode *EcNode
+ shardCount int
+}
+
+// after the element at index changes its sort key (e.g. its shard count), bubble it up or down to keep the slice sorted
+func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
+ for i := index - 1; i >= 0; i-- {
+ if lessThan(i+1, i) {
+ swap(data, i, i+1)
+ } else {
+ break
+ }
+ }
+ for i := index + 1; i < len(data); i++ {
+ if lessThan(i, i-1) {
+ swap(data, i, i-1)
+ } else {
+ break
+ }
+ }
+}
+
+func swap(data []*CandidateEcNode, i, j int) {
+ t := data[i]
+ data[i] = data[j]
+ data[j] = t
+}
+
+func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
+ for _, ecShardInfo := range ecShardInfos {
+ shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ count += shardBits.ShardIdCount()
+ }
+ return
+}
+
+func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
+ return int(dn.MaxVolumeCount-dn.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(dn.EcShardInfos)
+}
+
+type RackId string
+type EcNodeId string
+
+type EcNode struct {
+ info *master_pb.DataNodeInfo
+ dc string
+ rack RackId
+ freeEcSlot int
+}
+
+type EcRack struct {
+ ecNodes map[EcNodeId]*EcNode
+ freeEcSlot int
+}
+
+func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+
+ // list all possible locations
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return nil, 0, err
+ }
+
+ // collect the volume servers (in the selected data center, if any) with their free ec shard slots
+ eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ if selectedDataCenter != "" && selectedDataCenter != dc {
+ return
+ }
+
+ freeEcSlots := countFreeShardSlots(dn)
+ ecNodes = append(ecNodes, &EcNode{
+ info: dn,
+ dc: dc,
+ rack: rack,
+ freeEcSlot: int(freeEcSlots),
+ })
+ totalFreeEcSlots += freeEcSlots
+ })
+
+ sortEcNodesByFreeslotsDecending(ecNodes)
+
+ return
+}
+
+func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
+
+ fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
+
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ ShardIds: toBeDeletedShardIds,
+ })
+ return deleteErr
+ })
+
+}
+
+func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
+
+ fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
+
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
+ VolumeId: uint32(volumeId),
+ ShardIds: toBeUnmountedhardIds,
+ })
+ return deleteErr
+ })
+}
+
+func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
+
+ fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
+
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ ShardIds: toBeMountedhardIds,
+ })
+ return mountErr
+ })
+}
+
+func ceilDivide(total, n int) int {
+ return int(math.Ceil(float64(total) / float64(n)))
+}
+
+func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
+
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ return erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ }
+ }
+
+ return 0
+}
+
+func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
+
+ foundVolume := false
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ newShardBits := oldShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+ }
+ shardInfo.EcIndexBits = uint32(newShardBits)
+ ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+ foundVolume = true
+ break
+ }
+ }
+
+ if !foundVolume {
+ var newShardBits erasure_coding.ShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+ }
+ ecNode.info.EcShardInfos = append(ecNode.info.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
+ Id: uint32(vid),
+ Collection: collection,
+ EcIndexBits: uint32(newShardBits),
+ })
+ ecNode.freeEcSlot -= len(shardIds)
+ }
+
+ return ecNode
+}
+
+func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
+
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ newShardBits := oldShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
+ }
+ shardInfo.EcIndexBits = uint32(newShardBits)
+ ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+ }
+ }
+
+ return ecNode
+}
+
+func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
+ countMap := make(map[string]int)
+ for _, d := range data {
+ id, count := identifierFn(d)
+ countMap[id] += count
+ }
+ return countMap
+}
+
+func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
+ groupMap := make(map[string][]*EcNode)
+ for _, d := range data {
+ id := identifierFn(d)
+ groupMap[id] = append(groupMap[id], d)
+ }
+ return groupMap
+}
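
countFreeShardSlots above is the capacity model the balancing code relies on: every unused volume slot on a server is treated as room for DataShardsCount (10) ec shards, minus whatever shards the server already hosts. A standalone sketch of that formula with made-up numbers:

    package main

    import "fmt"

    // mirrors countFreeShardSlots in command_ec_common.go:
    // free ec slots = (maxVolumeCount - activeVolumeCount) * dataShardsCount - shards already hosted
    func freeEcSlots(maxVolumeCount, activeVolumeCount, hostedShards int) int {
        const dataShardsCount = 10 // 10 data shards in the 10+4 scheme
        return (maxVolumeCount-activeVolumeCount)*dataShardsCount - hostedShards
    }

    func main() {
        // hypothetical volume server: 100 volume slots, 90 in use, 30 ec shards already mounted
        fmt.Println(freeEcSlots(100, 90, 30)) // (100-90)*10 - 30 = 70 free ec shard slots
    }
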
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
new file mode 100644
index 000000000..5f03df58c
--- /dev/null
+++ b/weed/shell/command_ec_decode.go
@@ -0,0 +1,268 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandEcDecode{})
+}
+
+type commandEcDecode struct {
+}
+
+func (c *commandEcDecode) Name() string {
+ return "ec.decode"
+}
+
+func (c *commandEcDecode) Help() string {
+ return `decode an erasure coded volume into a normal volume
+
+ ec.decode [-collection=""] [-volumeId=<volume_id>]
+
+`
+}
+
+func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
+ collection := encodeCommand.String("collection", "", "the collection name")
+ if err = encodeCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ vid := needle.VolumeId(*volumeId)
+
+ // collect topology information
+ topologyInfo, err := collectTopologyInfo(commandEnv)
+ if err != nil {
+ return err
+ }
+
+ // volumeId is provided
+ if vid != 0 {
+ return doEcDecode(commandEnv, topologyInfo, *collection, vid)
+ }
+
+ // apply to all volumes in the collection
+ volumeIds := collectEcShardIds(topologyInfo, *collection)
+ fmt.Printf("ec encode volumes: %v\n", volumeIds)
+ for _, vid := range volumeIds {
+ if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
+ // find volume location
+ nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
+
+ fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
+
+ // collect ec shards to the server with most space
+ targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid)
+ if err != nil {
+ return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
+ }
+
+ // generate a normal volume
+ err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
+ if err != nil {
+ return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
+ }
+
+ // delete the previous ec shards
+ err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
+ if err != nil {
+ return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
+ }
+
+ return nil
+}
+
+func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
+
+ // mount volume
+ if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
+ VolumeId: uint32(vid),
+ })
+ return mountErr
+ }); err != nil {
+ return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
+ }
+
+ // unmount ec shards
+ for location, ecIndexBits := range nodeToEcIndexBits {
+ fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
+ err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
+ if err != nil {
+ return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
+ }
+ }
+ // delete ec shards
+ for location, ecIndexBits := range nodeToEcIndexBits {
+ fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
+ err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
+ if err != nil {
+ return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
+ }
+ }
+
+ return nil
+}
+
+func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
+
+ fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
+
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
+ VolumeId: uint32(vid),
+ Collection: collection,
+ })
+ return genErr
+ })
+
+ return err
+
+}
+
+func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
+
+ maxShardCount := 0
+ var exisitngEcIndexBits erasure_coding.ShardBits
+ for loc, ecIndexBits := range nodeToEcIndexBits {
+ toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount()
+ if toBeCopiedShardCount > maxShardCount {
+ maxShardCount = toBeCopiedShardCount
+ targetNodeLocation = loc
+ exisitngEcIndexBits = ecIndexBits
+ }
+ }
+
+ fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits)
+
+ var copiedEcIndexBits erasure_coding.ShardBits
+ for loc, ecIndexBits := range nodeToEcIndexBits {
+ if loc == targetNodeLocation {
+ continue
+ }
+
+ needToCopyEcIndexBits := ecIndexBits.Minus(exisitngEcIndexBits).MinusParityShards()
+ if needToCopyEcIndexBits.ShardIdCount() == 0 {
+ continue
+ }
+
+ err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+
+ fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
+
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
+ VolumeId: uint32(vid),
+ Collection: collection,
+ ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
+ CopyEcxFile: false,
+ CopyEcjFile: true,
+ CopyVifFile: true,
+ SourceDataNode: loc,
+ })
+ if copyErr != nil {
+ return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ break
+ }
+
+ copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits)
+
+ }
+
+ nodeToEcIndexBits[targetNodeLocation] = exisitngEcIndexBits.Plus(copiedEcIndexBits)
+
+ return targetNodeLocation, err
+
+}
+
+func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return
+ }
+
+ return resp.TopologyInfo, nil
+
+}
+
+func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
+
+ eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, v := range dn.EcShardInfos {
+ if v.Collection == selectedCollection && v.Id == uint32(vid) {
+ ecShardInfos = append(ecShardInfos, v)
+ }
+ }
+ })
+
+ return
+}
+
+func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
+
+ vidMap := make(map[uint32]bool)
+ eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, v := range dn.EcShardInfos {
+ if v.Collection == selectedCollection {
+ vidMap[v.Id] = true
+ }
+ }
+ })
+
+ for vid := range vidMap {
+ vids = append(vids, needle.VolumeId(vid))
+ }
+
+ return
+}
+
+func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits {
+
+ nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
+ eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, v := range dn.EcShardInfos {
+ if v.Id == uint32(vid) {
+ nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
+ }
+ }
+ })
+
+ return nodeToEcIndexBits
+}
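
collectEcShards above treats each server's ShardBits as a bit set: the node already holding the most data shards (MinusParityShards) becomes the target, and every other node copies over only the shards the target is still missing (Minus), skipping parity shards since they are not needed to rebuild the volume. The real ShardBits type lives in the erasure_coding package; the following toy model only mimics the set arithmetic with a uint32 to show the idea, using hypothetical shard layouts:

    package main

    import (
        "fmt"
        "math/bits"
    )

    type shardBits uint32 // bit i set means shard i is present; shards 0-9 are data, 10-13 parity

    func (b shardBits) count() int                  { return bits.OnesCount32(uint32(b)) }
    func (b shardBits) minus(o shardBits) shardBits { return b &^ o }
    func (b shardBits) minusParity() shardBits      { return b & 0x03FF } // keep data shards 0..9 only

    func main() {
        target := shardBits(0b0000_0000_1111)   // target node holds data shards 0-3
        other := shardBits(0b11_0100_0111_0000) // another node holds data shards 4-6 and parity 10,12,13
        toCopy := other.minus(target).minusParity()
        fmt.Printf("copy %d data shards to the target\n", toCopy.count()) // 3: shards 4, 5, 6
    }
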
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
new file mode 100644
index 000000000..5a8146954
--- /dev/null
+++ b/weed/shell/command_ec_encode.go
@@ -0,0 +1,298 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+func init() {
+ Commands = append(Commands, &commandEcEncode{})
+}
+
+type commandEcEncode struct {
+}
+
+func (c *commandEcEncode) Name() string {
+ return "ec.encode"
+}
+
+func (c *commandEcEncode) Help() string {
+ return `apply erasure coding to a volume
+
+ ec.encode [-collection=""] [-fullPercent=95] [-quietFor=1h]
+ ec.encode [-collection=""] [-volumeId=<volume_id>]
+
+ This command will:
+ 1. freeze one volume
+ 2. apply erasure coding to the volume
+ 3. move the encoded shards to multiple volume servers
+
+ The erasure coding is 10+4, i.e. 10 data shards plus 4 parity shards. So ideally you have more than
+ 14 volume servers, and you can afford to lose 4 volume servers.
+
+ If the number of volume servers is not high, the worst case is that you only have 4 volume servers,
+ and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.
+
+ If you have fewer than 4 volume servers, with erasure coding you can at least afford to
+ have 4 corrupted shard files.
+
+`
+}
+
+func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
+ collection := encodeCommand.String("collection", "", "the collection name")
+ fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
+ quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes with no writes for this period")
+ if err = encodeCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ vid := needle.VolumeId(*volumeId)
+
+ // volumeId is provided
+ if vid != 0 {
+ return doEcEncode(commandEnv, *collection, vid)
+ }
+
+ // apply to all volumes in the collection
+ volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
+ if err != nil {
+ return err
+ }
+ fmt.Printf("ec encode volumes: %v\n", volumeIds)
+ for _, vid := range volumeIds {
+ if err = doEcEncode(commandEnv, *collection, vid); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
+ // find volume location
+ locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
+ if !found {
+ return fmt.Errorf("volume %d not found", vid)
+ }
+
+ // fmt.Printf("found ec %d shards on %v\n", vid, locations)
+
+ // mark the volume as readonly
+ err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+ if err != nil {
+ return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
+ }
+
+ // generate ec shards
+ err = generateEcShards(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
+ if err != nil {
+ return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
+ }
+
+ // balance the ec shards to current cluster
+ err = spreadEcShards(commandEnv, vid, collection, locations)
+ if err != nil {
+ return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
+ }
+
+ return nil
+}
+
+func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
+
+ for _, location := range locations {
+
+ fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
+
+ err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
+ VolumeId: uint32(volumeId),
+ })
+ return markErr
+ })
+
+ if err != nil {
+ return err
+ }
+
+ }
+
+ return nil
+}
+
+func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
+
+ fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
+
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ })
+ return genErr
+ })
+
+ return err
+
+}
+
+func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
+
+ allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
+ if err != nil {
+ return err
+ }
+
+ if totalFreeEcSlots < erasure_coding.TotalShardsCount {
+ return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
+ }
+ allocatedDataNodes := allEcNodes
+ if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
+ allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
+ }
+
+ // calculate how many shards to allocate for these servers
+ allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
+
+ // ask the data nodes to copy from the source volume server
+ copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
+ if err != nil {
+ return err
+ }
+
+ // unmount the to be deleted shards
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+ if err != nil {
+ return err
+ }
+
+ // ask the source volume server to clean up copied ec shards
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
+ if err != nil {
+ return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
+ }
+
+ // ask the source volume server to delete the original volume
+ for _, location := range existingLocations {
+ fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
+ err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url)
+ if err != nil {
+ return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
+ }
+ }
+
+ return err
+
+}
+
+func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
+
+ fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url)
+
+ // parallelize
+ shardIdChan := make(chan []uint32, len(targetServers))
+ var wg sync.WaitGroup
+ for i, server := range targetServers {
+ if len(allocatedEcIds[i]) <= 0 {
+ continue
+ }
+
+ wg.Add(1)
+ go func(server *EcNode, allocatedEcShardIds []uint32) {
+ defer wg.Done()
+ copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
+ allocatedEcShardIds, volumeId, collection, existingLocation.Url)
+ if copyErr != nil {
+ err = copyErr
+ } else {
+ shardIdChan <- copiedShardIds
+ server.addEcVolumeShards(volumeId, collection, copiedShardIds)
+ }
+ }(server, allocatedEcIds[i])
+ }
+ wg.Wait()
+ close(shardIdChan)
+
+ if err != nil {
+ return nil, err
+ }
+
+ for shardIds := range shardIdChan {
+ actuallyCopied = append(actuallyCopied, shardIds...)
+ }
+
+ return
+}
+
+func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
+ allocated = make([][]uint32, len(servers))
+ allocatedShardIdIndex := uint32(0)
+ serverIndex := 0
+ for allocatedShardIdIndex < erasure_coding.TotalShardsCount {
+ if servers[serverIndex].freeEcSlot > 0 {
+ allocated[serverIndex] = append(allocated[serverIndex], allocatedShardIdIndex)
+ allocatedShardIdIndex++
+ }
+ serverIndex++
+ if serverIndex >= len(servers) {
+ serverIndex = 0
+ }
+ }
+
+ return allocated
+}
+
+func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return
+ }
+
+ quietSeconds := int64(quietPeriod / time.Second)
+ nowUnixSeconds := time.Now().Unix()
+
+ fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
+
+ vidMap := make(map[uint32]bool)
+ eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, v := range dn.VolumeInfos {
+ if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
+ if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 {
+ vidMap[v.Id] = true
+ }
+ }
+ }
+ })
+
+ for vid := range vidMap {
+ vids = append(vids, needle.VolumeId(vid))
+ }
+
+ return
+}
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
new file mode 100644
index 000000000..df28681fe
--- /dev/null
+++ b/weed/shell/command_ec_rebuild.go
@@ -0,0 +1,271 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+)
+
+func init() {
+ Commands = append(Commands, &commandEcRebuild{})
+}
+
+type commandEcRebuild struct {
+}
+
+func (c *commandEcRebuild) Name() string {
+ return "ec.rebuild"
+}
+
+func (c *commandEcRebuild) Help() string {
+ return `find and rebuild missing ec shards among volume servers
+
+ ec.rebuild [-collection EACH_COLLECTION|<collection_name>] [-force]
+
+ Algorithm:
+
+ collect all ec shard locations from all volume servers
+ for each collection {
+ rebuildEcVolumes()
+ }
+
+ func rebuildEcVolumes(){
+ for each ec volume with missing shards {
+ if the remaining shards are fewer than the data shard count {
+ report the volume as unrepairable and stop
+ }
+ pick the volume server with the most free ec shard slots as the rebuilder
+ copy the existing shards and index files to the rebuilder
+ generate the missing shards on the rebuilder
+ mount the generated shards
+ clean up the temporarily copied shards
+ }
+ }
+
+`
+}
+
+func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
+ applyChanges := fixCommand.Bool("force", false, "apply the changes")
+ if err = fixCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ // collect all ec nodes
+ allEcNodes, _, err := collectEcNodes(commandEnv, "")
+ if err != nil {
+ return err
+ }
+
+ if *collection == "EACH_COLLECTION" {
+ collections, err := ListCollectionNames(commandEnv, false, true)
+ if err != nil {
+ return err
+ }
+ fmt.Printf("rebuildEcVolumes collections %+v\n", len(collections))
+ for _, c := range collections {
+ fmt.Printf("rebuildEcVolumes collection %+v\n", c)
+ if err = rebuildEcVolumes(commandEnv, allEcNodes, c, writer, *applyChanges); err != nil {
+ return err
+ }
+ }
+ } else {
+ if err = rebuildEcVolumes(commandEnv, allEcNodes, *collection, writer, *applyChanges); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error {
+
+ fmt.Printf("rebuildEcVolumes %s\n", collection)
+
+ // collect vid => each shard locations, similar to ecShardMap in topology.go
+ ecShardMap := make(EcShardMap)
+ for _, ecNode := range allEcNodes {
+ ecShardMap.registerEcNode(ecNode, collection)
+ }
+
+ for vid, locations := range ecShardMap {
+ shardCount := locations.shardCount()
+ if shardCount == erasure_coding.TotalShardsCount {
+ continue
+ }
+ if shardCount < erasure_coding.DataShardsCount {
+ return fmt.Errorf("ec volume %d is unrepairable with %d shards\n", vid, shardCount)
+ }
+
+ sortEcNodesByFreeslotsDecending(allEcNodes)
+
+ if allEcNodes[0].freeEcSlot < erasure_coding.TotalShardsCount {
+ return fmt.Errorf("disk space is not enough")
+ }
+
+ if err := rebuildOneEcVolume(commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
+
+ fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
+
+ // collect shard files to rebuilder local disk
+ var generatedShardIds []uint32
+ copiedShardIds, _, err := prepareDataToRecover(commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges)
+ if err != nil {
+ return err
+ }
+ defer func() {
+ // clean up working files
+
+ // ask the rebuilder to delete the copied shards
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds)
+ if err != nil {
+ fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds)
+ }
+
+ }()
+
+ if !applyChanges {
+ return nil
+ }
+
+ // generate ec shards, and maybe ecx file
+ generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id)
+ if err != nil {
+ return err
+ }
+
+ // mount the generated shards
+ err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
+ if err != nil {
+ return err
+ }
+
+ rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds)
+
+ return nil
+}
+
+func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
+
+ err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, rebuildErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ })
+ if rebuildErr == nil {
+ rebuiltShardIds = resp.RebuiltShardIds
+ }
+ return rebuildErr
+ })
+ return
+}
+
+func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
+
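+ // the .ecx/.ecj/.vif files only need to be copied along with the first shard, and only
+ // when the rebuilder does not already hold any shard of this ec volume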
+ needEcxFile := true
+ var localShardBits erasure_coding.ShardBits
+ for _, ecShardInfo := range rebuilder.info.EcShardInfos {
+ if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId {
+ needEcxFile = false
+ localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ }
+ }
+
+ for shardId, ecNodes := range locations {
+
+ if len(ecNodes) == 0 {
+ fmt.Fprintf(writer, "missing shard %d.%d\n", volumeId, shardId)
+ continue
+ }
+
+ if localShardBits.HasShardId(erasure_coding.ShardId(shardId)) {
+ localShardIds = append(localShardIds, uint32(shardId))
+ fmt.Fprintf(writer, "use existing shard %d.%d\n", volumeId, shardId)
+ continue
+ }
+
+ var copyErr error
+ if applyBalancing {
+ copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ ShardIds: []uint32{uint32(shardId)},
+ CopyEcxFile: needEcxFile,
+ CopyEcjFile: needEcxFile,
+ CopyVifFile: needEcxFile,
+ SourceDataNode: ecNodes[0].info.Id,
+ })
+ return copyErr
+ })
+ if copyErr == nil && needEcxFile {
+ needEcxFile = false
+ }
+ }
+ if copyErr != nil {
+ fmt.Fprintf(writer, "%s failed to copy %d.%d from %s: %v\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id, copyErr)
+ } else {
+ fmt.Fprintf(writer, "%s copied %d.%d from %s\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id)
+ copiedShardIds = append(copiedShardIds, uint32(shardId))
+ }
+
+ }
+
+ if len(copiedShardIds)+len(localShardIds) >= erasure_coding.DataShardsCount {
+ return copiedShardIds, localShardIds, nil
+ }
+
+ return nil, nil, fmt.Errorf("%d shards are not enough to recover volume %d", len(copiedShardIds)+len(localShardIds), volumeId)
+
+}
+
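+// EcShardMap maps a volume id to the locations of its shards. EcShardLocations is indexed
+// by shard id; each element lists the ec nodes currently holding that shard.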
+type EcShardMap map[needle.VolumeId]EcShardLocations
+type EcShardLocations [][]*EcNode
+
+func (ecShardMap EcShardMap) registerEcNode(ecNode *EcNode, collection string) {
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if shardInfo.Collection == collection {
+ existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)]
+ if !found {
+ existing = make([][]*EcNode, erasure_coding.TotalShardsCount)
+ ecShardMap[needle.VolumeId(shardInfo.Id)] = existing
+ }
+ for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() {
+ existing[shardId] = append(existing[shardId], ecNode)
+ }
+ }
+ }
+}
+
+func (ecShardLocations EcShardLocations) shardCount() (count int) {
+ for _, locations := range ecShardLocations {
+ if len(locations) > 0 {
+ count++
+ }
+ }
+ return
+}
diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go
new file mode 100644
index 000000000..4fddcbea5
--- /dev/null
+++ b/weed/shell/command_ec_test.go
@@ -0,0 +1,139 @@
+package shell
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func TestCommandEcDistribution(t *testing.T) {
+
+ allEcNodes := []*EcNode{
+ newEcNode("dc1", "rack1", "dn1", 100),
+ newEcNode("dc1", "rack2", "dn2", 100),
+ }
+
+ allocated := balancedEcDistribution(allEcNodes)
+
+ fmt.Printf("allocated: %+v", allocated)
+}
+
+func TestCommandEcBalanceSmall(t *testing.T) {
+
+ allEcNodes := []*EcNode{
+ newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
+ newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
+ }
+
+ racks := collectRacks(allEcNodes)
+ balanceEcVolumes(nil, "c1", allEcNodes, racks, false)
+}
+
+func TestCommandEcBalanceNothingToMove(t *testing.T) {
+
+ allEcNodes := []*EcNode{
+ newEcNode("dc1", "rack1", "dn1", 100).
+ addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
+ addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
+ newEcNode("dc1", "rack1", "dn2", 100).
+ addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
+ addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
+ }
+
+ racks := collectRacks(allEcNodes)
+ balanceEcVolumes(nil, "c1", allEcNodes, racks, false)
+}
+
+func TestCommandEcBalanceAddNewServers(t *testing.T) {
+
+ allEcNodes := []*EcNode{
+ newEcNode("dc1", "rack1", "dn1", 100).
+ addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
+ addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
+ newEcNode("dc1", "rack1", "dn2", 100).
+ addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
+ addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
+ newEcNode("dc1", "rack1", "dn3", 100),
+ newEcNode("dc1", "rack1", "dn4", 100),
+ }
+
+ racks := collectRacks(allEcNodes)
+ balanceEcVolumes(nil, "c1", allEcNodes, racks, false)
+}
+
+func TestCommandEcBalanceAddNewRacks(t *testing.T) {
+
+ allEcNodes := []*EcNode{
+ newEcNode("dc1", "rack1", "dn1", 100).
+ addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
+ addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
+ newEcNode("dc1", "rack1", "dn2", 100).
+ addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
+ addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
+ newEcNode("dc1", "rack2", "dn3", 100),
+ newEcNode("dc1", "rack2", "dn4", 100),
+ }
+
+ racks := collectRacks(allEcNodes)
+ balanceEcVolumes(nil, "c1", allEcNodes, racks, false)
+}
+
+func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
+
+ allEcNodes := []*EcNode{
+ newEcNode("dc1", "rack1", "dn_shared", 100).
+ addEcVolumeAndShardsForTest(1, "c1", []uint32{0}).
+ addEcVolumeAndShardsForTest(2, "c1", []uint32{0}),
+
+ newEcNode("dc1", "rack1", "dn_a1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{1}),
+ newEcNode("dc1", "rack1", "dn_a2", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{2}),
+ newEcNode("dc1", "rack1", "dn_a3", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{3}),
+ newEcNode("dc1", "rack1", "dn_a4", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{4}),
+ newEcNode("dc1", "rack1", "dn_a5", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{5}),
+ newEcNode("dc1", "rack1", "dn_a6", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{6}),
+ newEcNode("dc1", "rack1", "dn_a7", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{7}),
+ newEcNode("dc1", "rack1", "dn_a8", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{8}),
+ newEcNode("dc1", "rack1", "dn_a9", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{9}),
+ newEcNode("dc1", "rack1", "dn_a10", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{10}),
+ newEcNode("dc1", "rack1", "dn_a11", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{11}),
+ newEcNode("dc1", "rack1", "dn_a12", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{12}),
+ newEcNode("dc1", "rack1", "dn_a13", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{13}),
+
+ newEcNode("dc1", "rack1", "dn_b1", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{1}),
+ newEcNode("dc1", "rack1", "dn_b2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{2}),
+ newEcNode("dc1", "rack1", "dn_b3", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{3}),
+ newEcNode("dc1", "rack1", "dn_b4", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{4}),
+ newEcNode("dc1", "rack1", "dn_b5", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{5}),
+ newEcNode("dc1", "rack1", "dn_b6", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{6}),
+ newEcNode("dc1", "rack1", "dn_b7", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{7}),
+ newEcNode("dc1", "rack1", "dn_b8", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{8}),
+ newEcNode("dc1", "rack1", "dn_b9", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{9}),
+ newEcNode("dc1", "rack1", "dn_b10", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{10}),
+ newEcNode("dc1", "rack1", "dn_b11", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{11}),
+ newEcNode("dc1", "rack1", "dn_b12", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{12}),
+ newEcNode("dc1", "rack1", "dn_b13", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{13}),
+
+ newEcNode("dc1", "rack1", "dn3", 100),
+ }
+
+ racks := collectRacks(allEcNodes)
+ balanceEcVolumes(nil, "c1", allEcNodes, racks, false)
+ balanceEcRacks(nil, racks, false)
+}
+
+func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNode {
+ return &EcNode{
+ info: &master_pb.DataNodeInfo{
+ Id: dataNodeId,
+ },
+ dc: dc,
+ rack: RackId(rack),
+ freeEcSlot: freeEcSlot,
+ }
+}
+
+func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode {
+ return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds)
+}
diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go
new file mode 100644
index 000000000..7177d8ac3
--- /dev/null
+++ b/weed/shell/command_fs_cat.go
@@ -0,0 +1,59 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+ "math"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsCat{})
+}
+
+type commandFsCat struct {
+}
+
+func (c *commandFsCat) Name() string {
+ return "fs.cat"
+}
+
+func (c *commandFsCat) Help() string {
+ return `stream the file content onto the screen
+
+ fs.cat /dir/file_name
+`
+}
+
+func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
+ if err != nil {
+ return err
+ }
+
+ if commandEnv.isDirectory(path) {
+ return fmt.Errorf("%s is a directory", path)
+ }
+
+ dir, name := util.FullPath(path).DirAndName()
+
+ return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Name: name,
+ Directory: dir,
+ }
+ respLookupEntry, err := filer_pb.LookupEntry(client, request)
+ if err != nil {
+ return err
+ }
+
+ return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
+
+ })
+
+}
diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go
new file mode 100644
index 000000000..2cc28f7a2
--- /dev/null
+++ b/weed/shell/command_fs_cd.go
@@ -0,0 +1,50 @@
+package shell
+
+import (
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsCd{})
+}
+
+type commandFsCd struct {
+}
+
+func (c *commandFsCd) Name() string {
+ return "fs.cd"
+}
+
+func (c *commandFsCd) Help() string {
+ return `change directory to a directory /path/to/dir
+
+ The full path can be too long to type. For example,
+ fs.ls /some/path/to/file_name
+
+ can be simplified as
+
+ fs.cd /some/path
+ fs.ls to/file_name
+`
+}
+
+func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
+ if err != nil {
+ return err
+ }
+
+ if path == "/" {
+ commandEnv.option.Directory = "/"
+ return nil
+ }
+
+ err = commandEnv.checkDirectory(path)
+
+ if err == nil {
+ commandEnv.option.Directory = path
+ }
+
+ return err
+}
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go
new file mode 100644
index 000000000..96551dd5a
--- /dev/null
+++ b/weed/shell/command_fs_du.go
@@ -0,0 +1,84 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsDu{})
+}
+
+type commandFsDu struct {
+}
+
+func (c *commandFsDu) Name() string {
+ return "fs.du"
+}
+
+func (c *commandFsDu) Help() string {
+ return `show disk usage
+
+ fs.du /dir
+ fs.du /dir/file_name
+ fs.du /dir/file_prefix
+`
+}
+
+func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
+ if err != nil {
+ return err
+ }
+
+ if commandEnv.isDirectory(path) {
+ path = path + "/"
+ }
+
+ var blockCount, byteCount uint64
+ dir, name := util.FullPath(path).DirAndName()
+ blockCount, byteCount, err = duTraverseDirectory(writer, commandEnv, dir, name)
+
+ if name == "" && err == nil {
+ fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir)
+ }
+
+ return
+
+}
+
+func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) {
+
+ err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error {
+
+ var fileBlockCount, fileByteCount uint64
+
+ if entry.IsDirectory {
+ subDir := fmt.Sprintf("%s/%s", dir, entry.Name)
+ if dir == "/" {
+ subDir = "/" + entry.Name
+ }
+ numBlock, numByte, err := duTraverseDirectory(writer, filerClient, subDir, "")
+ if err == nil {
+ blockCount += numBlock
+ byteCount += numByte
+ }
+ } else {
+ fileBlockCount = uint64(len(entry.Chunks))
+ fileByteCount = filer2.TotalSize(entry.Chunks)
+ blockCount += fileBlockCount
+ byteCount += fileByteCount
+ }
+
+ if name != "" && !entry.IsDirectory {
+ fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", fileBlockCount, fileByteCount, dir, entry.Name)
+ }
+ return nil
+ })
+ return
+}
diff --git a/weed/shell/command_fs_lock_unlock.go b/weed/shell/command_fs_lock_unlock.go
new file mode 100644
index 000000000..8a6e8f71b
--- /dev/null
+++ b/weed/shell/command_fs_lock_unlock.go
@@ -0,0 +1,54 @@
+package shell
+
+import (
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandUnlock{})
+ Commands = append(Commands, &commandLock{})
+}
+
+// =========== Lock ==============
+type commandLock struct {
+}
+
+func (c *commandLock) Name() string {
+ return "lock"
+}
+
+func (c *commandLock) Help() string {
+ return `lock in order to exclusively manage the cluster
+
+ This is a blocking operation if there is already another lock.
+`
+}
+
+func (c *commandLock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ commandEnv.locker.RequestLock()
+
+ return nil
+}
+
+// =========== Unlock ==============
+
+type commandUnlock struct {
+}
+
+func (c *commandUnlock) Name() string {
+ return "unlock"
+}
+
+func (c *commandUnlock) Help() string {
+ return `unlock the cluster-wide lock
+
+`
+}
+
+func (c *commandUnlock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ commandEnv.locker.ReleaseLock()
+
+ return nil
+}
diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go
new file mode 100644
index 000000000..36133992f
--- /dev/null
+++ b/weed/shell/command_fs_ls.go
@@ -0,0 +1,111 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "os/user"
+ "strconv"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsLs{})
+}
+
+type commandFsLs struct {
+}
+
+func (c *commandFsLs) Name() string {
+ return "fs.ls"
+}
+
+func (c *commandFsLs) Help() string {
+ return `list all files under a directory
+
+ fs.ls [-l] [-a] /dir/
+ fs.ls [-l] [-a] /dir/file_name
+ fs.ls [-l] [-a] /dir/file_prefix
+`
+}
+
+func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ var isLongFormat, showHidden bool
+ for _, arg := range args {
+ if !strings.HasPrefix(arg, "-") {
+ break
+ }
+ for _, t := range arg {
+ switch t {
+ case 'a':
+ showHidden = true
+ case 'l':
+ isLongFormat = true
+ }
+ }
+ }
+
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
+ if err != nil {
+ return err
+ }
+
+ if commandEnv.isDirectory(path) {
+ path = path + "/"
+ }
+
+ dir, name := util.FullPath(path).DirAndName()
+ entryCount := 0
+
+ err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error {
+
+ if !showHidden && strings.HasPrefix(entry.Name, ".") {
+ return nil
+ }
+
+ entryCount++
+
+ if isLongFormat {
+ fileMode := os.FileMode(entry.Attributes.FileMode)
+ userName, groupNames := entry.Attributes.UserName, entry.Attributes.GroupName
+ if userName == "" {
+ if user, userErr := user.LookupId(strconv.Itoa(int(entry.Attributes.Uid))); userErr == nil {
+ userName = user.Username
+ }
+ }
+ groupName := ""
+ if len(groupNames) > 0 {
+ groupName = groupNames[0]
+ }
+ if groupName == "" {
+ if group, groupErr := user.LookupGroupId(strconv.Itoa(int(entry.Attributes.Gid))); groupErr == nil {
+ groupName = group.Name
+ }
+ }
+
+ if strings.HasSuffix(dir, "/") {
+ // just for printing
+ dir = dir[:len(dir)-1]
+ }
+ fmt.Fprintf(writer, "%s %3d %s %s %6d %s/%s\n",
+ fileMode, len(entry.Chunks),
+ userName, groupName,
+ filer2.TotalSize(entry.Chunks), dir, entry.Name)
+ } else {
+ fmt.Fprintf(writer, "%s\n", entry.Name)
+ }
+
+ return nil
+ })
+
+ if isLongFormat && err == nil {
+ fmt.Fprintf(writer, "total %d\n", entryCount)
+ }
+
+ return
+}
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
new file mode 100644
index 000000000..0679ec075
--- /dev/null
+++ b/weed/shell/command_fs_meta_cat.go
@@ -0,0 +1,68 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+
+ "github.com/golang/protobuf/jsonpb"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsMetaCat{})
+}
+
+type commandFsMetaCat struct {
+}
+
+func (c *commandFsMetaCat) Name() string {
+ return "fs.meta.cat"
+}
+
+func (c *commandFsMetaCat) Help() string {
+ return `print out the meta data content for a file or directory
+
+ fs.meta.cat /dir/
+ fs.meta.cat /dir/file_name
+`
+}
+
+func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
+ if err != nil {
+ return err
+ }
+
+ dir, name := util.FullPath(path).DirAndName()
+
+ return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Name: name,
+ Directory: dir,
+ }
+ respLookupEntry, err := filer_pb.LookupEntry(client, request)
+ if err != nil {
+ return err
+ }
+
+ m := jsonpb.Marshaler{
+ EmitDefaults: true,
+ Indent: " ",
+ }
+
+ text, marshalErr := m.MarshalToString(respLookupEntry.Entry)
+ if marshalErr != nil {
+ return fmt.Errorf("marshal meta: %v", marshalErr)
+ }
+
+ fmt.Fprintf(writer, "%s\n", text)
+
+ return nil
+
+ })
+
+}
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
new file mode 100644
index 000000000..69ae9454c
--- /dev/null
+++ b/weed/shell/command_fs_meta_load.go
@@ -0,0 +1,100 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsMetaLoad{})
+}
+
+type commandFsMetaLoad struct {
+}
+
+func (c *commandFsMetaLoad) Name() string {
+ return "fs.meta.load"
+}
+
+func (c *commandFsMetaLoad) Help() string {
+ return `load saved filer meta data to restore the directory and file structure
+
+ fs.meta.load <filer_host>-<port>-<time>.meta
+
+`
+}
+
+func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if len(args) == 0 {
+ fmt.Fprintf(writer, "missing a metadata file\n")
+ return nil
+ }
+
+ fileName := args[len(args)-1]
+
+ dst, err := os.OpenFile(fileName, os.O_RDONLY, 0644)
+ if err != nil {
+ return fmt.Errorf("failed to open %s: %v", fileName, err)
+ }
+ defer dst.Close()
+
+ var dirCount, fileCount uint64
+
+ err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ sizeBuf := make([]byte, 4)
+
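+ // each record written by fs.meta.save is a 4-byte length prefix followed by a
+ // marshaled filer_pb.FullEntry; keep reading records until EOF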
+ for {
+ if n, err := io.ReadFull(dst, sizeBuf); n != 4 {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+
+ size := util.BytesToUint32(sizeBuf)
+
+ data := make([]byte, int(size))
+
+ if n, err := io.ReadFull(dst, data); n != len(data) {
+ return err
+ }
+
+ fullEntry := &filer_pb.FullEntry{}
+ if err = proto.Unmarshal(data, fullEntry); err != nil {
+ return err
+ }
+
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: fullEntry.Dir,
+ Entry: fullEntry.Entry,
+ }); err != nil {
+ return err
+ }
+
+ fmt.Fprintf(writer, "load %s\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name))
+
+ if fullEntry.Entry.IsDirectory {
+ dirCount++
+ } else {
+ fileCount++
+ }
+
+ }
+
+ })
+
+ if err == nil {
+ fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount)
+ fmt.Fprintf(writer, "\n%s is loaded.\n", fileName)
+ }
+
+ return err
+}
diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go
new file mode 100644
index 000000000..4342fa81d
--- /dev/null
+++ b/weed/shell/command_fs_meta_notify.go
@@ -0,0 +1,73 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/notification"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsMetaNotify{})
+}
+
+type commandFsMetaNotify struct {
+}
+
+func (c *commandFsMetaNotify) Name() string {
+ return "fs.meta.notify"
+}
+
+func (c *commandFsMetaNotify) Help() string {
+ return `recursively send directory and file meta data to the notification message queue
+
+ fs.meta.notify # send meta data from current directory to notification message queue
+
+ The message queue will use it to trigger replication from this filer.
+
+`
+}
+
+func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
+ if err != nil {
+ return err
+ }
+
+ util.LoadConfiguration("notification", true)
+ v := util.GetViper()
+ notification.LoadConfiguration(v, "notification.")
+
+ var dirCount, fileCount uint64
+
+ err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+
+ if entry.IsDirectory {
+ dirCount++
+ } else {
+ fileCount++
+ }
+
+ notifyErr := notification.Queue.SendMessage(
+ string(parentPath.Child(entry.Name)),
+ &filer_pb.EventNotification{
+ NewEntry: entry,
+ },
+ )
+
+ if notifyErr != nil {
+ fmt.Fprintf(writer, "fail to notify new entry event for %s: %v\n", parentPath.Child(entry.Name), notifyErr)
+ }
+
+ })
+
+ if err == nil {
+ fmt.Fprintf(writer, "\ntotal notified %d directories, %d files\n", dirCount, fileCount)
+ }
+
+ return err
+
+}
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
new file mode 100644
index 000000000..ed19e3d01
--- /dev/null
+++ b/weed/shell/command_fs_meta_save.go
@@ -0,0 +1,143 @@
+package shell
+
+import (
+ "flag"
+ "fmt"
+ "io"
+ "os"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsMetaSave{})
+}
+
+type commandFsMetaSave struct {
+}
+
+func (c *commandFsMetaSave) Name() string {
+ return "fs.meta.save"
+}
+
+func (c *commandFsMetaSave) Help() string {
+ return `save all directory and file meta data to a local file for metadata backup.
+
+ fs.meta.save / # save from the root
+ fs.meta.save -v -o t.meta / # save from the root, output to t.meta file.
+ fs.meta.save /path/to/save # save from the directory /path/to/save
+ fs.meta.save . # save from current directory
+ fs.meta.save # save from current directory
+
+ The meta data will be saved into a local <filer_host>-<port>-<time>.meta file.
+ The saved meta data can later be loaded back with the fs.meta.load command.
+
+`
+}
+
+func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ fsMetaSaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ verbose := fsMetaSaveCommand.Bool("v", false, "print out each processed files")
+ outputFileName := fsMetaSaveCommand.String("o", "", "output the meta data to this file")
+ // chunksFileName := fsMetaSaveCommand.String("chunks", "", "output all the chunks to this file")
+ if err = fsMetaSaveCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args()))
+ if parseErr != nil {
+ return parseErr
+ }
+
+ fileName := *outputFileName
+ if fileName == "" {
+ t := time.Now()
+ fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta",
+ commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
+ }
+
+ dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if openErr != nil {
+ return fmt.Errorf("failed to create file %s: %v", fileName, openErr)
+ }
+ defer dst.Close()
+
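+ // write each entry as a 4-byte length prefix followed by the marshaled filer_pb.FullEntry,
+ // the same record format that fs.meta.load reads back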
+ err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(outputChan chan interface{}) {
+ sizeBuf := make([]byte, 4)
+ for item := range outputChan {
+ b := item.([]byte)
+ util.Uint32toBytes(sizeBuf, uint32(len(b)))
+ dst.Write(sizeBuf)
+ dst.Write(b)
+ }
+ }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ bytes, err := proto.Marshal(entry)
+ if err != nil {
+ fmt.Fprintf(writer, "marshall error: %v\n", err)
+ return
+ }
+
+ outputChan <- bytes
+ return nil
+ })
+
+ if err == nil {
+ fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", commandEnv.option.FilerHost, commandEnv.option.FilerPort, path, fileName)
+ }
+
+ return err
+
+}
+
+func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, saveFn func(outputChan chan interface{}), genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error) error {
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ outputChan := make(chan interface{}, 1024)
+ go func() {
+ saveFn(outputChan)
+ wg.Done()
+ }()
+
+ var dirCount, fileCount uint64
+
+ err := filer_pb.TraverseBfs(filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+
+ protoMessage := &filer_pb.FullEntry{
+ Dir: string(parentPath),
+ Entry: entry,
+ }
+
+ if err := genFn(protoMessage, outputChan); err != nil {
+ fmt.Fprintf(writer, "marshall error: %v\n", err)
+ return
+ }
+
+ if entry.IsDirectory {
+ atomic.AddUint64(&dirCount, 1)
+ } else {
+ atomic.AddUint64(&fileCount, 1)
+ }
+
+ if verbose {
+ println(parentPath.Child(entry.Name))
+ }
+
+ })
+
+ close(outputChan)
+
+ wg.Wait()
+
+ if err == nil && writer != nil {
+ fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount)
+ }
+ return err
+}
diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go
new file mode 100644
index 000000000..0a7eed02d
--- /dev/null
+++ b/weed/shell/command_fs_mv.go
@@ -0,0 +1,90 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsMv{})
+}
+
+type commandFsMv struct {
+}
+
+func (c *commandFsMv) Name() string {
+ return "fs.mv"
+}
+
+func (c *commandFsMv) Help() string {
+ return `move or rename a file or a folder
+
+ fs.mv <source entry> <destination entry>
+
+ fs.mv /dir/file_name /dir2/filename2
+ fs.mv /dir/file_name /dir2
+
+ fs.mv /dir/dir2 /dir3/dir4/
+ fs.mv /dir/dir2 /dir3/new_dir
+
+`
+}
+
+func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ sourcePath, err := commandEnv.parseUrl(args[0])
+ if err != nil {
+ return err
+ }
+
+ destinationPath, err := commandEnv.parseUrl(args[1])
+ if err != nil {
+ return err
+ }
+
+ sourceDir, sourceName := util.FullPath(sourcePath).DirAndName()
+
+ destinationDir, destinationName := util.FullPath(destinationPath).DirAndName()
+
+ return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ // collect destination entry info
+ destinationRequest := &filer_pb.LookupDirectoryEntryRequest{
+ Name: destinationName,
+ Directory: destinationDir,
+ }
+ respDestinationLookupEntry, err := filer_pb.LookupEntry(client, destinationRequest)
+
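+ // e.g. "fs.mv /dir/file_name /dir2" keeps the name and moves the file under /dir2,
+ // while "fs.mv /dir/file_name /dir2/filename2" moves and renames it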
+ var targetDir, targetName string
+
+ // moving a file or folder
+ if err == nil && respDestinationLookupEntry.Entry.IsDirectory {
+ // to a directory
+ targetDir = util.Join(destinationDir, destinationName)
+ targetName = sourceName
+ } else {
+ // to a file or folder
+ targetDir = destinationDir
+ targetName = destinationName
+ }
+
+ request := &filer_pb.AtomicRenameEntryRequest{
+ OldDirectory: sourceDir,
+ OldName: sourceName,
+ NewDirectory: targetDir,
+ NewName: targetName,
+ }
+
+ _, err = client.AtomicRenameEntry(context.Background(), request)
+
+ fmt.Fprintf(writer, "move: %s => %s\n", sourcePath, util.NewFullPath(targetDir, targetName))
+
+ return err
+
+ })
+
+}
diff --git a/weed/shell/command_fs_pwd.go b/weed/shell/command_fs_pwd.go
new file mode 100644
index 000000000..d7d9819c8
--- /dev/null
+++ b/weed/shell/command_fs_pwd.go
@@ -0,0 +1,28 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsPwd{})
+}
+
+type commandFsPwd struct {
+}
+
+func (c *commandFsPwd) Name() string {
+ return "fs.pwd"
+}
+
+func (c *commandFsPwd) Help() string {
+ return `print out current directory`
+}
+
+func (c *commandFsPwd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ fmt.Fprintf(writer, "%s\n", commandEnv.option.Directory)
+
+ return nil
+}
diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go
new file mode 100644
index 000000000..a8c5b2018
--- /dev/null
+++ b/weed/shell/command_fs_tree.go
@@ -0,0 +1,113 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsTree{})
+}
+
+type commandFsTree struct {
+}
+
+func (c *commandFsTree) Name() string {
+ return "fs.tree"
+}
+
+func (c *commandFsTree) Help() string {
+ return `recursively list all files under a directory
+
+ fs.tree /some/dir
+
+`
+}
+
+func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
+ if err != nil {
+ return err
+ }
+
+ dir, name := util.FullPath(path).DirAndName()
+
+ dirCount, fCount, terr := treeTraverseDirectory(writer, commandEnv, util.FullPath(dir), name, newPrefix(), -1)
+
+ if terr == nil {
+ fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount)
+ }
+
+ return terr
+
+}
+
+func treeTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir util.FullPath, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) {
+
+ prefix.addMarker(level)
+
+ err = filer_pb.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) error {
+ if level < 0 && name != "" {
+ if entry.Name != name {
+ return nil
+ }
+ }
+
+ fmt.Fprintf(writer, "%s%s\n", prefix.getPrefix(level, isLast), entry.Name)
+
+ if entry.IsDirectory {
+ directoryCount++
+ subDir := dir.Child(entry.Name)
+ dirCount, fCount, terr := treeTraverseDirectory(writer, filerClient, subDir, "", prefix, level+1)
+ directoryCount += dirCount
+ fileCount += fCount
+ err = terr
+ } else {
+ fileCount++
+ }
+ return nil
+ })
+ return
+}
+
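+// Prefix keeps track of which ancestor levels still have pending siblings, so the tree
+// output draws a "│" connector for those levels and blank spaces for finished ones.
+// Illustrative output shape (an assumption, not taken from the original change):
+// ├── dir1
+// │   └── file1
+// └── dir2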
+type Prefix struct {
+ markers map[int]bool
+}
+
+func newPrefix() *Prefix {
+ return &Prefix{
+ markers: make(map[int]bool),
+ }
+}
+func (p *Prefix) addMarker(marker int) {
+ p.markers[marker] = true
+}
+func (p *Prefix) removeMarker(marker int) {
+ delete(p.markers, marker)
+}
+func (p *Prefix) getPrefix(level int, isLastChild bool) string {
+ var sb strings.Builder
+ if level < 0 {
+ return ""
+ }
+ for i := 0; i < level; i++ {
+ if _, ok := p.markers[i]; ok {
+ sb.WriteString("│")
+ } else {
+ sb.WriteString(" ")
+ }
+ sb.WriteString(" ")
+ }
+ if isLastChild {
+ sb.WriteString("└──")
+ p.removeMarker(level)
+ } else {
+ sb.WriteString("├──")
+ }
+ return sb.String()
+}
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
new file mode 100644
index 000000000..69e3c7fd9
--- /dev/null
+++ b/weed/shell/command_volume_balance.go
@@ -0,0 +1,257 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "os"
+ "sort"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeBalance{})
+}
+
+type commandVolumeBalance struct {
+}
+
+func (c *commandVolumeBalance) Name() string {
+ return "volume.balance"
+}
+
+func (c *commandVolumeBalance) Help() string {
+ return `balance all volumes among volume servers
+
+ volume.balance [-collection ALL_COLLECTIONS|EACH_COLLECTION|<collection_name>] [-force] [-dataCenter=<data_center_name>]
+
+ Algorithm:
+
+ For each type of volume server (different max volume count limit){
+ for each collection {
+ balanceWritableVolumes()
+ balanceReadOnlyVolumes()
+ }
+ }
+
+ func balanceWritableVolumes(){
+ idealWritableVolumes = totalWritableVolumes / numVolumeServers
+ for hasMovedOneVolume {
+ sort all volume servers ordered by the number of local writable volumes
+ pick the volume server A with the lowest number of writable volumes x
+ pick the volume server B with the highest number of writable volumes y
+ if y > idealWritableVolumes and x +1 <= idealWritableVolumes {
+ if B has a writable volume id v that A does not have {
+ move writable volume v from B to A
+ }
+ }
+ }
+ }
+ func balanceReadOnlyVolumes(){
+ //similar to balanceWritableVolumes
+ }
+
+`
+}
+
+func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection")
+ dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
+ applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan.")
+ if err = balanceCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return err
+ }
+
+ typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc)
+
+ for maxVolumeCount, volumeServers := range typeToNodes {
+ if len(volumeServers) < 2 {
+ fmt.Printf("only 1 node is configured max %d volumes, skipping balancing\n", maxVolumeCount)
+ continue
+ }
+ if *collection == "EACH_COLLECTION" {
+ collections, err := ListCollectionNames(commandEnv, true, false)
+ if err != nil {
+ return err
+ }
+ for _, c := range collections {
+ if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
+ return err
+ }
+ }
+ } else if *collection == "ALL_COLLECTIONS" {
+ if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
+ return err
+ }
+ } else {
+ if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
+ return err
+ }
+ }
+
+ }
+ return nil
+}
+
+func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
+
+ // balance writable volumes
+ for _, n := range nodes {
+ n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
+ if collection != "ALL_COLLECTIONS" {
+ if v.Collection != collection {
+ return false
+ }
+ }
+ return !v.ReadOnly && v.Size < volumeSizeLimit
+ })
+ }
+ if err := balanceSelectedVolume(commandEnv, nodes, sortWritableVolumes, applyBalancing); err != nil {
+ return err
+ }
+
+ // balance readable volumes
+ for _, n := range nodes {
+ n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
+ if collection != "ALL_COLLECTIONS" {
+ if v.Collection != collection {
+ return false
+ }
+ }
+ return v.ReadOnly || v.Size >= volumeSizeLimit
+ })
+ }
+ if err := balanceSelectedVolume(commandEnv, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
+ return err
+ }
+
+ return nil
+}
+
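+// collectVolumeServersByType groups data nodes by their max volume count, so that servers
+// with the same capacity limit (the "type of volume server" in the help text) are balanced
+// against each other.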
+func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) {
+ typeToNodes = make(map[uint64][]*Node)
+ for _, dc := range t.DataCenterInfos {
+ if selectedDataCenter != "" && dc.Id != selectedDataCenter {
+ continue
+ }
+ for _, r := range dc.RackInfos {
+ for _, dn := range r.DataNodeInfos {
+ typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{
+ info: dn,
+ dc: dc.Id,
+ rack: r.Id,
+ })
+ }
+ }
+ }
+ return
+}
+
+type Node struct {
+ info *master_pb.DataNodeInfo
+ selectedVolumes map[uint32]*master_pb.VolumeInformationMessage
+ dc string
+ rack string
+}
+
+func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
+ sort.Slice(volumes, func(i, j int) bool {
+ return volumes[i].Size < volumes[j].Size
+ })
+}
+
+func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
+ sort.Slice(volumes, func(i, j int) bool {
+ return volumes[i].Id < volumes[j].Id
+ })
+}
+
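+// balanceSelectedVolume repeatedly moves one volume from the fullest node to the emptiest
+// node until no node is above the ideal count. Illustrative example (an assumption, not
+// part of the original change): with 25 selected volumes spread over 4 nodes, the ideal
+// count per node is ceilDivide(25, 4) = 7.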
+func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error {
+ selectedVolumeCount := 0
+ for _, dn := range nodes {
+ selectedVolumeCount += len(dn.selectedVolumes)
+ }
+
+ idealSelectedVolumes := ceilDivide(selectedVolumeCount, len(nodes))
+
+ hasMove := true
+
+ for hasMove {
+ hasMove = false
+ sort.Slice(nodes, func(i, j int) bool {
+ // TODO sort by free volume slots???
+ return len(nodes[i].selectedVolumes) < len(nodes[j].selectedVolumes)
+ })
+ emptyNode, fullNode := nodes[0], nodes[len(nodes)-1]
+ if len(fullNode.selectedVolumes) > idealSelectedVolumes && len(emptyNode.selectedVolumes)+1 <= idealSelectedVolumes {
+
+ // sort the volumes to move
+ var candidateVolumes []*master_pb.VolumeInformationMessage
+ for _, v := range fullNode.selectedVolumes {
+ candidateVolumes = append(candidateVolumes, v)
+ }
+ sortCandidatesFn(candidateVolumes)
+
+ for _, v := range candidateVolumes {
+ if v.ReplicaPlacement > 0 {
+ if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack {
+ // TODO this logic is too simple, but should work most of the time
+ // Need a correct algorithm to handle all different cases
+ continue
+ }
+ }
+ if _, found := emptyNode.selectedVolumes[v.Id]; !found {
+ if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil {
+ delete(fullNode.selectedVolumes, v.Id)
+ emptyNode.selectedVolumes[v.Id] = v
+ hasMove = true
+ break
+ } else {
+ return err
+ }
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error {
+ collectionPrefix := v.Collection + "_"
+ if v.Collection == "" {
+ collectionPrefix = ""
+ }
+ fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
+ if applyBalancing {
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
+ }
+ return nil
+}
+
+func (node *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
+ node.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage)
+ for _, v := range node.info.VolumeInfos {
+ if fn(v) {
+ node.selectedVolumes[v.Id] = v
+ }
+ }
+}
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
new file mode 100644
index 000000000..ff976c345
--- /dev/null
+++ b/weed/shell/command_volume_configure_replication.go
@@ -0,0 +1,108 @@
+package shell
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeConfigureReplication{})
+}
+
+type commandVolumeConfigureReplication struct {
+}
+
+func (c *commandVolumeConfigureReplication) Name() string {
+ return "volume.configure.replication"
+}
+
+func (c *commandVolumeConfigureReplication) Help() string {
+ return `change volume replication value
+
+ This command changes a volume replication value. It should be followed by volume.fix.replication.
+
+`
+}
+
+func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
+ replicationString := configureReplicationCommand.String("replication", "", "the intended replication value")
+ if err = configureReplicationCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *replicationString == "" {
+ return fmt.Errorf("empty replication value")
+ }
+
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(*replicationString)
+ if err != nil {
+ return fmt.Errorf("replication format: %v", err)
+ }
+ replicaPlacementInt32 := uint32(replicaPlacement.Byte())
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return err
+ }
+
+ vid := needle.VolumeId(*volumeIdInt)
+
+ // find all data nodes with volumes that needs replication change
+ var allLocations []location
+ eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ loc := newLocation(dc, string(rack), dn)
+ for _, v := range dn.VolumeInfos {
+ if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 {
+ allLocations = append(allLocations, loc)
+ continue
+ }
+ }
+ })
+
+ if len(allLocations) == 0 {
+ return fmt.Errorf("no volume needs change")
+ }
+
+ for _, dst := range allLocations {
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
+ VolumeId: uint32(vid),
+ Replication: replicaPlacement.String(),
+ })
+ if configureErr != nil {
+ return configureErr
+ }
+ if resp.Error != "" {
+ return errors.New(resp.Error)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return err
+ }
+
+ }
+
+ return nil
+}
diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go
new file mode 100644
index 000000000..cdd10863f
--- /dev/null
+++ b/weed/shell/command_volume_copy.go
@@ -0,0 +1,55 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeCopy{})
+}
+
+type commandVolumeCopy struct {
+}
+
+func (c *commandVolumeCopy) Name() string {
+ return "volume.copy"
+}
+
+func (c *commandVolumeCopy) Help() string {
+ return `copy a volume from one volume server to another volume server
+
+ volume.copy <source volume server host:port> <target volume server host:port> <volume id>
+
+ This command copies a volume from one volume server to another volume server.
+ Usually you will want to unmount the volume first before copying.
+
+`
+}
+
+func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ if len(args) != 3 {
+ fmt.Fprintf(writer, "received args: %+v\n", args)
+ return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>")
+ }
+ sourceVolumeServer, targetVolumeServer, volumeIdString := args[0], args[1], args[2]
+
+ volumeId, err := needle.NewVolumeId(volumeIdString)
+ if err != nil {
+ return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
+ }
+
+ if sourceVolumeServer == targetVolumeServer {
+ return fmt.Errorf("source and target volume servers are the same!")
+ }
+
+ _, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
+ return
+}
diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go
new file mode 100644
index 000000000..c5cc9e277
--- /dev/null
+++ b/weed/shell/command_volume_delete.go
@@ -0,0 +1,50 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeDelete{})
+}
+
+type commandVolumeDelete struct {
+}
+
+func (c *commandVolumeDelete) Name() string {
+ return "volume.delete"
+}
+
+func (c *commandVolumeDelete) Help() string {
+ return `delete a live volume from one volume server
+
+ volume.delete <volume server host:port> <volume id>
+
+ This command deletes a volume from one volume server.
+
+`
+}
+
+func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ if len(args) != 2 {
+ fmt.Fprintf(writer, "received args: %+v\n", args)
+ return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
+ }
+ sourceVolumeServer, volumeIdString := args[0], args[1]
+
+ volumeId, err := needle.NewVolumeId(volumeIdString)
+ if err != nil {
+ return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
+ }
+
+ return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+
+}
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
new file mode 100644
index 000000000..6b5e4e735
--- /dev/null
+++ b/weed/shell/command_volume_fix_replication.go
@@ -0,0 +1,307 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "math/rand"
+ "sort"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeFixReplication{})
+}
+
+type commandVolumeFixReplication struct {
+}
+
+func (c *commandVolumeFixReplication) Name() string {
+ return "volume.fix.replication"
+}
+
+func (c *commandVolumeFixReplication) Help() string {
+ return `add replicas to volumes that are missing replicas
+
+ This command finds all under-replicated volumes, and finds volume servers with free slots.
+ If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
+
+ volume.fix.replication -n # do not take action
+ volume.fix.replication # actually copy the volume files and mount the volume
+
+ Note:
+ * each time this will only add back one replica for one volume id. If multiple replicas
+ are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
+ * do not run this again within a few seconds, since the new volume replica may take a few seconds
+ to register itself to the master.
+
+`
+}
+
+func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ takeAction := true
+ if len(args) > 0 && args[0] == "-n" {
+ takeAction = false
+ }
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return err
+ }
+
+ // find all volumes that needs replication
+ // collect all data nodes
+ replicatedVolumeLocations := make(map[uint32][]location)
+ replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
+ var allLocations []location
+ eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ loc := newLocation(dc, string(rack), dn)
+ for _, v := range dn.VolumeInfos {
+ if v.ReplicaPlacement > 0 {
+ replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
+ replicatedVolumeInfo[v.Id] = v
+ }
+ }
+ allLocations = append(allLocations, loc)
+ })
+
+ // find all under replicated volumes
+ underReplicatedVolumeLocations := make(map[uint32][]location)
+ for vid, locations := range replicatedVolumeLocations {
+ volumeInfo := replicatedVolumeInfo[vid]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+ if replicaPlacement.GetCopyCount() > len(locations) {
+ underReplicatedVolumeLocations[vid] = locations
+ }
+ }
+
+ if len(underReplicatedVolumeLocations) == 0 {
+ return fmt.Errorf("no under replicated volumes")
+ }
+
+ if len(allLocations) == 0 {
+ return fmt.Errorf("no data nodes at all")
+ }
+
+ // find the most under populated data nodes
+ keepDataNodesSorted(allLocations)
+
+ for vid, locations := range underReplicatedVolumeLocations {
+ volumeInfo := replicatedVolumeInfo[vid]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+ foundNewLocation := false
+ for _, dst := range allLocations {
+ // check whether data nodes satisfy the constraints
+ if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
+ // ask the volume server to replicate the volume
+ sourceNodes := underReplicatedVolumeLocations[vid]
+ sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
+ foundNewLocation = true
+ fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
+
+ if !takeAction {
+ break
+ }
+
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
+ VolumeId: volumeInfo.Id,
+ SourceDataNode: sourceNode.dataNode.Id,
+ })
+ if replicateErr != nil {
+ return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return err
+ }
+
+ // adjust free volume count
+ dst.dataNode.FreeVolumeCount--
+ keepDataNodesSorted(allLocations)
+ break
+ }
+ }
+ if !foundNewLocation {
+ fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
+ }
+
+ }
+
+ return nil
+}
+
+func keepDataNodesSorted(dataNodes []location) {
+ sort.Slice(dataNodes, func(i, j int) bool {
+ return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
+ })
+}
+
+/*
+ if on an existing data node {
+ return false
+ }
+ if different from existing dcs {
+ if lack on different dcs {
+ return true
+ }else{
+ return false
+ }
+ }
+ if not on primary dc {
+ return false
+ }
+ if different from existing racks {
+ if lack on different racks {
+ return true
+ }else{
+ return false
+ }
+ }
+ if not on primary rack {
+ return false
+ }
+ if lacks on same rack {
+ return true
+ } else {
+ return false
+ }
+*/
+func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
+
+ existingDataNodes := make(map[string]int)
+ for _, loc := range existingLocations {
+ existingDataNodes[loc.String()] += 1
+ }
+ sameDataNodeCount := existingDataNodes[possibleLocation.String()]
+ // avoid duplicated volume on the same data node
+ if sameDataNodeCount > 0 {
+ return false
+ }
+
+ existingDataCenters := make(map[string]int)
+ for _, loc := range existingLocations {
+ existingDataCenters[loc.DataCenter()] += 1
+ }
+ primaryDataCenters, _ := findTopKeys(existingDataCenters)
+
+ // ensure data center count is within limit
+ if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
+ // different from existing dcs
+ if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
+ // lack on different dcs
+ return true
+ } else {
+ // adding this would go over the different dcs limit
+ return false
+ }
+ }
+ // now this is same as one of the existing data center
+ if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
+ // not on one of the primary dcs
+ return false
+ }
+
+ // now this is one of the primary dcs
+ existingRacks := make(map[string]int)
+ for _, loc := range existingLocations {
+ if loc.DataCenter() != possibleLocation.DataCenter() {
+ continue
+ }
+ existingRacks[loc.Rack()] += 1
+ }
+ primaryRacks, _ := findTopKeys(existingRacks)
+ sameRackCount := existingRacks[possibleLocation.Rack()]
+
+ // ensure rack count is within limit
+ if _, found := existingRacks[possibleLocation.Rack()]; !found {
+ // different from existing racks
+ if len(existingRacks) < replicaPlacement.DiffRackCount+1 {
+ // lack on different racks
+ return true
+ } else {
+ // adding this would go over the different racks limit
+ return false
+ }
+ }
+ // now this is same as one of the existing racks
+ if !isAmong(possibleLocation.Rack(), primaryRacks) {
+ // not on the primary rack
+ return false
+ }
+
+ // now this is on the primary rack
+
+ // different from existing data nodes
+ if sameRackCount < replicaPlacement.SameRackCount+1 {
+ // lack on same rack
+ return true
+ } else {
+ // adding this would go over the same rack copy limit
+ return false
+ }
+
+}
+
+func findTopKeys(m map[string]int) (topKeys []string, max int) {
+ for k, c := range m {
+ if max < c {
+ topKeys = topKeys[:0]
+ topKeys = append(topKeys, k)
+ max = c
+ } else if max == c {
+ topKeys = append(topKeys, k)
+ }
+ }
+ return
+}
+
+func isAmong(key string, keys []string) bool {
+ for _, k := range keys {
+ if k == key {
+ return true
+ }
+ }
+ return false
+}
+
+type location struct {
+ dc string
+ rack string
+ dataNode *master_pb.DataNodeInfo
+}
+
+func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
+ return location{
+ dc: dc,
+ rack: rack,
+ dataNode: dataNode,
+ }
+}
+
+func (l location) String() string {
+ return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
+}
+
+func (l location) Rack() string {
+ return fmt.Sprintf("%s %s", l.dc, l.rack)
+}
+
+func (l location) DataCenter() string {
+ return l.dc
+}
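
The placement check above is driven entirely by the three counters parsed from the replication string. As a reference, here is a minimal sketch, not part of the patch, showing how a string such as "210" decomposes using the same super_block helpers the command relies on:

    package main

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/storage/super_block"
    )

    func main() {
        // "210" = 2 copies on different data centers, 1 on a different rack,
        // 0 extra copies on the same rack, plus the original copy itself.
        rp, err := super_block.NewReplicaPlacementFromString("210")
        if err != nil {
            panic(err)
        }
        fmt.Println("diff data centers:", rp.DiffDataCenterCount) // 2
        fmt.Println("diff racks:       ", rp.DiffRackCount)       // 1
        fmt.Println("same rack copies: ", rp.SameRackCount)       // 0
        fmt.Println("total copies:     ", rp.GetCopyCount())      // 4

    }

satisfyReplicaPlacement only approves a candidate location when adding it keeps the volume within these counts, which is exactly what the table-driven tests below exercise.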
diff --git a/weed/shell/command_volume_fix_replication_test.go b/weed/shell/command_volume_fix_replication_test.go
new file mode 100644
index 000000000..4cfbd96aa
--- /dev/null
+++ b/weed/shell/command_volume_fix_replication_test.go
@@ -0,0 +1,207 @@
+package shell
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+type testcase struct {
+ name string
+ replication string
+ existingLocations []location
+ possibleLocation location
+ expected bool
+}
+
+func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
+
+ var tests = []testcase{
+ {
+ name: "test 100 negative",
+ replication: "100",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ expected: false,
+ },
+ {
+ name: "test 100 positive",
+ replication: "100",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ possibleLocation: location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ expected: true,
+ },
+ {
+ name: "test 022 positive",
+ replication: "022",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: true,
+ },
+ {
+ name: "test 022 negative",
+ replication: "022",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: false,
+ },
+ {
+ name: "test 210 moved from 200 positive",
+ replication: "210",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: true,
+ },
+ {
+ name: "test 210 moved from 200 negative extra dc",
+ replication: "210",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc4", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: false,
+ },
+ {
+ name: "test 210 moved from 200 negative extra data node",
+ replication: "210",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: false,
+ },
+ }
+
+ runTests(tests, t)
+
+}
+
+func TestSatisfyReplicaPlacement01x(t *testing.T) {
+
+ var tests = []testcase{
+ {
+ name: "test 011 same existing rack",
+ replication: "011",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+ {
+ name: "test 011 negative",
+ replication: "011",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: false,
+ },
+ {
+ name: "test 011 different existing racks",
+ replication: "011",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+ {
+ name: "test 011 different existing racks negative",
+ replication: "011",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: false,
+ },
+ }
+
+ runTests(tests, t)
+
+}
+
+func TestSatisfyReplicaPlacement00x(t *testing.T) {
+
+ var tests = []testcase{
+ {
+ name: "test 001",
+ replication: "001",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ expected: true,
+ },
+ {
+ name: "test 002 positive",
+ replication: "002",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+ {
+ name: "test 002 negative, repeat the same node",
+ replication: "002",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ expected: false,
+ },
+ {
+ name: "test 002 negative, enough node already",
+ replication: "002",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: false,
+ },
+ }
+
+ runTests(tests, t)
+
+}
+
+func runTests(tests []testcase, t *testing.T) {
+ for _, tt := range tests {
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication)
+ println("replication:", tt.replication, "expected", tt.expected, "name:", tt.name)
+ if satisfyReplicaPlacement(replicaPlacement, tt.existingLocations, tt.possibleLocation) != tt.expected {
+ t.Errorf("%s: expect %v add %v to %s %+v",
+ tt.name, tt.expected, tt.possibleLocation, tt.replication, tt.existingLocations)
+ }
+ }
+}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
new file mode 100644
index 000000000..69a1a63b4
--- /dev/null
+++ b/weed/shell/command_volume_fsck.go
@@ -0,0 +1,361 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "math"
+ "os"
+ "path/filepath"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeFsck{})
+}
+
+type commandVolumeFsck struct {
+ env *CommandEnv
+}
+
+func (c *commandVolumeFsck) Name() string {
+ return "volume.fsck"
+}
+
+func (c *commandVolumeFsck) Help() string {
+ return `check all volumes to find entries not used by the filer
+
+ Important assumption!!!
+ the system is used by only one filer.
+
+ This command works this way:
+ 1. collect all file ids from all volumes, as set A
+ 2. collect all file ids from the filer, as set B
+ 3. find the file ids in set A but not in set B
+
+`
+}
+
+func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ verbose := fsckCommand.Bool("v", false, "verbose mode")
+ applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
+ if err = fsckCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ c.env = commandEnv
+
+ // create a temp folder
+ tempFolder, err := ioutil.TempDir("", "sw_fsck")
+ if err != nil {
+ return fmt.Errorf("failed to create temp folder: %v", err)
+ }
+ if *verbose {
+ fmt.Fprintf(writer, "working directory: %s\n", tempFolder)
+ }
+ defer os.RemoveAll(tempFolder)
+
+ // collect all volume id locations
+ volumeIdToVInfo, err := c.collectVolumeIds(*verbose, writer)
+ if err != nil {
+ return fmt.Errorf("failed to collect all volume locations: %v", err)
+ }
+
+ // collect each volume file ids
+ for volumeId, vinfo := range volumeIdToVInfo {
+ err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer)
+ if err != nil {
+ return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
+ }
+ }
+
+ // collect all filer file ids
+ if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil {
+ return fmt.Errorf("failed to collect file ids from filer: %v", err)
+ }
+
+ // volume file ids subtract filer file ids
+ var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
+ for volumeId, vinfo := range volumeIdToVInfo {
+ inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose)
+ if checkErr != nil {
+ return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
+ }
+ totalInUseCount += inUseCount
+ totalOrphanChunkCount += uint64(len(orphanFileIds))
+ totalOrphanDataSize += orphanDataSize
+
+ if *applyPurging && len(orphanFileIds) > 0 {
+ if vinfo.isEcVolume {
+ fmt.Fprintf(writer, "Skip purging for Erasure Coded volumes.\n")
+ continue
+ }
+ if err = c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
+ return fmt.Errorf("purge for volume %d: %v\n", volumeId, err)
+ }
+ }
+ }
+
+ if totalOrphanChunkCount == 0 {
+ fmt.Fprintf(writer, "no orphan data\n")
+ return nil
+ }
+
+ if !*applyPurging {
+ pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount))
+ fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
+ totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize)
+
+ fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n")
+ }
+
+ return nil
+}
+
+func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
+
+ if verbose {
+ fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
+ }
+
+ return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+
+ ext := ".idx"
+ if vinfo.isEcVolume {
+ ext = ".ecx"
+ }
+
+ copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+ VolumeId: volumeId,
+ Ext: ext,
+ CompactionRevision: math.MaxUint32,
+ StopOffset: math.MaxInt64,
+ Collection: vinfo.collection,
+ IsEcVolume: vinfo.isEcVolume,
+ IgnoreSourceFileNotFound: false,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to start copying volume %d.idx: %v", volumeId, err)
+ }
+
+ err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
+ if err != nil {
+ return fmt.Errorf("failed to copy %d.idx from %s: %v", volumeId, vinfo.server, err)
+ }
+
+ return nil
+
+ })
+
+}
+
+func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToServer map[uint32]VInfo, verbose bool, writer io.Writer) error {
+
+ if verbose {
+ fmt.Fprintf(writer, "collecting file ids from filer ...\n")
+ }
+
+ files := make(map[uint32]*os.File)
+ for vid := range volumeIdToServer {
+ dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if openErr != nil {
+ return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
+ }
+ files[vid] = dst
+ }
+ defer func() {
+ for _, f := range files {
+ f.Close()
+ }
+ }()
+
+ type Item struct {
+ vid uint32
+ fileKey uint64
+ }
+ return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
+ buffer := make([]byte, 8)
+ for item := range outputChan {
+ i := item.(*Item)
+ util.Uint64toBytes(buffer, i.fileKey)
+ files[i.vid].Write(buffer)
+ }
+ }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ for _, chunk := range entry.Entry.Chunks {
+ outputChan <- &Item{
+ vid: chunk.Fid.VolumeId,
+ fileKey: chunk.Fid.FileKey,
+ }
+ }
+ return nil
+ })
+}
+
+func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
+
+ db := needle_map.NewMemDb()
+ defer db.Close()
+
+ if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
+ return
+ }
+
+ filerFileIdsData, err := ioutil.ReadFile(getFilerFileIdFile(tempFolder, volumeId))
+ if err != nil {
+ return
+ }
+
+ dataLen := len(filerFileIdsData)
+ if dataLen%8 != 0 {
+ return 0, nil, 0, fmt.Errorf("filer data is corrupted")
+ }
+
+ for i := 0; i < len(filerFileIdsData); i += 8 {
+ fileKey := util.BytesToUint64(filerFileIdsData[i : i+8])
+ db.Delete(types.NeedleId(fileKey))
+ inUseCount++
+ }
+
+ var orphanFileCount uint64
+ db.AscendingVisit(func(n needle_map.NeedleValue) error {
+ // fmt.Printf("%d,%x\n", volumeId, n.Key)
+ orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String()))
+ orphanFileCount++
+ orphanDataSize += uint64(n.Size)
+ return nil
+ })
+
+ if orphanFileCount > 0 {
+ pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
+ fmt.Fprintf(writer, "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
+ volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
+ }
+
+ return
+
+}
+
+type VInfo struct {
+ server string
+ collection string
+ isEcVolume bool
+}
+
+func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
+
+ if verbose {
+ fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
+ }
+
+ volumeIdToServer = make(map[uint32]VInfo)
+ var resp *master_pb.VolumeListResponse
+ err = c.env.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return
+ }
+
+ eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
+ for _, vi := range t.VolumeInfos {
+ volumeIdToServer[vi.Id] = VInfo{
+ server: t.Id,
+ collection: vi.Collection,
+ isEcVolume: false,
+ }
+ }
+ for _, ecShardInfo := range t.EcShardInfos {
+ volumeIdToServer[ecShardInfo.Id] = VInfo{
+ server: t.Id,
+ collection: ecShardInfo.Collection,
+ isEcVolume: true,
+ }
+ }
+ })
+
+ if verbose {
+ fmt.Fprintf(writer, "collected %d volumes and locations.\n", len(volumeIdToServer))
+ }
+ return
+}
+
+func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string, writer io.Writer) (err error) {
+ fmt.Fprintf(writer, "purging orphan data for volume %d...\n", volumeId)
+ locations, found := c.env.MasterClient.GetLocations(volumeId)
+ if !found {
+ return fmt.Errorf("failed to find volume %d locations", volumeId)
+ }
+
+ resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations))
+ var wg sync.WaitGroup
+ var errMutex sync.Mutex // guards err, which may be set from multiple goroutines
+ for _, location := range locations {
+ wg.Add(1)
+ go func(server string, fidList []string) {
+ defer wg.Done()
+
+ if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
+ errMutex.Lock()
+ err = deleteErr
+ errMutex.Unlock()
+ } else if deleteResults != nil {
+ resultChan <- deleteResults
+ }
+
+ }(location.Url, fileIds)
+ }
+ wg.Wait()
+ close(resultChan)
+
+ for results := range resultChan {
+ for _, result := range results {
+ if result.Error != "" {
+ fmt.Fprintf(writer, "purge error: %s\n", result.Error)
+ }
+ }
+ }
+
+ return
+}
+
+func getVolumeFileIdFile(tempFolder string, vid uint32) string {
+ return filepath.Join(tempFolder, fmt.Sprintf("%d.idx", vid))
+}
+
+func getFilerFileIdFile(tempFolder string, vid uint32) string {
+ return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid))
+}
+
+func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
+ flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
+ dst, err := os.OpenFile(fileName, flags, 0644)
+ if err != nil {
+ return fmt.Errorf("failed to create file %s: %v", fileName, err)
+ }
+ defer dst.Close()
+
+ for {
+ resp, receiveErr := client.Recv()
+ if receiveErr == io.EOF {
+ break
+ }
+ if receiveErr != nil {
+ return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
+ }
+ dst.Write(resp.FileContent)
+ }
+ return nil
+}
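
The whole check reduces to the set subtraction described in the help text: file keys present in a volume's copied .idx but never referenced by any filer chunk are orphans. A minimal, self-contained sketch of that A minus B step with plain maps (the command itself performs it with needle_map.NewMemDb over the copied index, as above):

    package main

    import "fmt"

    func main() {
        // set A: file keys found in the volume index
        volumeKeys := map[uint64]bool{101: true, 102: true, 103: true, 104: true}
        // set B: file keys referenced by filer entries
        filerKeys := map[uint64]bool{101: true, 103: true}

        // A minus B: anything the filer never references is an orphan.
        var orphans []uint64
        for key := range volumeKeys {
            if !filerKeys[key] {
                orphans = append(orphans, key)
            }
        }
        fmt.Println("orphan file keys:", orphans) // 102 and 104, in map iteration order
    }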
diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go
new file mode 100644
index 000000000..c5a9388fa
--- /dev/null
+++ b/weed/shell/command_volume_list.go
@@ -0,0 +1,133 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+
+ "io"
+ "sort"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeList{})
+}
+
+type commandVolumeList struct {
+}
+
+func (c *commandVolumeList) Name() string {
+ return "volume.list"
+}
+
+func (c *commandVolumeList) Help() string {
+ return `list all volumes
+
+ This command lists all volumes as a tree of dataCenter > rack > dataNode > volume.
+
+`
+}
+
+func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return err
+ }
+
+ writeTopologyInfo(writer, resp.TopologyInfo, resp.VolumeSizeLimitMb)
+ return nil
+}
+
+func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLimitMb uint64) statistics {
+ fmt.Fprintf(writer, "Topology volume:%d/%d active:%d free:%d remote:%d volumeSizeLimit:%d MB\n", t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount, volumeSizeLimitMb)
+ sort.Slice(t.DataCenterInfos, func(i, j int) bool {
+ return t.DataCenterInfos[i].Id < t.DataCenterInfos[j].Id
+ })
+ var s statistics
+ for _, dc := range t.DataCenterInfos {
+ s = s.plus(writeDataCenterInfo(writer, dc))
+ }
+ fmt.Fprintf(writer, "%+v \n", s)
+ return s
+}
+func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) statistics {
+ fmt.Fprintf(writer, " DataCenter %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount)
+ var s statistics
+ sort.Slice(t.RackInfos, func(i, j int) bool {
+ return t.RackInfos[i].Id < t.RackInfos[j].Id
+ })
+ for _, r := range t.RackInfos {
+ s = s.plus(writeRackInfo(writer, r))
+ }
+ fmt.Fprintf(writer, " DataCenter %s %+v \n", t.Id, s)
+ return s
+}
+func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) statistics {
+ fmt.Fprintf(writer, " Rack %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount)
+ var s statistics
+ sort.Slice(t.DataNodeInfos, func(i, j int) bool {
+ return t.DataNodeInfos[i].Id < t.DataNodeInfos[j].Id
+ })
+ for _, dn := range t.DataNodeInfos {
+ s = s.plus(writeDataNodeInfo(writer, dn))
+ }
+ fmt.Fprintf(writer, " Rack %s %+v \n", t.Id, s)
+ return s
+}
+func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics {
+ fmt.Fprintf(writer, " DataNode %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount)
+ var s statistics
+ sort.Slice(t.VolumeInfos, func(i, j int) bool {
+ return t.VolumeInfos[i].Id < t.VolumeInfos[j].Id
+ })
+ for _, vi := range t.VolumeInfos {
+ s = s.plus(writeVolumeInformationMessage(writer, vi))
+ }
+ for _, ecShardInfo := range t.EcShardInfos {
+ fmt.Fprintf(writer, " ec volume id:%v collection:%v shards:%v\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds())
+ }
+ fmt.Fprintf(writer, " DataNode %s %+v \n", t.Id, s)
+ return s
+}
+func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) statistics {
+ fmt.Fprintf(writer, " volume %+v \n", t)
+ return newStatistics(t)
+}
+
+type statistics struct {
+ Size uint64
+ FileCount uint64
+ DeletedFileCount uint64
+ DeletedBytes uint64
+}
+
+func newStatistics(t *master_pb.VolumeInformationMessage) statistics {
+ return statistics{
+ Size: t.Size,
+ FileCount: t.FileCount,
+ DeletedFileCount: t.DeleteCount,
+ DeletedBytes: t.DeletedByteCount,
+ }
+}
+
+func (s statistics) plus(t statistics) statistics {
+ return statistics{
+ Size: s.Size + t.Size,
+ FileCount: s.FileCount + t.FileCount,
+ DeletedFileCount: s.DeletedFileCount + t.DeletedFileCount,
+ DeletedBytes: s.DeletedBytes + t.DeletedBytes,
+ }
+}
+
+func (s statistics) String() string {
+ if s.DeletedFileCount > 0 {
+ return fmt.Sprintf("total size:%d file_count:%d deleted_file:%d deleted_bytes:%d", s.Size, s.FileCount, s.DeletedFileCount, s.DeletedBytes)
+ }
+ return fmt.Sprintf("total size:%d file_count:%d", s.Size, s.FileCount)
+}
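
The totals printed at each level of the tree are produced by folding the per-volume statistics with plus(). A minimal sketch of that aggregation in isolation, with made-up numbers:

    package main

    import "fmt"

    // Mirrors the statistics type above, just to show the fold in isolation.
    type stats struct {
        Size, FileCount, DeletedFileCount, DeletedBytes uint64
    }

    func (s stats) plus(t stats) stats {
        return stats{s.Size + t.Size, s.FileCount + t.FileCount,
            s.DeletedFileCount + t.DeletedFileCount, s.DeletedBytes + t.DeletedBytes}
    }

    func main() {
        volumes := []stats{
            {Size: 100 << 20, FileCount: 3},
            {Size: 50 << 20, FileCount: 2, DeletedFileCount: 1, DeletedBytes: 1 << 20},
        }
        var total stats
        for _, v := range volumes {
            total = total.plus(v)
        }
        fmt.Printf("%+v\n", total)
        // {Size:157286400 FileCount:5 DeletedFileCount:1 DeletedBytes:1048576}
    }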
diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go
new file mode 100644
index 000000000..ded7b7e66
--- /dev/null
+++ b/weed/shell/command_volume_mount.go
@@ -0,0 +1,63 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeMount{})
+}
+
+type commandVolumeMount struct {
+}
+
+func (c *commandVolumeMount) Name() string {
+ return "volume.mount"
+}
+
+func (c *commandVolumeMount) Help() string {
+ return `mount a volume from one volume server
+
+ volume.mount <volume server host:port> <volume id>
+
+ This command mounts a volume from one volume server.
+
+`
+}
+
+func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ if len(args) != 2 {
+ fmt.Fprintf(writer, "received args: %+v\n", args)
+ return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
+ }
+ sourceVolumeServer, volumeIdString := args[0], args[1]
+
+ volumeId, err := needle.NewVolumeId(volumeIdString)
+ if err != nil {
+ return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
+ }
+
+ return mountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+
+}
+
+func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+ return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
+ VolumeId: uint32(volumeId),
+ })
+ return mountErr
+ })
+}
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
new file mode 100644
index 000000000..392b947e7
--- /dev/null
+++ b/weed/shell/command_volume_move.go
@@ -0,0 +1,129 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "log"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeMove{})
+}
+
+type commandVolumeMove struct {
+}
+
+func (c *commandVolumeMove) Name() string {
+ return "volume.move"
+}
+
+func (c *commandVolumeMove) Help() string {
+ return `move a live volume from one volume server to another volume server
+
+ volume.move <source volume server host:port> <target volume server host:port> <volume id>
+
+ This command moves a live volume from one volume server to another volume server. Here are the steps:
+
+ 1. This command asks the target volume server to copy the source volume from the source volume server, remembering the last entry's timestamp.
+ 2. This command asks the target volume server to mount the new volume.
+ Now the master will mark this volume id as readonly.
+ 3. This command asks the target volume server to tail the source volume for updates after the timestamp, for 1 minute, to drain the requests.
+ 4. This command asks the source volume server to unmount the source volume.
+ Now the master will mark this volume id as writable.
+ 5. This command asks the source volume server to delete the source volume.
+
+`
+}
+
+func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ if len(args) != 3 {
+ fmt.Fprintf(writer, "received args: %+v\n", args)
+ return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>")
+ }
+ sourceVolumeServer, targetVolumeServer, volumeIdString := args[0], args[1], args[2]
+
+ volumeId, err := needle.NewVolumeId(volumeIdString)
+ if err != nil {
+ return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
+ }
+
+ if sourceVolumeServer == targetVolumeServer {
+ return fmt.Errorf("source and target volume servers are the same!")
+ }
+
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second)
+}
+
+// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
+func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) {
+
+ log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
+ lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
+ if err != nil {
+ return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
+ }
+
+ log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
+ if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
+ return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
+ }
+
+ log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
+ if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer); err != nil {
+ return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
+ }
+
+ log.Printf("moved volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
+ return nil
+}
+
+func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
+
+ err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
+ VolumeId: uint32(volumeId),
+ SourceDataNode: sourceVolumeServer,
+ })
+ if replicateErr == nil {
+ lastAppendAtNs = resp.LastAppendAtNs
+ }
+ return replicateErr
+ })
+
+ return
+}
+
+func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
+
+ return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
+ VolumeId: uint32(volumeId),
+ SinceNs: lastAppendAtNs,
+ IdleTimeoutSeconds: uint32(idleTimeout.Seconds()),
+ SourceVolumeServer: sourceVolumeServer,
+ })
+ return replicateErr
+ })
+
+}
+
+func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+ return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
+ VolumeId: uint32(volumeId),
+ })
+ return deleteErr
+ })
+}
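
Since LiveMoveVolume is exported, the same copy / tail / unmount / delete sequence can be driven outside the interactive shell. A minimal usage sketch, assuming a reachable cluster; the host:port values and the insecure dial option are placeholders, not part of the patch:

    package main

    import (
        "log"
        "time"

        "google.golang.org/grpc"

        "github.com/chrislusf/seaweedfs/weed/shell"
        "github.com/chrislusf/seaweedfs/weed/storage/needle"
    )

    func main() {
        src, dst := "127.0.0.1:8080", "127.0.0.1:8081" // placeholder volume servers
        vid := needle.VolumeId(7)

        // grpc.WithInsecure() stands in for whatever dial option the cluster actually needs.
        if err := shell.LiveMoveVolume(grpc.WithInsecure(), vid, src, dst, 5*time.Second); err != nil {
            log.Fatalf("move volume %d: %v", vid, err)
        }
    }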
diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go
new file mode 100644
index 000000000..d31c8c031
--- /dev/null
+++ b/weed/shell/command_volume_tier_download.go
@@ -0,0 +1,170 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeTierDownload{})
+}
+
+type commandVolumeTierDownload struct {
+}
+
+func (c *commandVolumeTierDownload) Name() string {
+ return "volume.tier.download"
+}
+
+func (c *commandVolumeTierDownload) Help() string {
+ return `download the dat file of a volume from a remote tier
+
+ volume.tier.download [-collection=""]
+ volume.tier.download [-collection=""] -volumeId=<volume_id>
+
+ e.g.:
+ volume.tier.download -volumeId=7
+
+ This command will download the dat file of a volume from a remote tier to a volume server in the local cluster.
+
+`
+}
+
+func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeId := tierCommand.Int("volumeId", 0, "the volume id")
+ collection := tierCommand.String("collection", "", "the collection name")
+ if err = tierCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ vid := needle.VolumeId(*volumeId)
+
+ // collect topology information
+ topologyInfo, err := collectTopologyInfo(commandEnv)
+ if err != nil {
+ return err
+ }
+
+ // volumeId is provided
+ if vid != 0 {
+ return doVolumeTierDownload(commandEnv, writer, *collection, vid)
+ }
+
+ // apply to all volumes in the collection that have been uploaded to a remote tier
+ volumeIds := collectRemoteVolumes(topologyInfo, *collection)
+ fmt.Printf("tier download volumes: %v\n", volumeIds)
+ for _, vid := range volumeIds {
+ if err = doVolumeTierDownload(commandEnv, writer, *collection, vid); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
+
+ vidMap := make(map[uint32]bool)
+ eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, v := range dn.VolumeInfos {
+ if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" {
+ vidMap[v.Id] = true
+ }
+ }
+ })
+
+ for vid := range vidMap {
+ vids = append(vids, needle.VolumeId(vid))
+ }
+
+ return
+}
+
+func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId) (err error) {
+ // find volume location
+ locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
+ if !found {
+ return fmt.Errorf("volume %d not found", vid)
+ }
+
+ // TODO parallelize this
+ for _, loc := range locations {
+ // copy the .dat file from remote tier to local
+ err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url)
+ if err != nil {
+ return fmt.Errorf("download dat file for volume %d to %s: %v", vid, loc.Url, err)
+ }
+ }
+
+ return nil
+}
+
+func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error {
+
+ err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ })
+ if downloadErr != nil {
+ return downloadErr
+ }
+
+ var lastProcessed int64
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
+ processingSpeed := float64(resp.Processed-lastProcessed) / 1024.0 / 1024.0
+
+ fmt.Fprintf(writer, "downloaded %.2f%%, %d bytes, %.2fMB/s\n", resp.ProcessedPercentage, resp.Processed, processingSpeed)
+
+ lastProcessed = resp.Processed
+ }
+
+ _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
+ VolumeId: uint32(volumeId),
+ })
+ if unmountErr != nil {
+ return unmountErr
+ }
+
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
+ VolumeId: uint32(volumeId),
+ })
+ if mountErr != nil {
+ return mountErr
+ }
+
+ return nil
+ })
+
+ return err
+
+}
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
new file mode 100644
index 000000000..f92cdc3e4
--- /dev/null
+++ b/weed/shell/command_volume_tier_upload.go
@@ -0,0 +1,151 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeTierUpload{})
+}
+
+type commandVolumeTierUpload struct {
+}
+
+func (c *commandVolumeTierUpload) Name() string {
+ return "volume.tier.upload"
+}
+
+func (c *commandVolumeTierUpload) Help() string {
+ return `upload the dat file of a volume to a remote tier
+
+ volume.tier.upload [-collection=""] [-fullPercent=95] [-quietFor=1h]
+ volume.tier.upload [-collection=""] -volumeId=<volume_id> -dest=<storage_backend> [-keepLocalDatFile]
+
+ e.g.:
+ volume.tier.upload -volumeId=7 -dest=s3
+ volume.tier.upload -volumeId=7 -dest=s3.default
+
+ The <storage_backend> is defined in master.toml.
+ For example, "s3.default" in [storage.backend.s3.default]
+
+ This command will move the dat file of a volume to a remote tier.
+
+ SeaweedFS enables scalable and fast local access to lots of files,
+ while cloud storage is slower but more cost efficient. How can the two be combined?
+
+ Usually the data follows the 80/20 rule: only 20% of the data is frequently accessed.
+ We can offload the old volumes to the cloud.
+
+ With this, SeaweedFS can be both fast and scalable, with effectively infinite storage space.
+ Just add more local SeaweedFS volume servers to increase the throughput.
+
+ The index file is still local, and the same O(1) disk read is applied to the remote file.
+
+`
+}
+
+func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeId := tierCommand.Int("volumeId", 0, "the volume id")
+ collection := tierCommand.String("collection", "", "the collection name")
+ fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume has reached this percentage of the max volume size")
+ quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without writes during this period")
+ dest := tierCommand.String("dest", "", "the target tier name")
+ keepLocalDatFile := tierCommand.Bool("keepLocalDatFile", false, "whether keep local dat file")
+ if err = tierCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ vid := needle.VolumeId(*volumeId)
+
+ // volumeId is provided
+ if vid != 0 {
+ return doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile)
+ }
+
+ // apply to all volumes in the collection
+ // reusing collectVolumeIdsForEcEncode for now
+ volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
+ if err != nil {
+ return err
+ }
+ fmt.Printf("tier upload volumes: %v\n", volumeIds)
+ for _, vid := range volumeIds {
+ if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {
+ // find volume location
+ locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
+ if !found {
+ return fmt.Errorf("volume %d not found", vid)
+ }
+
+ err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+ if err != nil {
+ return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
+ }
+
+ // copy the .dat file to remote tier
+ err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile)
+ if err != nil {
+ return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, locations[0].Url, dest, err)
+ }
+
+ return nil
+}
+
+func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error {
+
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ DestinationBackendName: dest,
+ KeepLocalDatFile: keepLocalDatFile,
+ })
+ if copyErr != nil {
+ return copyErr
+ }
+
+ var lastProcessed int64
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
+ processingSpeed := float64(resp.Processed-lastProcessed) / 1024.0 / 1024.0
+
+ fmt.Fprintf(writer, "copied %.2f%%, %d bytes, %.2fMB/s\n", resp.ProcessedPercentage, resp.Processed, processingSpeed)
+
+ lastProcessed = resp.Processed
+ }
+
+ return nil
+ })
+
+ return err
+
+}
diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go
new file mode 100644
index 000000000..7596bb4c8
--- /dev/null
+++ b/weed/shell/command_volume_unmount.go
@@ -0,0 +1,63 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeUnmount{})
+}
+
+type commandVolumeUnmount struct {
+}
+
+func (c *commandVolumeUnmount) Name() string {
+ return "volume.unmount"
+}
+
+func (c *commandVolumeUnmount) Help() string {
+ return `unmount a volume from one volume server
+
+ volume.unmount <volume server host:port> <volume id>
+
+ This command unmounts a volume from one volume server.
+
+`
+}
+
+func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ if len(args) != 2 {
+ fmt.Fprintf(writer, "received args: %+v\n", args)
+ return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
+ }
+ sourceVolumeServer, volumeIdString := args[0], args[1]
+
+ volumeId, err := needle.NewVolumeId(volumeIdString)
+ if err != nil {
+ return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
+ }
+
+ return unmountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+
+}
+
+func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+ return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
+ VolumeId: uint32(volumeId),
+ })
+ return unmountErr
+ })
+}
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
new file mode 100644
index 000000000..f61ed9f82
--- /dev/null
+++ b/weed/shell/commands.go
@@ -0,0 +1,137 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+ "net/url"
+ "strconv"
+ "strings"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/exclusive_locks"
+)
+
+type ShellOptions struct {
+ Masters *string
+ GrpcDialOption grpc.DialOption
+ // shell transient context
+ FilerHost string
+ FilerPort int64
+ Directory string
+}
+
+type CommandEnv struct {
+ env map[string]string
+ MasterClient *wdclient.MasterClient
+ option ShellOptions
+ locker *exclusive_locks.ExclusiveLocker
+}
+
+type command interface {
+ Name() string
+ Help() string
+ Do([]string, *CommandEnv, io.Writer) error
+}
+
+var (
+ Commands = []command{}
+)
+
+func NewCommandEnv(options ShellOptions) *CommandEnv {
+ ce := &CommandEnv{
+ env: make(map[string]string),
+ MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")),
+ option: options,
+ }
+ ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)
+ return ce
+}
+
+func (ce *CommandEnv) parseUrl(input string) (path string, err error) {
+ if strings.HasPrefix(input, "http") {
+ err = fmt.Errorf("http://<filer>:<port> prefix is not supported any more")
+ return
+ }
+ if !strings.HasPrefix(input, "/") {
+ input = util.Join(ce.option.Directory, input)
+ }
+ return input, err
+}
+
+func (ce *CommandEnv) isDirectory(path string) bool {
+
+ return ce.checkDirectory(path) == nil
+
+}
+
+func (ce *CommandEnv) confirmIsLocked() error {
+
+ if ce.locker.IsLocking() {
+ return nil
+ }
+
+ return fmt.Errorf("need to lock to continue")
+
+}
+
+func (ce *CommandEnv) checkDirectory(path string) error {
+
+ dir, name := util.FullPath(path).DirAndName()
+
+ exists, err := filer_pb.Exists(ce, dir, name, true)
+
+ if !exists {
+ return fmt.Errorf("%s is not a directory", path)
+ }
+
+ return err
+
+}
+
+var _ = filer_pb.FilerClient(&CommandEnv{})
+
+func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ filerGrpcAddress := fmt.Sprintf("%s:%d", ce.option.FilerHost, ce.option.FilerPort+10000)
+ return pb.WithGrpcFilerClient(filerGrpcAddress, ce.option.GrpcDialOption, fn)
+
+}
+
+func (ce *CommandEnv) AdjustedUrl(hostAndPort string) string {
+ return hostAndPort
+}
+
+func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
+ if strings.HasPrefix(entryPath, "http") {
+ var u *url.URL
+ u, err = url.Parse(entryPath)
+ if err != nil {
+ return
+ }
+ filerServer = u.Hostname()
+ portString := u.Port()
+ if portString != "" {
+ filerPort, err = strconv.ParseInt(portString, 10, 32)
+ }
+ path = u.Path
+ } else {
+ err = fmt.Errorf("path should have full url /path/to/dirOrFile : %s", entryPath)
+ }
+ return
+}
+
+func findInputDirectory(args []string) (input string) {
+ input = "."
+ if len(args) > 0 {
+ input = args[len(args)-1]
+ if strings.HasPrefix(input, "-") {
+ input = "."
+ }
+ }
+ return input
+}
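
The Commands slice and the command interface are the only coupling between the shell and the individual commands: each file registers itself in init(), and dispatch happens purely by Name(). A minimal sketch of driving a registered command programmatically, assuming a reachable master; the master address and dial option are placeholders, not part of the patch:

    package main

    import (
        "log"
        "os"

        "google.golang.org/grpc"

        "github.com/chrislusf/seaweedfs/weed/shell"
    )

    func main() {
        masters := "localhost:9333" // placeholder master address
        env := shell.NewCommandEnv(shell.ShellOptions{
            Masters:        &masters,
            GrpcDialOption: grpc.WithInsecure(), // placeholder dial option
            Directory:      "/",
        })
        go env.MasterClient.KeepConnectedToMaster()
        env.MasterClient.WaitUntilConnected()

        // Dispatch by Name(), the same way processEachCmd does for interactive input.
        for _, c := range shell.Commands {
            if c.Name() == "volume.list" {
                if err := c.Do(nil, env, os.Stdout); err != nil {
                    log.Fatal(err)
                }
            }
        }
    }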
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
new file mode 100644
index 000000000..4632a1fb0
--- /dev/null
+++ b/weed/shell/shell_liner.go
@@ -0,0 +1,154 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "path"
+ "regexp"
+ "sort"
+ "strings"
+
+ "github.com/peterh/liner"
+)
+
+var (
+ line *liner.State
+ historyPath = path.Join(os.TempDir(), "weed-shell")
+)
+
+func RunShell(options ShellOptions) {
+
+ line = liner.NewLiner()
+ defer line.Close()
+
+ line.SetCtrlCAborts(true)
+
+ setCompletionHandler()
+ loadHistory()
+
+ defer saveHistory()
+
+ reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
+
+ commandEnv := NewCommandEnv(options)
+
+ go commandEnv.MasterClient.KeepConnectedToMaster()
+ commandEnv.MasterClient.WaitUntilConnected()
+
+ for {
+ cmd, err := line.Prompt("> ")
+ if err != nil {
+ if err != io.EOF {
+ fmt.Printf("%v\n", err)
+ }
+ return
+ }
+
+ for _, c := range strings.Split(cmd, ";") {
+ if processEachCmd(reg, c, commandEnv) {
+ return
+ }
+ }
+ }
+}
+
+func processEachCmd(reg *regexp.Regexp, cmd string, commandEnv *CommandEnv) bool {
+ cmds := reg.FindAllString(cmd, -1)
+ if len(cmds) == 0 {
+ return false
+ } else {
+ line.AppendHistory(cmd)
+
+ args := make([]string, len(cmds[1:]))
+
+ for i := range args {
+ args[i] = strings.Trim(string(cmds[1+i]), "\"'")
+ }
+
+ cmd := strings.ToLower(cmds[0])
+ if cmd == "help" || cmd == "?" {
+ printHelp(cmds)
+ } else if cmd == "exit" || cmd == "quit" {
+ return true
+ } else {
+ foundCommand := false
+ for _, c := range Commands {
+ if c.Name() == cmd || c.Name() == "fs."+cmd {
+ if err := c.Do(args, commandEnv, os.Stdout); err != nil {
+ fmt.Fprintf(os.Stderr, "error: %v\n", err)
+ }
+ foundCommand = true
+ }
+ }
+ if !foundCommand {
+ fmt.Fprintf(os.Stderr, "unknown command: %v\n", cmd)
+ }
+ }
+
+ }
+ return false
+}
+
+func printGenericHelp() {
+ msg :=
+ `Type: "help <command>" for help on <command>
+`
+ fmt.Print(msg)
+
+ sort.Slice(Commands, func(i, j int) bool {
+ return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0
+ })
+ for _, c := range Commands {
+ helpTexts := strings.SplitN(c.Help(), "\n", 2)
+ fmt.Printf(" %-30s\t# %s \n", c.Name(), helpTexts[0])
+ }
+}
+
+func printHelp(cmds []string) {
+ args := cmds[1:]
+ if len(args) == 0 {
+ printGenericHelp()
+ } else if len(args) > 1 {
+ fmt.Println()
+ } else {
+ cmd := strings.ToLower(args[0])
+
+ sort.Slice(Commands, func(i, j int) bool {
+ return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0
+ })
+
+ for _, c := range Commands {
+ if c.Name() == cmd {
+ fmt.Printf(" %s\t# %s\n", c.Name(), c.Help())
+ }
+ }
+ }
+}
+
+func setCompletionHandler() {
+ line.SetCompleter(func(line string) (c []string) {
+ for _, i := range Commands {
+ if strings.HasPrefix(i.Name(), strings.ToLower(line)) {
+ c = append(c, i.Name())
+ }
+ }
+ return
+ })
+}
+
+func loadHistory() {
+ if f, err := os.Open(historyPath); err == nil {
+ line.ReadHistory(f)
+ f.Close()
+ }
+}
+
+func saveHistory() {
+ if f, err := os.Create(historyPath); err != nil {
+ fmt.Printf("Error writing history file: %v\n", err)
+ } else {
+ line.WriteHistory(f)
+ f.Close()
+ }
+}
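
For reference, the tokenization used by processEachCmd can be tried in isolation. A minimal sketch with the same regular expression and quote trimming; the example input is only illustrative:

    package main

    import (
        "fmt"
        "regexp"
        "strings"
    )

    func main() {
        // Same pattern as RunShell: single-quoted, double-quoted, or bare tokens.
        reg := regexp.MustCompile(`'.*?'|".*?"|\S+`)

        input := `fs.meta.save -o "my backup.meta" /buckets` // illustrative input only
        tokens := reg.FindAllString(input, -1)

        name := strings.ToLower(tokens[0])
        args := make([]string, len(tokens)-1)
        for i := range args {
            args[i] = strings.Trim(tokens[1+i], `"'`)
        }
        fmt.Println(name) // fs.meta.save
        fmt.Println(args) // [-o my backup.meta /buckets]
    }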