| author    | yourchanges <yourchanges@gmail.com> | 2020-07-10 09:44:32 +0800 |
| committer | GitHub <noreply@github.com>         | 2020-07-10 09:44:32 +0800 |
| commit    | e67096656b0fcdc313c7d8983b6ce36a54d794a3 (patch) | |
| tree      | 4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 /weed/shell | |
| parent    | 2b3cef7780a5e91d2072a33411926f9b30c88ee2 (diff) | |
| parent    | 1b680c06c1de27e6a3899c089ec354a9eb08ea44 (diff) | |
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'weed/shell')
38 files changed, 5473 insertions, 0 deletions
diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_bucket_create.go
new file mode 100644
index 000000000..52d96e4c3
--- /dev/null
+++ b/weed/shell/command_bucket_create.go
@@ -0,0 +1,83 @@
+package shell
+
+import (
+    "context"
+    "flag"
+    "fmt"
+    "io"
+    "os"
+    "time"
+
+    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+    Commands = append(Commands, &commandBucketCreate{})
+}
+
+type commandBucketCreate struct {
+}
+
+func (c *commandBucketCreate) Name() string {
+    return "bucket.create"
+}
+
+func (c *commandBucketCreate) Help() string {
+    return `create a bucket with a given name
+
+    Example:
+        bucket.create -name <bucket_name> -replication 001
+`
+}
+
+func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+    bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+    bucketName := bucketCommand.String("name", "", "bucket name")
+    replication := bucketCommand.String("replication", "", "replication setting for the bucket")
+    if err = bucketCommand.Parse(args); err != nil {
+        return nil
+    }
+
+    if *bucketName == "" {
+        return fmt.Errorf("empty bucket name")
+    }
+
+    err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+        resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+        if err != nil {
+            return fmt.Errorf("get filer configuration: %v", err)
+        }
+        filerBucketsPath := resp.DirBuckets
+
+        println("create bucket under", filerBucketsPath)
+
+        entry := &filer_pb.Entry{
+            Name:        *bucketName,
+            IsDirectory: true,
+            Attributes: &filer_pb.FuseAttributes{
+                Mtime:       time.Now().Unix(),
+                Crtime:      time.Now().Unix(),
+                FileMode:    uint32(0777 | os.ModeDir),
+                Collection:  *bucketName,
+                Replication: *replication,
+            },
+        }
+
+        if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+            Directory: filerBucketsPath,
+            Entry:     entry,
+        }); err != nil {
+            return err
+        }
+
+        println("created bucket", *bucketName)
+
+        return nil
+
+    })
+
+    return err
+
+}
diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go
new file mode 100644
index 000000000..8f5f63b46
--- /dev/null
+++ b/weed/shell/command_bucket_delete.go
@@ -0,0 +1,54 @@
+package shell
+
+import (
+    "flag"
+    "fmt"
+    "io"
+
+    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+    Commands = append(Commands, &commandBucketDelete{})
+}
+
+type commandBucketDelete struct {
+}
+
+func (c *commandBucketDelete) Name() string {
+    return "bucket.delete"
+}
+
+func (c *commandBucketDelete) Help() string {
+    return `delete a bucket by a given name
+
+    bucket.delete -name <bucket_name>
+`
+}
+
+func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+    bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+    bucketName := bucketCommand.String("name", "", "bucket name")
+    if err = bucketCommand.Parse(args); err != nil {
+        return nil
+    }
+
+    if *bucketName == "" {
+        return fmt.Errorf("empty bucket name")
+    }
+
+    _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args()))
+    if parseErr != nil {
+        return parseErr
+    }
+
+    var filerBucketsPath string
+    filerBucketsPath, err = readFilerBucketsPath(commandEnv)
+    if err != nil {
+        return fmt.Errorf("read buckets: %v", err)
+    }
+
+    return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false)
+
+}
diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go
new file
mode 100644
index 000000000..2e446b6b2
--- /dev/null
+++ b/weed/shell/command_bucket_list.go
@@ -0,0 +1,78 @@
+package shell
+
+import (
+    "context"
+    "flag"
+    "fmt"
+    "io"
+    "math"
+
+    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+    Commands = append(Commands, &commandBucketList{})
+}
+
+type commandBucketList struct {
+}
+
+func (c *commandBucketList) Name() string {
+    return "bucket.list"
+}
+
+func (c *commandBucketList) Help() string {
+    return `list all buckets
+
+`
+}
+
+func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+    bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+    if err = bucketCommand.Parse(args); err != nil {
+        return nil
+    }
+
+    _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args()))
+    if parseErr != nil {
+        return parseErr
+    }
+
+    var filerBucketsPath string
+    filerBucketsPath, err = readFilerBucketsPath(commandEnv)
+    if err != nil {
+        return fmt.Errorf("read buckets: %v", err)
+    }
+
+    err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
+        if entry.Attributes.Replication == "" || entry.Attributes.Replication == "000" {
+            fmt.Fprintf(writer, " %s\n", entry.Name)
+        } else {
+            fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", entry.Name, entry.Attributes.Replication)
+        }
+        return nil
+    }, "", false, math.MaxUint32)
+    if err != nil {
+        return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
+    }
+
+    return err
+
+}
+
+func readFilerBucketsPath(filerClient filer_pb.FilerClient) (filerBucketsPath string, err error) {
+    err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+        resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+        if err != nil {
+            return fmt.Errorf("get filer configuration: %v", err)
+        }
+        filerBucketsPath = resp.DirBuckets
+
+        return nil
+
+    })
+
+    return filerBucketsPath, err
+}
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
new file mode 100644
index 000000000..4b3d7f0be
--- /dev/null
+++ b/weed/shell/command_collection_delete.go
@@ -0,0 +1,50 @@
+package shell
+
+import (
+    "context"
+    "fmt"
+    "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+    "io"
+)
+
+func init() {
+    Commands = append(Commands, &commandCollectionDelete{})
+}
+
+type commandCollectionDelete struct {
+}
+
+func (c *commandCollectionDelete) Name() string {
+    return "collection.delete"
+}
+
+func (c *commandCollectionDelete) Help() string {
+    return `delete specified collection
+
+    collection.delete <collection_name>
+
+`
+}
+
+func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+    if len(args) == 0 {
+        return nil
+    }
+
+    collectionName := args[0]
+
+    err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+        _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
+            Name: collectionName,
+        })
+        return err
+    })
+    if err != nil {
+        return
+    }
+
+    fmt.Fprintf(writer, "collection %s is deleted.\n", collectionName)
+
+    return nil
+}
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
new file mode 100644
index 000000000..2a114e61b
--- /dev/null
+++ b/weed/shell/command_collection_list.go
@@ -0,0 +1,58 @@
+package shell
+
+import (
+    "context"
+    "fmt"
+    "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+    "io"
+)
+
+func init() {
Commands = append(Commands, &commandCollectionList{}) +} + +type commandCollectionList struct { +} + +func (c *commandCollectionList) Name() string { + return "collection.list" +} + +func (c *commandCollectionList) Help() string { + return `list all collections` +} + +func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + collections, err := ListCollectionNames(commandEnv, true, true) + + if err != nil { + return err + } + + for _, c := range collections { + fmt.Fprintf(writer, "collection:\"%s\"\n", c) + } + + fmt.Fprintf(writer, "Total %d collections.\n", len(collections)) + + return nil +} + +func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) { + var resp *master_pb.CollectionListResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{ + IncludeNormalVolumes: includeNormalVolumes, + IncludeEcVolumes: includeEcVolumes, + }) + return err + }) + if err != nil { + return + } + for _, c := range resp.Collections { + collections = append(collections, c.Name) + } + return +} diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go new file mode 100644 index 000000000..1ddb6a490 --- /dev/null +++ b/weed/shell/command_ec_balance.go @@ -0,0 +1,519 @@ +package shell + +import ( + "flag" + "fmt" + "io" + "sort" + + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandEcBalance{}) +} + +type commandEcBalance struct { +} + +func (c *commandEcBalance) Name() string { + return "ec.balance" +} + +func (c *commandEcBalance) Help() string { + return `balance all ec shards among all racks and volume servers + + ec.balance [-c EACH_COLLECTION|<collection_name>] [-force] [-dataCenter <data_center>] + + Algorithm: + + For each type of volume server (different max volume count limit){ + for each collection: + balanceEcVolumes(collectionName) + for each rack: + balanceEcRack(rack) + } + + func balanceEcVolumes(collectionName){ + for each volume: + doDeduplicateEcShards(volumeId) + + tracks rack~shardCount mapping + for each volume: + doBalanceEcShardsAcrossRacks(volumeId) + + for each volume: + doBalanceEcShardsWithinRacks(volumeId) + } + + // spread ec shards into more racks + func doBalanceEcShardsAcrossRacks(volumeId){ + tracks rack~volumeIdShardCount mapping + averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc + ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack + for each ecShardsToMove { + destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, averageShardsPerEcRack) + destVolumeServers = volume servers on the destRack + pickOneEcNodeAndMoveOneShard(destVolumeServers) + } + } + + func doBalanceEcShardsWithinRacks(volumeId){ + racks = collect all racks that the volume id is on + for rack, shards := range racks + doBalanceEcShardsWithinOneRack(volumeId, shards, rack) + } + + // move ec shards + func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){ + tracks volumeServer~volumeIdShardCount mapping + averageShardCount = len(shards) / numVolumeServers + volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack + ecShardsToMove = select 
overflown ec shards from volumeServersOverAverage + for each ecShardsToMove { + destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, averageShardCount) + pickOneEcNodeAndMoveOneShard(destVolumeServers) + } + } + + // move ec shards while keeping shard distribution for the same volume unchanged or more even + func balanceEcRack(rack){ + averageShardCount = total shards / numVolumeServers + for hasMovedOneEcShard { + sort all volume servers ordered by the number of local ec shards + pick the volume server A with the lowest number of ec shards x + pick the volume server B with the highest number of ec shards y + if y > averageShardCount and x +1 <= averageShardCount { + if B has a ec shard with volume id v that A does not have { + move one ec shard v from B to A + hasMovedOneEcShard = true + } + } + } + } + +` +} + +func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") + dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") + applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan") + if err = balanceCommand.Parse(args); err != nil { + return nil + } + + // collect all ec nodes + allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, *dc) + if err != nil { + return err + } + if totalFreeEcSlots < 1 { + return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots) + } + + racks := collectRacks(allEcNodes) + + if *collection == "EACH_COLLECTION" { + collections, err := ListCollectionNames(commandEnv, false, true) + if err != nil { + return err + } + fmt.Printf("balanceEcVolumes collections %+v\n", len(collections)) + for _, c := range collections { + fmt.Printf("balanceEcVolumes collection %+v\n", c) + if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, *applyBalancing); err != nil { + return err + } + } + } else { + if err = balanceEcVolumes(commandEnv, *collection, allEcNodes, racks, *applyBalancing); err != nil { + return err + } + } + + if err := balanceEcRacks(commandEnv, racks, *applyBalancing); err != nil { + return fmt.Errorf("balance ec racks: %v", err) + } + + return nil +} + +func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack { + // collect racks info + racks := make(map[RackId]*EcRack) + for _, ecNode := range allEcNodes { + if racks[ecNode.rack] == nil { + racks[ecNode.rack] = &EcRack{ + ecNodes: make(map[EcNodeId]*EcNode), + } + } + racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode + racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot + } + return racks +} + +func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error { + + fmt.Printf("balanceEcVolumes %s\n", collection) + + if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil { + return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err) + } + + if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil { + return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err) + } + + if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, 
racks, collection, applyBalancing); err != nil { + return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err) + } + + return nil +} + +func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error { + // vid => []ecNode + vidLocations := collectVolumeIdToEcNodes(allEcNodes) + // deduplicate ec shards + for vid, locations := range vidLocations { + if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil { + return err + } + } + return nil +} + +func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error { + + // check whether this volume has ecNodes that are over average + shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount) + for _, ecNode := range locations { + shardBits := findEcVolumeShards(ecNode, vid) + for _, shardId := range shardBits.ShardIds() { + shardToLocations[shardId] = append(shardToLocations[shardId], ecNode) + } + } + for shardId, ecNodes := range shardToLocations { + if len(ecNodes) <= 1 { + continue + } + sortEcNodesByFreeslotsAscending(ecNodes) + fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id) + if !applyBalancing { + continue + } + + duplicatedShardIds := []uint32{uint32(shardId)} + for _, ecNode := range ecNodes[1:] { + if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil { + return err + } + if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil { + return err + } + ecNode.deleteEcVolumeShards(vid, duplicatedShardIds) + } + } + return nil +} + +func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error { + // collect vid => []ecNode, since previous steps can change the locations + vidLocations := collectVolumeIdToEcNodes(allEcNodes) + // spread the ec shards evenly + for vid, locations := range vidLocations { + if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil { + return err + } + } + return nil +} + +func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error { + + // calculate average number of shards an ec rack should have for one volume + averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks)) + + // see the volume's shards are in how many racks, and how many in each rack + rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) { + shardBits := findEcVolumeShards(ecNode, vid) + return string(ecNode.rack), shardBits.ShardIdCount() + }) + rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string { + return string(ecNode.rack) + }) + + // ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack + ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode) + for rackId, count := range rackToShardCount { + if count > averageShardsPerEcRack { + possibleEcNodes := rackEcNodesWithVid[rackId] + for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) { + ecShardsToMove[shardId] = ecNode + } + } + } + + for shardId, ecNode := range 
ecShardsToMove { + rackId := pickOneRack(racks, rackToShardCount, averageShardsPerEcRack) + if rackId == "" { + fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id) + continue + } + var possibleDestinationEcNodes []*EcNode + for _, n := range racks[rackId].ecNodes { + possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) + } + err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing) + if err != nil { + return err + } + rackToShardCount[string(rackId)] += 1 + rackToShardCount[string(ecNode.rack)] -= 1 + racks[rackId].freeEcSlot -= 1 + racks[ecNode.rack].freeEcSlot += 1 + } + + return nil +} + +func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) RackId { + + // TODO later may need to add some randomness + + for rackId, rack := range rackToEcNodes { + if rackToShardCount[string(rackId)] >= averageShardsPerEcRack { + continue + } + + if rack.freeEcSlot <= 0 { + continue + } + + return rackId + } + + return "" +} + +func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error { + // collect vid => []ecNode, since previous steps can change the locations + vidLocations := collectVolumeIdToEcNodes(allEcNodes) + + // spread the ec shards evenly + for vid, locations := range vidLocations { + + // see the volume's shards are in how many racks, and how many in each rack + rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) { + shardBits := findEcVolumeShards(ecNode, vid) + return string(ecNode.rack), shardBits.ShardIdCount() + }) + rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string { + return string(ecNode.rack) + }) + + for rackId, _ := range rackToShardCount { + + var possibleDestinationEcNodes []*EcNode + for _, n := range racks[RackId(rackId)].ecNodes { + possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) + } + sourceEcNodes := rackEcNodesWithVid[rackId] + averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes)) + if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil { + return err + } + } + } + return nil +} + +func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { + + for _, ecNode := range existingLocations { + + shardBits := findEcVolumeShards(ecNode, vid) + overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode + + for _, shardId := range shardBits.ShardIds() { + + if overLimitCount <= 0 { + break + } + + fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId) + + err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing) + if err != nil { + return err + } + + overLimitCount-- + } + } + + return nil +} + +func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error { + + // balance one rack for all ec shards + for _, ecRack := range racks { + if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil { + return err + } + } + 
return nil +} + +func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error { + + if len(ecRack.ecNodes) <= 1 { + return nil + } + + var rackEcNodes []*EcNode + for _, node := range ecRack.ecNodes { + rackEcNodes = append(rackEcNodes, node) + } + + ecNodeIdToShardCount := groupByCount(rackEcNodes, func(node *EcNode) (id string, count int) { + for _, ecShardInfo := range node.info.EcShardInfos { + count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount() + } + return node.info.Id, count + }) + + var totalShardCount int + for _, count := range ecNodeIdToShardCount { + totalShardCount += count + } + + averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes)) + + hasMove := true + for hasMove { + hasMove = false + sort.Slice(rackEcNodes, func(i, j int) bool { + return rackEcNodes[i].freeEcSlot > rackEcNodes[j].freeEcSlot + }) + emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1] + emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id] + if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount { + + emptyNodeIds := make(map[uint32]bool) + for _, shards := range emptyNode.info.EcShardInfos { + emptyNodeIds[shards.Id] = true + } + for _, shards := range fullNode.info.EcShardInfos { + if _, found := emptyNodeIds[shards.Id]; !found { + for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() { + + fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) + + err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing) + if err != nil { + return err + } + + ecNodeIdToShardCount[emptyNode.info.Id]++ + ecNodeIdToShardCount[fullNode.info.Id]-- + hasMove = true + break + } + break + } + } + } + } + + return nil +} + +func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { + + sortEcNodesByFreeslotsDecending(possibleDestinationEcNodes) + + for _, destEcNode := range possibleDestinationEcNodes { + if destEcNode.info.Id == existingLocation.info.Id { + continue + } + + if destEcNode.freeEcSlot <= 0 { + continue + } + if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode { + continue + } + + fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id) + + err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing) + if err != nil { + return err + } + + return nil + } + + return nil +} + +func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode { + picked := make(map[erasure_coding.ShardId]*EcNode) + var candidateEcNodes []*CandidateEcNode + for _, ecNode := range ecNodes { + shardBits := findEcVolumeShards(ecNode, vid) + if shardBits.ShardIdCount() > 0 { + candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{ + ecNode: ecNode, + shardCount: shardBits.ShardIdCount(), + }) + } + } + sort.Slice(candidateEcNodes, func(i, j int) bool { + return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount + }) + for i := 0; i < n; i++ { + selectedEcNodeIndex := -1 + for i, candidateEcNode := range candidateEcNodes { + 
shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid) + if shardBits > 0 { + selectedEcNodeIndex = i + for _, shardId := range shardBits.ShardIds() { + candidateEcNode.shardCount-- + picked[shardId] = candidateEcNode.ecNode + candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}) + break + } + break + } + } + if selectedEcNodeIndex >= 0 { + ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool { + return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount + }) + } + + } + return picked +} + +func collectVolumeIdToEcNodes(allEcNodes []*EcNode) map[needle.VolumeId][]*EcNode { + vidLocations := make(map[needle.VolumeId][]*EcNode) + for _, ecNode := range allEcNodes { + for _, shardInfo := range ecNode.info.EcShardInfos { + vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode) + } + } + return vidLocations +} diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go new file mode 100644 index 000000000..0db119d3c --- /dev/null +++ b/weed/shell/command_ec_common.go @@ -0,0 +1,337 @@ +package shell + +import ( + "context" + "fmt" + "math" + "sort" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { + + copiedShardIds := []uint32{uint32(shardId)} + + if applyBalancing { + + // ask destination node to copy shard and the ecx file from source node, and mount it + copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id) + if err != nil { + return err + } + + // unmount the to be deleted shards + err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds) + if err != nil { + return err + } + + // ask source node to delete the shard, and maybe the ecx file + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds) + if err != nil { + return err + } + + fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id) + + } + + destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds) + existingLocation.deleteEcVolumeShards(vid, copiedShardIds) + + return nil + +} + +func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, + targetServer *EcNode, shardIdsToCopy []uint32, + volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) { + + fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) + + err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + + if targetServer.info.Id != existingLocation { + + fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) + _, copyErr := 
volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + ShardIds: shardIdsToCopy, + CopyEcxFile: true, + CopyEcjFile: true, + CopyVifFile: true, + SourceDataNode: existingLocation, + }) + if copyErr != nil { + return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr) + } + } + + fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id) + _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + ShardIds: shardIdsToCopy, + }) + if mountErr != nil { + return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr) + } + + if targetServer.info.Id != existingLocation { + copiedShardIds = shardIdsToCopy + glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds) + } + + return nil + }) + + if err != nil { + return + } + + return +} + +func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) { + for _, dc := range topo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, dn := range rack.DataNodeInfos { + fn(dc.Id, RackId(rack.Id), dn) + } + } + } +} + +func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) { + sort.Slice(ecNodes, func(i, j int) bool { + return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot + }) +} + +func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) { + sort.Slice(ecNodes, func(i, j int) bool { + return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot + }) +} + +type CandidateEcNode struct { + ecNode *EcNode + shardCount int +} + +// if the index node changed the freeEcSlot, need to keep every EcNode still sorted +func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) { + for i := index - 1; i >= 0; i-- { + if lessThan(i+1, i) { + swap(data, i, i+1) + } else { + break + } + } + for i := index + 1; i < len(data); i++ { + if lessThan(i, i-1) { + swap(data, i, i-1) + } else { + break + } + } +} + +func swap(data []*CandidateEcNode, i, j int) { + t := data[i] + data[i] = data[j] + data[j] = t +} + +func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) { + for _, ecShardInfo := range ecShardInfos { + shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + count += shardBits.ShardIdCount() + } + return +} + +func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) { + return int(dn.MaxVolumeCount-dn.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(dn.EcShardInfos) +} + +type RackId string +type EcNodeId string + +type EcNode struct { + info *master_pb.DataNodeInfo + dc string + rack RackId + freeEcSlot int +} + +type EcRack struct { + ecNodes map[EcNodeId]*EcNode + freeEcSlot int +} + +func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { + + // list all possible locations + var resp *master_pb.VolumeListResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return nil, 0, err + } + + // find out all volume servers with one slot left. 
+ eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + if selectedDataCenter != "" && selectedDataCenter != dc { + return + } + + freeEcSlots := countFreeShardSlots(dn) + ecNodes = append(ecNodes, &EcNode{ + info: dn, + dc: dc, + rack: rack, + freeEcSlot: int(freeEcSlots), + }) + totalFreeEcSlots += freeEcSlots + }) + + sortEcNodesByFreeslotsDecending(ecNodes) + + return +} + +func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { + + fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) + + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + ShardIds: toBeDeletedShardIds, + }) + return deleteErr + }) + +} + +func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error { + + fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) + + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{ + VolumeId: uint32(volumeId), + ShardIds: toBeUnmountedhardIds, + }) + return deleteErr + }) +} + +func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error { + + fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) + + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + ShardIds: toBeMountedhardIds, + }) + return mountErr + }) +} + +func ceilDivide(total, n int) int { + return int(math.Ceil(float64(total) / float64(n))) +} + +func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits { + + for _, shardInfo := range ecNode.info.EcShardInfos { + if needle.VolumeId(shardInfo.Id) == vid { + return erasure_coding.ShardBits(shardInfo.EcIndexBits) + } + } + + return 0 +} + +func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode { + + foundVolume := false + for _, shardInfo := range ecNode.info.EcShardInfos { + if needle.VolumeId(shardInfo.Id) == vid { + oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) + newShardBits := oldShardBits + for _, shardId := range shardIds { + newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId)) + } + shardInfo.EcIndexBits = uint32(newShardBits) + ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() + foundVolume = true + break + } + } + + if !foundVolume { + var newShardBits erasure_coding.ShardBits + for _, shardId := range shardIds { + newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId)) + } + ecNode.info.EcShardInfos = append(ecNode.info.EcShardInfos, 
&master_pb.VolumeEcShardInformationMessage{ + Id: uint32(vid), + Collection: collection, + EcIndexBits: uint32(newShardBits), + }) + ecNode.freeEcSlot -= len(shardIds) + } + + return ecNode +} + +func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode { + + for _, shardInfo := range ecNode.info.EcShardInfos { + if needle.VolumeId(shardInfo.Id) == vid { + oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) + newShardBits := oldShardBits + for _, shardId := range shardIds { + newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId)) + } + shardInfo.EcIndexBits = uint32(newShardBits) + ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() + } + } + + return ecNode +} + +func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int { + countMap := make(map[string]int) + for _, d := range data { + id, count := identifierFn(d) + countMap[id] += count + } + return countMap +} + +func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode { + groupMap := make(map[string][]*EcNode) + for _, d := range data { + id := identifierFn(d) + groupMap[id] = append(groupMap[id], d) + } + return groupMap +} diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go new file mode 100644 index 000000000..5f03df58c --- /dev/null +++ b/weed/shell/command_ec_decode.go @@ -0,0 +1,268 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandEcDecode{}) +} + +type commandEcDecode struct { +} + +func (c *commandEcDecode) Name() string { + return "ec.decode" +} + +func (c *commandEcDecode) Help() string { + return `decode a erasure coded volume into a normal volume + + ec.decode [-collection=""] [-volumeId=<volume_id>] + +` +} + +func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeId := encodeCommand.Int("volumeId", 0, "the volume id") + collection := encodeCommand.String("collection", "", "the collection name") + if err = encodeCommand.Parse(args); err != nil { + return nil + } + + vid := needle.VolumeId(*volumeId) + + // collect topology information + topologyInfo, err := collectTopologyInfo(commandEnv) + if err != nil { + return err + } + + // volumeId is provided + if vid != 0 { + return doEcDecode(commandEnv, topologyInfo, *collection, vid) + } + + // apply to all volumes in the collection + volumeIds := collectEcShardIds(topologyInfo, *collection) + fmt.Printf("ec encode volumes: %v\n", volumeIds) + for _, vid := range volumeIds { + if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil { + return err + } + } + + return nil +} + +func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) { + // find volume location + nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid) + + fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits) + + // collect ec shards to the server 
with most space + targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid) + if err != nil { + return fmt.Errorf("collectEcShards for volume %d: %v", vid, err) + } + + // generate a normal volume + err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation) + if err != nil { + return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) + } + + // delete the previous ec shards + err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid) + if err != nil { + return fmt.Errorf("delete ec shards for volume %d: %v", vid, err) + } + + return nil +} + +func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { + + // mount volume + if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ + VolumeId: uint32(vid), + }) + return mountErr + }); err != nil { + return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err) + } + + // unmount ec shards + for location, ecIndexBits := range nodeToEcIndexBits { + fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) + err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) + if err != nil { + return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err) + } + } + // delete ec shards + for location, ecIndexBits := range nodeToEcIndexBits { + fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) + err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) + if err != nil { + return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err) + } + } + + return nil +} + +func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error { + + fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) + + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{ + VolumeId: uint32(vid), + Collection: collection, + }) + return genErr + }) + + return err + +} + +func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) { + + maxShardCount := 0 + var exisitngEcIndexBits erasure_coding.ShardBits + for loc, ecIndexBits := range nodeToEcIndexBits { + toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount() + if toBeCopiedShardCount > maxShardCount { + maxShardCount = toBeCopiedShardCount + targetNodeLocation = loc + exisitngEcIndexBits = ecIndexBits + } + } + + fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits) + + var copiedEcIndexBits erasure_coding.ShardBits + for loc, ecIndexBits := range 
nodeToEcIndexBits { + if loc == targetNodeLocation { + continue + } + + needToCopyEcIndexBits := ecIndexBits.Minus(exisitngEcIndexBits).MinusParityShards() + if needToCopyEcIndexBits.ShardIdCount() == 0 { + continue + } + + err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + + fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) + + _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: uint32(vid), + Collection: collection, + ShardIds: needToCopyEcIndexBits.ToUint32Slice(), + CopyEcxFile: false, + CopyEcjFile: true, + CopyVifFile: true, + SourceDataNode: loc, + }) + if copyErr != nil { + return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr) + } + + return nil + }) + + if err != nil { + break + } + + copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits) + + } + + nodeToEcIndexBits[targetNodeLocation] = exisitngEcIndexBits.Plus(copiedEcIndexBits) + + return targetNodeLocation, err + +} + +func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { + + var resp *master_pb.VolumeListResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return + } + + return resp.TopologyInfo, nil + +} + +func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) { + + eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, v := range dn.EcShardInfos { + if v.Collection == selectedCollection && v.Id == uint32(vid) { + ecShardInfos = append(ecShardInfos, v) + } + } + }) + + return +} + +func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) { + + vidMap := make(map[uint32]bool) + eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, v := range dn.EcShardInfos { + if v.Collection == selectedCollection { + vidMap[v.Id] = true + } + } + }) + + for vid := range vidMap { + vids = append(vids, needle.VolumeId(vid)) + } + + return +} + +func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits { + + nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits) + eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, v := range dn.EcShardInfos { + if v.Id == uint32(vid) { + nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits) + } + } + }) + + return nodeToEcIndexBits +} diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go new file mode 100644 index 000000000..5a8146954 --- /dev/null +++ b/weed/shell/command_ec_encode.go @@ -0,0 +1,298 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + "sync" + "time" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + 
"github.com/chrislusf/seaweedfs/weed/wdclient" +) + +func init() { + Commands = append(Commands, &commandEcEncode{}) +} + +type commandEcEncode struct { +} + +func (c *commandEcEncode) Name() string { + return "ec.encode" +} + +func (c *commandEcEncode) Help() string { + return `apply erasure coding to a volume + + ec.encode [-collection=""] [-fullPercent=95] [-quietFor=1h] + ec.encode [-collection=""] [-volumeId=<volume_id>] + + This command will: + 1. freeze one volume + 2. apply erasure coding to the volume + 3. move the encoded shards to multiple volume servers + + The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford + to lose 4 volume servers. + + If the number of volumes are not high, the worst case is that you only have 4 volume servers, + and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server. + + If you only have less than 4 volume servers, with erasure coding, at least you can afford to + have 4 corrupted shard files. + +` +} + +func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeId := encodeCommand.Int("volumeId", 0, "the volume id") + collection := encodeCommand.String("collection", "", "the collection name") + fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size") + quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period") + if err = encodeCommand.Parse(args); err != nil { + return nil + } + + vid := needle.VolumeId(*volumeId) + + // volumeId is provided + if vid != 0 { + return doEcEncode(commandEnv, *collection, vid) + } + + // apply to all volumes in the collection + volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod) + if err != nil { + return err + } + fmt.Printf("ec encode volumes: %v\n", volumeIds) + for _, vid := range volumeIds { + if err = doEcEncode(commandEnv, *collection, vid); err != nil { + return err + } + } + + return nil +} + +func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { + // find volume location + locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) + if !found { + return fmt.Errorf("volume %d not found", vid) + } + + // fmt.Printf("found ec %d shards on %v\n", vid, locations) + + // mark the volume as readonly + err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) + if err != nil { + return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) + } + + // generate ec shards + err = generateEcShards(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url) + if err != nil { + return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) + } + + // balance the ec shards to current cluster + err = spreadEcShards(commandEnv, vid, collection, locations) + if err != nil { + return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err) + } + + return nil +} + +func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error { + + for _, location := range locations { + + fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url) + + err := 
operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ + VolumeId: uint32(volumeId), + }) + return markErr + }) + + if err != nil { + return err + } + + } + + return nil +} + +func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { + + fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer) + + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + }) + return genErr + }) + + return err + +} + +func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { + + allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "") + if err != nil { + return err + } + + if totalFreeEcSlots < erasure_coding.TotalShardsCount { + return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots) + } + allocatedDataNodes := allEcNodes + if len(allocatedDataNodes) > erasure_coding.TotalShardsCount { + allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount] + } + + // calculate how many shards to allocate for these servers + allocatedEcIds := balancedEcDistribution(allocatedDataNodes) + + // ask the data nodes to copy from the source volume server + copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0]) + if err != nil { + return err + } + + // unmount the to be deleted shards + err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) + if err != nil { + return err + } + + // ask the source volume server to clean up copied ec shards + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds) + if err != nil { + return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err) + } + + // ask the source volume server to delete the original volume + for _, location := range existingLocations { + fmt.Printf("delete volume %d from %s\n", volumeId, location.Url) + err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url) + if err != nil { + return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err) + } + } + + return err + +} + +func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { + + fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url) + + // parallelize + shardIdChan := make(chan []uint32, len(targetServers)) + var wg sync.WaitGroup + for i, server := range targetServers { + if len(allocatedEcIds[i]) <= 0 { + continue + } + + wg.Add(1) + go func(server *EcNode, allocatedEcShardIds []uint32) { + defer wg.Done() + copiedShardIds, copyErr := 
oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server, + allocatedEcShardIds, volumeId, collection, existingLocation.Url) + if copyErr != nil { + err = copyErr + } else { + shardIdChan <- copiedShardIds + server.addEcVolumeShards(volumeId, collection, copiedShardIds) + } + }(server, allocatedEcIds[i]) + } + wg.Wait() + close(shardIdChan) + + if err != nil { + return nil, err + } + + for shardIds := range shardIdChan { + actuallyCopied = append(actuallyCopied, shardIds...) + } + + return +} + +func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) { + allocated = make([][]uint32, len(servers)) + allocatedShardIdIndex := uint32(0) + serverIndex := 0 + for allocatedShardIdIndex < erasure_coding.TotalShardsCount { + if servers[serverIndex].freeEcSlot > 0 { + allocated[serverIndex] = append(allocated[serverIndex], allocatedShardIdIndex) + allocatedShardIdIndex++ + } + serverIndex++ + if serverIndex >= len(servers) { + serverIndex = 0 + } + } + + return allocated +} + +func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { + + var resp *master_pb.VolumeListResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return + } + + quietSeconds := int64(quietPeriod / time.Second) + nowUnixSeconds := time.Now().Unix() + + fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds) + + vidMap := make(map[uint32]bool) + eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, v := range dn.VolumeInfos { + if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { + if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { + vidMap[v.Id] = true + } + } + } + }) + + for vid := range vidMap { + vids = append(vids, needle.VolumeId(vid)) + } + + return +} diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go new file mode 100644 index 000000000..df28681fe --- /dev/null +++ b/weed/shell/command_ec_rebuild.go @@ -0,0 +1,271 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +func init() { + Commands = append(Commands, &commandEcRebuild{}) +} + +type commandEcRebuild struct { +} + +func (c *commandEcRebuild) Name() string { + return "ec.rebuild" +} + +func (c *commandEcRebuild) Help() string { + return `find and rebuild missing ec shards among volume servers + + ec.rebuild [-c EACH_COLLECTION|<collection_name>] [-force] + + Algorithm: + + For each type of volume server (different max volume count limit){ + for each collection { + rebuildEcVolumes() + } + } + + func rebuildEcVolumes(){ + idealWritableVolumes = totalWritableVolumes / numVolumeServers + for { + sort all volume servers ordered by the number of local writable volumes + pick the volume server A with the lowest number of writable volumes x + pick the volume server B with the highest number of writable volumes y + if y > idealWritableVolumes and x +1 <= idealWritableVolumes { + if B has a writable volume id v that A does not have { + move 
writable volume v from A to B + } + } + } + } + +` +} + +func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection") + applyChanges := fixCommand.Bool("force", false, "apply the changes") + if err = fixCommand.Parse(args); err != nil { + return nil + } + + // collect all ec nodes + allEcNodes, _, err := collectEcNodes(commandEnv, "") + if err != nil { + return err + } + + if *collection == "EACH_COLLECTION" { + collections, err := ListCollectionNames(commandEnv, false, true) + if err != nil { + return err + } + fmt.Printf("rebuildEcVolumes collections %+v\n", len(collections)) + for _, c := range collections { + fmt.Printf("rebuildEcVolumes collection %+v\n", c) + if err = rebuildEcVolumes(commandEnv, allEcNodes, c, writer, *applyChanges); err != nil { + return err + } + } + } else { + if err = rebuildEcVolumes(commandEnv, allEcNodes, *collection, writer, *applyChanges); err != nil { + return err + } + } + + return nil +} + +func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error { + + fmt.Printf("rebuildEcVolumes %s\n", collection) + + // collect vid => each shard locations, similar to ecShardMap in topology.go + ecShardMap := make(EcShardMap) + for _, ecNode := range allEcNodes { + ecShardMap.registerEcNode(ecNode, collection) + } + + for vid, locations := range ecShardMap { + shardCount := locations.shardCount() + if shardCount == erasure_coding.TotalShardsCount { + continue + } + if shardCount < erasure_coding.DataShardsCount { + return fmt.Errorf("ec volume %d is unrepairable with %d shards\n", vid, shardCount) + } + + sortEcNodesByFreeslotsDecending(allEcNodes) + + if allEcNodes[0].freeEcSlot < erasure_coding.TotalShardsCount { + return fmt.Errorf("disk space is not enough") + } + + if err := rebuildOneEcVolume(commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil { + return err + } + } + + return nil +} + +func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error { + + fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId) + + // collect shard files to rebuilder local disk + var generatedShardIds []uint32 + copiedShardIds, _, err := prepareDataToRecover(commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges) + if err != nil { + return err + } + defer func() { + // clean up working files + + // ask the rebuilder to delete the copied shards + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds) + if err != nil { + fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds) + } + + }() + + if !applyChanges { + return nil + } + + // generate ec shards, and maybe ecx file + generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id) + if err != nil { + return err + } + + // mount the generated shards + err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) + if err != nil { + return 
err + } + + rebuilder.addEcVolumeShards(volumeId, collection, generatedShardIds) + + return nil +} + +func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) { + + err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + }) + if rebultErr == nil { + rebuiltShardIds = resp.RebuiltShardIds + } + return rebultErr + }) + return +} + +func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) { + + needEcxFile := true + var localShardBits erasure_coding.ShardBits + for _, ecShardInfo := range rebuilder.info.EcShardInfos { + if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId { + needEcxFile = false + localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + } + } + + for shardId, ecNodes := range locations { + + if len(ecNodes) == 0 { + fmt.Fprintf(writer, "missing shard %d.%d\n", volumeId, shardId) + continue + } + + if localShardBits.HasShardId(erasure_coding.ShardId(shardId)) { + localShardIds = append(localShardIds, uint32(shardId)) + fmt.Fprintf(writer, "use existing shard %d.%d\n", volumeId, shardId) + continue + } + + var copyErr error + if applyBalancing { + copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + ShardIds: []uint32{uint32(shardId)}, + CopyEcxFile: needEcxFile, + CopyEcjFile: needEcxFile, + CopyVifFile: needEcxFile, + SourceDataNode: ecNodes[0].info.Id, + }) + return copyErr + }) + if copyErr == nil && needEcxFile { + needEcxFile = false + } + } + if copyErr != nil { + fmt.Fprintf(writer, "%s failed to copy %d.%d from %s: %v\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id, copyErr) + } else { + fmt.Fprintf(writer, "%s copied %d.%d from %s\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id) + copiedShardIds = append(copiedShardIds, uint32(shardId)) + } + + } + + if len(copiedShardIds)+len(localShardIds) >= erasure_coding.DataShardsCount { + return copiedShardIds, localShardIds, nil + } + + return nil, nil, fmt.Errorf("%d shards are not enough to recover volume %d", len(copiedShardIds)+len(localShardIds), volumeId) + +} + +type EcShardMap map[needle.VolumeId]EcShardLocations +type EcShardLocations [][]*EcNode + +func (ecShardMap EcShardMap) registerEcNode(ecNode *EcNode, collection string) { + for _, shardInfo := range ecNode.info.EcShardInfos { + if shardInfo.Collection == collection { + existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)] + if !found { + existing = make([][]*EcNode, erasure_coding.TotalShardsCount) + ecShardMap[needle.VolumeId(shardInfo.Id)] = existing + } + for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() { + existing[shardId] = append(existing[shardId], ecNode) + } + } + } +} + +func (ecShardLocations 
EcShardLocations) shardCount() (count int) { + for _, locations := range ecShardLocations { + if len(locations) > 0 { + count++ + } + } + return +} diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go new file mode 100644 index 000000000..4fddcbea5 --- /dev/null +++ b/weed/shell/command_ec_test.go @@ -0,0 +1,139 @@ +package shell + +import ( + "fmt" + "testing" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func TestCommandEcDistribution(t *testing.T) { + + allEcNodes := []*EcNode{ + newEcNode("dc1", "rack1", "dn1", 100), + newEcNode("dc1", "rack2", "dn2", 100), + } + + allocated := balancedEcDistribution(allEcNodes) + + fmt.Printf("allocated: %+v", allocated) +} + +func TestCommandEcBalanceSmall(t *testing.T) { + + allEcNodes := []*EcNode{ + newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}), + } + + racks := collectRacks(allEcNodes) + balanceEcVolumes(nil, "c1", allEcNodes, racks, false) +} + +func TestCommandEcBalanceNothingToMove(t *testing.T) { + + allEcNodes := []*EcNode{ + newEcNode("dc1", "rack1", "dn1", 100). + addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}). + addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}), + newEcNode("dc1", "rack1", "dn2", 100). + addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}). + addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}), + } + + racks := collectRacks(allEcNodes) + balanceEcVolumes(nil, "c1", allEcNodes, racks, false) +} + +func TestCommandEcBalanceAddNewServers(t *testing.T) { + + allEcNodes := []*EcNode{ + newEcNode("dc1", "rack1", "dn1", 100). + addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}). + addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}), + newEcNode("dc1", "rack1", "dn2", 100). + addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}). + addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}), + newEcNode("dc1", "rack1", "dn3", 100), + newEcNode("dc1", "rack1", "dn4", 100), + } + + racks := collectRacks(allEcNodes) + balanceEcVolumes(nil, "c1", allEcNodes, racks, false) +} + +func TestCommandEcBalanceAddNewRacks(t *testing.T) { + + allEcNodes := []*EcNode{ + newEcNode("dc1", "rack1", "dn1", 100). + addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}). + addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}), + newEcNode("dc1", "rack1", "dn2", 100). + addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}). + addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}), + newEcNode("dc1", "rack2", "dn3", 100), + newEcNode("dc1", "rack2", "dn4", 100), + } + + racks := collectRacks(allEcNodes) + balanceEcVolumes(nil, "c1", allEcNodes, racks, false) +} + +func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) { + + allEcNodes := []*EcNode{ + newEcNode("dc1", "rack1", "dn_shared", 100). + addEcVolumeAndShardsForTest(1, "c1", []uint32{0}). 
+ addEcVolumeAndShardsForTest(2, "c1", []uint32{0}), + + newEcNode("dc1", "rack1", "dn_a1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{1}), + newEcNode("dc1", "rack1", "dn_a2", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{2}), + newEcNode("dc1", "rack1", "dn_a3", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{3}), + newEcNode("dc1", "rack1", "dn_a4", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{4}), + newEcNode("dc1", "rack1", "dn_a5", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{5}), + newEcNode("dc1", "rack1", "dn_a6", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{6}), + newEcNode("dc1", "rack1", "dn_a7", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{7}), + newEcNode("dc1", "rack1", "dn_a8", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{8}), + newEcNode("dc1", "rack1", "dn_a9", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{9}), + newEcNode("dc1", "rack1", "dn_a10", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{10}), + newEcNode("dc1", "rack1", "dn_a11", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{11}), + newEcNode("dc1", "rack1", "dn_a12", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{12}), + newEcNode("dc1", "rack1", "dn_a13", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{13}), + + newEcNode("dc1", "rack1", "dn_b1", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{1}), + newEcNode("dc1", "rack1", "dn_b2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{2}), + newEcNode("dc1", "rack1", "dn_b3", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{3}), + newEcNode("dc1", "rack1", "dn_b4", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{4}), + newEcNode("dc1", "rack1", "dn_b5", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{5}), + newEcNode("dc1", "rack1", "dn_b6", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{6}), + newEcNode("dc1", "rack1", "dn_b7", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{7}), + newEcNode("dc1", "rack1", "dn_b8", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{8}), + newEcNode("dc1", "rack1", "dn_b9", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{9}), + newEcNode("dc1", "rack1", "dn_b10", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{10}), + newEcNode("dc1", "rack1", "dn_b11", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{11}), + newEcNode("dc1", "rack1", "dn_b12", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{12}), + newEcNode("dc1", "rack1", "dn_b13", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{13}), + + newEcNode("dc1", "rack1", "dn3", 100), + } + + racks := collectRacks(allEcNodes) + balanceEcVolumes(nil, "c1", allEcNodes, racks, false) + balanceEcRacks(nil, racks, false) +} + +func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNode { + return &EcNode{ + info: &master_pb.DataNodeInfo{ + Id: dataNodeId, + }, + dc: dc, + rack: RackId(rack), + freeEcSlot: freeEcSlot, + } +} + +func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode { + return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds) +} diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go new file mode 100644 index 000000000..7177d8ac3 --- /dev/null +++ b/weed/shell/command_fs_cat.go @@ -0,0 +1,59 @@ +package shell + +import ( + "fmt" + "io" + "math" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsCat{}) +} + +type 
commandFsCat struct { +} + +func (c *commandFsCat) Name() string { + return "fs.cat" +} + +func (c *commandFsCat) Help() string { + return `stream the file content on to the screen + + fs.cat /dir/file_name +` +} + +func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + path, err := commandEnv.parseUrl(findInputDirectory(args)) + if err != nil { + return err + } + + if commandEnv.isDirectory(path) { + return fmt.Errorf("%s is a directory", path) + } + + dir, name := util.FullPath(path).DirAndName() + + return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.LookupDirectoryEntryRequest{ + Name: name, + Directory: dir, + } + respLookupEntry, err := filer_pb.LookupEntry(client, request) + if err != nil { + return err + } + + return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) + + }) + +} diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go new file mode 100644 index 000000000..2cc28f7a2 --- /dev/null +++ b/weed/shell/command_fs_cd.go @@ -0,0 +1,50 @@ +package shell + +import ( + "io" +) + +func init() { + Commands = append(Commands, &commandFsCd{}) +} + +type commandFsCd struct { +} + +func (c *commandFsCd) Name() string { + return "fs.cd" +} + +func (c *commandFsCd) Help() string { + return `change directory to a directory /path/to/dir + + The full path can be too long to type. For example, + fs.ls /some/path/to/file_name + + can be simplified as + + fs.cd /some/path + fs.ls to/file_name +` +} + +func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + path, err := commandEnv.parseUrl(findInputDirectory(args)) + if err != nil { + return err + } + + if path == "/" { + commandEnv.option.Directory = "/" + return nil + } + + err = commandEnv.checkDirectory(path) + + if err == nil { + commandEnv.option.Directory = path + } + + return err +} diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go new file mode 100644 index 000000000..96551dd5a --- /dev/null +++ b/weed/shell/command_fs_du.go @@ -0,0 +1,84 @@ +package shell + +import ( + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsDu{}) +} + +type commandFsDu struct { +} + +func (c *commandFsDu) Name() string { + return "fs.du" +} + +func (c *commandFsDu) Help() string { + return `show disk usage + + fs.du /dir + fs.du /dir/file_name + fs.du /dir/file_prefix +` +} + +func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + path, err := commandEnv.parseUrl(findInputDirectory(args)) + if err != nil { + return err + } + + if commandEnv.isDirectory(path) { + path = path + "/" + } + + var blockCount, byteCount uint64 + dir, name := util.FullPath(path).DirAndName() + blockCount, byteCount, err = duTraverseDirectory(writer, commandEnv, dir, name) + + if name == "" && err == nil { + fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir) + } + + return + +} + +func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) { + + err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error { + + var fileBlockCount, fileByteCount uint64 + + if entry.IsDirectory { + 
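// a directory contributes the totals of its whole subtree, gathered by
// recursing with an empty name filter; the dir == "/" case below avoids
// building a child path with a double slash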
subDir := fmt.Sprintf("%s/%s", dir, entry.Name) + if dir == "/" { + subDir = "/" + entry.Name + } + numBlock, numByte, err := duTraverseDirectory(writer, filerClient, subDir, "") + if err == nil { + blockCount += numBlock + byteCount += numByte + } + } else { + fileBlockCount = uint64(len(entry.Chunks)) + fileByteCount = filer2.TotalSize(entry.Chunks) + blockCount += uint64(len(entry.Chunks)) + byteCount += filer2.TotalSize(entry.Chunks) + } + + if name != "" && !entry.IsDirectory { + fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", fileBlockCount, fileByteCount, dir, entry.Name) + } + return nil + }) + return +} diff --git a/weed/shell/command_fs_lock_unlock.go b/weed/shell/command_fs_lock_unlock.go new file mode 100644 index 000000000..8a6e8f71b --- /dev/null +++ b/weed/shell/command_fs_lock_unlock.go @@ -0,0 +1,54 @@ +package shell + +import ( + "io" +) + +func init() { + Commands = append(Commands, &commandUnlock{}) + Commands = append(Commands, &commandLock{}) +} + +// =========== Lock ============== +type commandLock struct { +} + +func (c *commandLock) Name() string { + return "lock" +} + +func (c *commandLock) Help() string { + return `lock in order to exclusively manage the cluster + + This is a blocking operation if there is alread another lock. +` +} + +func (c *commandLock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + commandEnv.locker.RequestLock() + + return nil +} + +// =========== Unlock ============== + +type commandUnlock struct { +} + +func (c *commandUnlock) Name() string { + return "unlock" +} + +func (c *commandUnlock) Help() string { + return `unlock the cluster-wide lock + +` +} + +func (c *commandUnlock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + commandEnv.locker.ReleaseLock() + + return nil +} diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go new file mode 100644 index 000000000..36133992f --- /dev/null +++ b/weed/shell/command_fs_ls.go @@ -0,0 +1,111 @@ +package shell + +import ( + "fmt" + "io" + "os" + "os/user" + "strconv" + "strings" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsLs{}) +} + +type commandFsLs struct { +} + +func (c *commandFsLs) Name() string { + return "fs.ls" +} + +func (c *commandFsLs) Help() string { + return `list all files under a directory + + fs.ls [-l] [-a] /dir/ + fs.ls [-l] [-a] /dir/file_name + fs.ls [-l] [-a] /dir/file_prefix +` +} + +func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + var isLongFormat, showHidden bool + for _, arg := range args { + if !strings.HasPrefix(arg, "-") { + break + } + for _, t := range arg { + switch t { + case 'a': + showHidden = true + case 'l': + isLongFormat = true + } + } + } + + path, err := commandEnv.parseUrl(findInputDirectory(args)) + if err != nil { + return err + } + + if commandEnv.isDirectory(path) { + path = path + "/" + } + + dir, name := util.FullPath(path).DirAndName() + entryCount := 0 + + err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error { + + if !showHidden && strings.HasPrefix(entry.Name, ".") { + return nil + } + + entryCount++ + + if isLongFormat { + fileMode := os.FileMode(entry.Attributes.FileMode) + userName, groupNames := entry.Attributes.UserName, entry.Attributes.GroupName + if userName == "" { + if user, 
userErr := user.LookupId(strconv.Itoa(int(entry.Attributes.Uid))); userErr == nil { + userName = user.Username + } + } + groupName := "" + if len(groupNames) > 0 { + groupName = groupNames[0] + } + if groupName == "" { + if group, groupErr := user.LookupGroupId(strconv.Itoa(int(entry.Attributes.Gid))); groupErr == nil { + groupName = group.Name + } + } + + if strings.HasSuffix(dir, "/") { + // just for printing + dir = dir[:len(dir)-1] + } + fmt.Fprintf(writer, "%s %3d %s %s %6d %s/%s\n", + fileMode, len(entry.Chunks), + userName, groupName, + filer2.TotalSize(entry.Chunks), dir, entry.Name) + } else { + fmt.Fprintf(writer, "%s\n", entry.Name) + } + + return nil + }) + + if isLongFormat && err == nil { + fmt.Fprintf(writer, "total %d\n", entryCount) + } + + return +} diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go new file mode 100644 index 000000000..0679ec075 --- /dev/null +++ b/weed/shell/command_fs_meta_cat.go @@ -0,0 +1,68 @@ +package shell + +import ( + "fmt" + "io" + + "github.com/golang/protobuf/jsonpb" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsMetaCat{}) +} + +type commandFsMetaCat struct { +} + +func (c *commandFsMetaCat) Name() string { + return "fs.meta.cat" +} + +func (c *commandFsMetaCat) Help() string { + return `print out the meta data content for a file or directory + + fs.meta.cat /dir/ + fs.meta.cat /dir/file_name +` +} + +func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + path, err := commandEnv.parseUrl(findInputDirectory(args)) + if err != nil { + return err + } + + dir, name := util.FullPath(path).DirAndName() + + return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.LookupDirectoryEntryRequest{ + Name: name, + Directory: dir, + } + respLookupEntry, err := filer_pb.LookupEntry(client, request) + if err != nil { + return err + } + + m := jsonpb.Marshaler{ + EmitDefaults: true, + Indent: " ", + } + + text, marshalErr := m.MarshalToString(respLookupEntry.Entry) + if marshalErr != nil { + return fmt.Errorf("marshal meta: %v", marshalErr) + } + + fmt.Fprintf(writer, "%s\n", text) + + return nil + + }) + +} diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go new file mode 100644 index 000000000..69ae9454c --- /dev/null +++ b/weed/shell/command_fs_meta_load.go @@ -0,0 +1,100 @@ +package shell + +import ( + "fmt" + "io" + "os" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsMetaLoad{}) +} + +type commandFsMetaLoad struct { +} + +func (c *commandFsMetaLoad) Name() string { + return "fs.meta.load" +} + +func (c *commandFsMetaLoad) Help() string { + return `load saved filer meta data to restore the directory and file structure + + fs.meta.load <filer_host>-<port>-<time>.meta + +` +} + +func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if len(args) == 0 { + fmt.Fprintf(writer, "missing a metadata file\n") + return nil + } + + fileName := args[len(args)-1] + + dst, err := os.OpenFile(fileName, os.O_RDONLY, 0644) + if err != nil { + return nil + } + defer dst.Close() + + var dirCount, fileCount uint64 + + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + 
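The callback below reads the saved backup as a stream of length-prefixed records: a 4-byte size, then that many bytes holding a marshaled filer_pb.FullEntry. As a rough standard-library sketch of the same framing (the big-endian byte order is an assumption here; the actual code goes through util.BytesToUint32 and util.Uint32toBytes):

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
)

// writeRecord frames one payload as a 4-byte length followed by the bytes.
func writeRecord(w io.Writer, payload []byte) error {
    sizeBuf := make([]byte, 4)
    binary.BigEndian.PutUint32(sizeBuf, uint32(len(payload)))
    if _, err := w.Write(sizeBuf); err != nil {
        return err
    }
    _, err := w.Write(payload)
    return err
}

// readRecord reads one length-prefixed record; io.EOF marks a clean end of stream.
func readRecord(r io.Reader) ([]byte, error) {
    sizeBuf := make([]byte, 4)
    if _, err := io.ReadFull(r, sizeBuf); err != nil {
        return nil, err
    }
    payload := make([]byte, binary.BigEndian.Uint32(sizeBuf))
    _, err := io.ReadFull(r, payload)
    return payload, err
}

func main() {
    var buf bytes.Buffer
    writeRecord(&buf, []byte("entry-1"))
    writeRecord(&buf, []byte("entry-2"))
    for {
        rec, err := readRecord(&buf)
        if err == io.EOF {
            break
        }
        if err != nil {
            panic(err)
        }
        fmt.Println(string(rec))
    }
}

io.ReadFull is used in the sketch so that a short read is not mistaken for the end of the file.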
sizeBuf := make([]byte, 4) + + for { + if n, err := dst.Read(sizeBuf); n != 4 { + if err == io.EOF { + return nil + } + return err + } + + size := util.BytesToUint32(sizeBuf) + + data := make([]byte, int(size)) + + if n, err := dst.Read(data); n != len(data) { + return err + } + + fullEntry := &filer_pb.FullEntry{} + if err = proto.Unmarshal(data, fullEntry); err != nil { + return err + } + + if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + Directory: fullEntry.Dir, + Entry: fullEntry.Entry, + }); err != nil { + return err + } + + fmt.Fprintf(writer, "load %s\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name)) + + if fullEntry.Entry.IsDirectory { + dirCount++ + } else { + fileCount++ + } + + } + + }) + + if err == nil { + fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount) + fmt.Fprintf(writer, "\n%s is loaded.\n", fileName) + } + + return err +} diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go new file mode 100644 index 000000000..4342fa81d --- /dev/null +++ b/weed/shell/command_fs_meta_notify.go @@ -0,0 +1,73 @@ +package shell + +import ( + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/notification" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsMetaNotify{}) +} + +type commandFsMetaNotify struct { +} + +func (c *commandFsMetaNotify) Name() string { + return "fs.meta.notify" +} + +func (c *commandFsMetaNotify) Help() string { + return `recursively send directory and file meta data to notifiction message queue + + fs.meta.notify # send meta data from current directory to notification message queue + + The message queue will use it to trigger replication from this filer. 
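The Do function below walks the tree and publishes one message per entry, keyed by the entry's full path, counting directories and files as it goes. A queue-agnostic sketch of that shape (entry, queue and printQueue are invented for the illustration; the real command sends filer_pb.EventNotification values through notification.Queue.SendMessage):

package main

import "fmt"

// entry is a stand-in for a filer entry in this sketch.
type entry struct {
    path  string
    isDir bool
}

// queue is a stand-in for the configured notification message queue.
type queue interface {
    SendMessage(key string, value interface{}) error
}

type printQueue struct{}

func (printQueue) SendMessage(key string, value interface{}) error {
    fmt.Println("notify", key)
    return nil
}

// notifyAll sends one message per entry and keeps going on failures,
// only reporting them, like the shell command does.
func notifyAll(entries []entry, q queue) (dirs, files uint64) {
    for _, e := range entries {
        if e.isDir {
            dirs++
        } else {
            files++
        }
        if err := q.SendMessage(e.path, e); err != nil {
            fmt.Printf("fail to notify %s: %v\n", e.path, err)
        }
    }
    return
}

func main() {
    entries := []entry{{"/buckets", true}, {"/buckets/b1/a.txt", false}}
    dirs, files := notifyAll(entries, printQueue{})
    fmt.Printf("total notified %d directories, %d files\n", dirs, files)
}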
+ +` +} + +func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + path, err := commandEnv.parseUrl(findInputDirectory(args)) + if err != nil { + return err + } + + util.LoadConfiguration("notification", true) + v := util.GetViper() + notification.LoadConfiguration(v, "notification.") + + var dirCount, fileCount uint64 + + err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { + + if entry.IsDirectory { + dirCount++ + } else { + fileCount++ + } + + notifyErr := notification.Queue.SendMessage( + string(parentPath.Child(entry.Name)), + &filer_pb.EventNotification{ + NewEntry: entry, + }, + ) + + if notifyErr != nil { + fmt.Fprintf(writer, "fail to notify new entry event for %s: %v\n", parentPath.Child(entry.Name), notifyErr) + } + + }) + + if err == nil { + fmt.Fprintf(writer, "\ntotal notified %d directories, %d files\n", dirCount, fileCount) + } + + return err + +} diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go new file mode 100644 index 000000000..ed19e3d01 --- /dev/null +++ b/weed/shell/command_fs_meta_save.go @@ -0,0 +1,143 @@ +package shell + +import ( + "flag" + "fmt" + "io" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsMetaSave{}) +} + +type commandFsMetaSave struct { +} + +func (c *commandFsMetaSave) Name() string { + return "fs.meta.save" +} + +func (c *commandFsMetaSave) Help() string { + return `save all directory and file meta data to a local file for metadata backup. + + fs.meta.save / # save from the root + fs.meta.save -v -o t.meta / # save from the root, output to t.meta file. + fs.meta.save /path/to/save # save from the directory /path/to/save + fs.meta.save . # save from current directory + fs.meta.save # save from current directory + + The meta data will be saved into a local <filer_host>-<port>-<time>.meta file. 
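Internally the command decouples traversal from disk writes: the traversal marshals each entry and pushes the bytes into a channel, while a single goroutine drains the channel and appends size-prefixed records to the output file, updating the directory and file counters atomically. A stripped-down sketch of that producer/consumer shape (standard library only; the worker count and payloads are made up):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    outputChan := make(chan []byte, 1024)

    // single consumer: in the real command this appends
    // size-prefixed records to the .meta file
    var writerDone sync.WaitGroup
    writerDone.Add(1)
    go func() {
        defer writerDone.Done()
        for b := range outputChan {
            fmt.Printf("write %d bytes\n", len(b))
        }
    }()

    // producers: in the real command this is the tree traversal,
    // marshaling one filer_pb.FullEntry per file or directory
    var fileCount uint64
    var producers sync.WaitGroup
    for i := 0; i < 4; i++ {
        producers.Add(1)
        go func(i int) {
            defer producers.Done()
            outputChan <- []byte(fmt.Sprintf("entry-%d", i))
            atomic.AddUint64(&fileCount, 1)
        }(i)
    }
    producers.Wait()

    close(outputChan)
    writerDone.Wait()
    fmt.Printf("total %d files\n", atomic.LoadUint64(&fileCount))
}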
+ These meta data can be later loaded by fs.meta.load command, + +` +} + +func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + fsMetaSaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + verbose := fsMetaSaveCommand.Bool("v", false, "print out each processed files") + outputFileName := fsMetaSaveCommand.String("o", "", "output the meta data to this file") + // chunksFileName := fsMetaSaveCommand.String("chunks", "", "output all the chunks to this file") + if err = fsMetaSaveCommand.Parse(args); err != nil { + return nil + } + + path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) + if parseErr != nil { + return parseErr + } + + fileName := *outputFileName + if fileName == "" { + t := time.Now() + fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", + commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + } + + dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("failed to create file %s: %v", fileName, openErr) + } + defer dst.Close() + + err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(outputChan chan interface{}) { + sizeBuf := make([]byte, 4) + for item := range outputChan { + b := item.([]byte) + util.Uint32toBytes(sizeBuf, uint32(len(b))) + dst.Write(sizeBuf) + dst.Write(b) + } + }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + bytes, err := proto.Marshal(entry) + if err != nil { + fmt.Fprintf(writer, "marshall error: %v\n", err) + return + } + + outputChan <- bytes + return nil + }) + + if err == nil { + fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", commandEnv.option.FilerHost, commandEnv.option.FilerPort, path, fileName) + } + + return err + +} + +func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, saveFn func(outputChan chan interface{}), genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error) error { + + var wg sync.WaitGroup + wg.Add(1) + outputChan := make(chan interface{}, 1024) + go func() { + saveFn(outputChan) + wg.Done() + }() + + var dirCount, fileCount uint64 + + err := filer_pb.TraverseBfs(filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { + + protoMessage := &filer_pb.FullEntry{ + Dir: string(parentPath), + Entry: entry, + } + + if err := genFn(protoMessage, outputChan); err != nil { + fmt.Fprintf(writer, "marshall error: %v\n", err) + return + } + + if entry.IsDirectory { + atomic.AddUint64(&dirCount, 1) + } else { + atomic.AddUint64(&fileCount, 1) + } + + if verbose { + println(parentPath.Child(entry.Name)) + } + + }) + + close(outputChan) + + wg.Wait() + + if err == nil && writer != nil { + fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount) + } + return err +} diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go new file mode 100644 index 000000000..0a7eed02d --- /dev/null +++ b/weed/shell/command_fs_mv.go @@ -0,0 +1,90 @@ +package shell + +import ( + "context" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsMv{}) +} + +type commandFsMv struct { +} + +func (c *commandFsMv) Name() string { + return "fs.mv" +} + +func (c *commandFsMv) Help() string { + return `move or rename a 
file or a folder + + fs.mv <source entry> <destination entry> + + fs.mv /dir/file_name /dir2/filename2 + fs.mv /dir/file_name /dir2 + + fs.mv /dir/dir2 /dir3/dir4/ + fs.mv /dir/dir2 /dir3/new_dir + +` +} + +func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + sourcePath, err := commandEnv.parseUrl(args[0]) + if err != nil { + return err + } + + destinationPath, err := commandEnv.parseUrl(args[1]) + if err != nil { + return err + } + + sourceDir, sourceName := util.FullPath(sourcePath).DirAndName() + + destinationDir, destinationName := util.FullPath(destinationPath).DirAndName() + + return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + // collect destination entry info + destinationRequest := &filer_pb.LookupDirectoryEntryRequest{ + Name: destinationDir, + Directory: destinationName, + } + respDestinationLookupEntry, err := filer_pb.LookupEntry(client, destinationRequest) + + var targetDir, targetName string + + // moving a file or folder + if err == nil && respDestinationLookupEntry.Entry.IsDirectory { + // to a directory + targetDir = util.Join(destinationDir, destinationName) + targetName = sourceName + } else { + // to a file or folder + targetDir = destinationDir + targetName = destinationName + } + + request := &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: sourceDir, + OldName: sourceName, + NewDirectory: targetDir, + NewName: targetName, + } + + _, err = client.AtomicRenameEntry(context.Background(), request) + + fmt.Fprintf(writer, "move: %s => %s\n", sourcePath, util.NewFullPath(targetDir, targetName)) + + return err + + }) + +} diff --git a/weed/shell/command_fs_pwd.go b/weed/shell/command_fs_pwd.go new file mode 100644 index 000000000..d7d9819c8 --- /dev/null +++ b/weed/shell/command_fs_pwd.go @@ -0,0 +1,28 @@ +package shell + +import ( + "fmt" + "io" +) + +func init() { + Commands = append(Commands, &commandFsPwd{}) +} + +type commandFsPwd struct { +} + +func (c *commandFsPwd) Name() string { + return "fs.pwd" +} + +func (c *commandFsPwd) Help() string { + return `print out current directory` +} + +func (c *commandFsPwd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + fmt.Fprintf(writer, "%s\n", commandEnv.option.Directory) + + return nil +} diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go new file mode 100644 index 000000000..a8c5b2018 --- /dev/null +++ b/weed/shell/command_fs_tree.go @@ -0,0 +1,113 @@ +package shell + +import ( + "fmt" + "io" + "strings" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandFsTree{}) +} + +type commandFsTree struct { +} + +func (c *commandFsTree) Name() string { + return "fs.tree" +} + +func (c *commandFsTree) Help() string { + return `recursively list all files under a directory + + fs.tree /some/dir + +` +} + +func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + path, err := commandEnv.parseUrl(findInputDirectory(args)) + if err != nil { + return err + } + + dir, name := util.FullPath(path).DirAndName() + + dirCount, fCount, terr := treeTraverseDirectory(writer, commandEnv, util.FullPath(dir), name, newPrefix(), -1) + + if terr == nil { + fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount) + } + + return terr + +} + +func treeTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir util.FullPath, name string, prefix 
*Prefix, level int) (directoryCount, fileCount int64, err error) { + + prefix.addMarker(level) + + err = filer_pb.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) error { + if level < 0 && name != "" { + if entry.Name != name { + return nil + } + } + + fmt.Fprintf(writer, "%s%s\n", prefix.getPrefix(level, isLast), entry.Name) + + if entry.IsDirectory { + directoryCount++ + subDir := dir.Child(entry.Name) + dirCount, fCount, terr := treeTraverseDirectory(writer, filerClient, subDir, "", prefix, level+1) + directoryCount += dirCount + fileCount += fCount + err = terr + } else { + fileCount++ + } + return nil + }) + return +} + +type Prefix struct { + markers map[int]bool +} + +func newPrefix() *Prefix { + return &Prefix{ + markers: make(map[int]bool), + } +} +func (p *Prefix) addMarker(marker int) { + p.markers[marker] = true +} +func (p *Prefix) removeMarker(marker int) { + delete(p.markers, marker) +} +func (p *Prefix) getPrefix(level int, isLastChild bool) string { + var sb strings.Builder + if level < 0 { + return "" + } + for i := 0; i < level; i++ { + if _, ok := p.markers[i]; ok { + sb.WriteString("│") + } else { + sb.WriteString(" ") + } + sb.WriteString(" ") + } + if isLastChild { + sb.WriteString("└──") + p.removeMarker(level) + } else { + sb.WriteString("├──") + } + return sb.String() +} diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go new file mode 100644 index 000000000..69e3c7fd9 --- /dev/null +++ b/weed/shell/command_volume_balance.go @@ -0,0 +1,257 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + "os" + "sort" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandVolumeBalance{}) +} + +type commandVolumeBalance struct { +} + +func (c *commandVolumeBalance) Name() string { + return "volume.balance" +} + +func (c *commandVolumeBalance) Help() string { + return `balance all volumes among volume servers + + volume.balance [-collection ALL|EACH_COLLECTION|<collection_name>] [-force] [-dataCenter=<data_center_name>] + + Algorithm: + + For each type of volume server (different max volume count limit){ + for each collection { + balanceWritableVolumes() + balanceReadOnlyVolumes() + } + } + + func balanceWritableVolumes(){ + idealWritableVolumes = totalWritableVolumes / numVolumeServers + for hasMovedOneVolume { + sort all volume servers ordered by the number of local writable volumes + pick the volume server A with the lowest number of writable volumes x + pick the volume server B with the highest number of writable volumes y + if y > idealWritableVolumes and x +1 <= idealWritableVolumes { + if B has a writable volume id v that A does not have { + move writable volume v from A to B + } + } + } + } + func balanceReadOnlyVolumes(){ + //similar to balanceWritableVolumes + } + +` +} + +func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection") + dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter") + applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan.") 
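The balancing pass implemented further down (balanceSelectedVolume) follows the algorithm in the help text: compute an ideal number of volumes per server, then repeatedly move one volume from the fullest server to the emptiest until no further move helps. A toy version of that greedy loop over plain counters, ignoring collections, read-only state and replica placement (the ceilDivide body is an assumption; only its call site appears in this diff):

package main

import (
    "fmt"
    "sort"
)

// ceilDivide mirrors the ideal-count computation: ceil(total / servers).
func ceilDivide(total, n int) int {
    return (total + n - 1) / n
}

// balance greedily moves one volume at a time from the fullest
// server to the emptiest one until the counts are as even as possible.
func balance(counts []int) (moves int) {
    total := 0
    for _, c := range counts {
        total += c
    }
    ideal := ceilDivide(total, len(counts))

    hasMove := true
    for hasMove {
        hasMove = false
        sort.Ints(counts)
        emptiest, fullest := 0, len(counts)-1
        if counts[fullest] > ideal && counts[emptiest]+1 <= ideal {
            counts[fullest]--
            counts[emptiest]++
            moves++
            hasMove = true
        }
    }
    return moves
}

func main() {
    counts := []int{12, 1, 2, 9}
    fmt.Println("moves:", balance(counts), "counts:", counts)
}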
+ if err = balanceCommand.Parse(args); err != nil { + return nil + } + + var resp *master_pb.VolumeListResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return err + } + + typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc) + + for maxVolumeCount, volumeServers := range typeToNodes { + if len(volumeServers) < 2 { + fmt.Printf("only 1 node is configured max %d volumes, skipping balancing\n", maxVolumeCount) + continue + } + if *collection == "EACH_COLLECTION" { + collections, err := ListCollectionNames(commandEnv, true, false) + if err != nil { + return err + } + for _, c := range collections { + if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { + return err + } + } + } else if *collection == "ALL_COLLECTIONS" { + if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { + return err + } + } else { + if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { + return err + } + } + + } + return nil +} + +func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { + + // balance writable volumes + for _, n := range nodes { + n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { + if collection != "ALL_COLLECTIONS" { + if v.Collection != collection { + return false + } + } + return !v.ReadOnly && v.Size < volumeSizeLimit + }) + } + if err := balanceSelectedVolume(commandEnv, nodes, sortWritableVolumes, applyBalancing); err != nil { + return err + } + + // balance readable volumes + for _, n := range nodes { + n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { + if collection != "ALL_COLLECTIONS" { + if v.Collection != collection { + return false + } + } + return v.ReadOnly || v.Size >= volumeSizeLimit + }) + } + if err := balanceSelectedVolume(commandEnv, nodes, sortReadOnlyVolumes, applyBalancing); err != nil { + return err + } + + return nil +} + +func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) { + typeToNodes = make(map[uint64][]*Node) + for _, dc := range t.DataCenterInfos { + if selectedDataCenter != "" && dc.Id != selectedDataCenter { + continue + } + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{ + info: dn, + dc: dc.Id, + rack: r.Id, + }) + } + } + } + return +} + +type Node struct { + info *master_pb.DataNodeInfo + selectedVolumes map[uint32]*master_pb.VolumeInformationMessage + dc string + rack string +} + +func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) { + sort.Slice(volumes, func(i, j int) bool { + return volumes[i].Size < volumes[j].Size + }) +} + +func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) { + sort.Slice(volumes, func(i, j int) bool { + return volumes[i].Id < volumes[j].Id + }) +} + +func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) error { + selectedVolumeCount := 0 + for _, dn := range nodes { + selectedVolumeCount += 
len(dn.selectedVolumes) + } + + idealSelectedVolumes := ceilDivide(selectedVolumeCount, len(nodes)) + + hasMove := true + + for hasMove { + hasMove = false + sort.Slice(nodes, func(i, j int) bool { + // TODO sort by free volume slots??? + return len(nodes[i].selectedVolumes) < len(nodes[j].selectedVolumes) + }) + emptyNode, fullNode := nodes[0], nodes[len(nodes)-1] + if len(fullNode.selectedVolumes) > idealSelectedVolumes && len(emptyNode.selectedVolumes)+1 <= idealSelectedVolumes { + + // sort the volumes to move + var candidateVolumes []*master_pb.VolumeInformationMessage + for _, v := range fullNode.selectedVolumes { + candidateVolumes = append(candidateVolumes, v) + } + sortCandidatesFn(candidateVolumes) + + for _, v := range candidateVolumes { + if v.ReplicaPlacement > 0 { + if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack { + // TODO this logic is too simple, but should work most of the time + // Need a correct algorithm to handle all different cases + continue + } + } + if _, found := emptyNode.selectedVolumes[v.Id]; !found { + if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil { + delete(fullNode.selectedVolumes, v.Id) + emptyNode.selectedVolumes[v.Id] = v + hasMove = true + break + } else { + return err + } + } + } + } + } + return nil +} + +func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, fullNode *Node, emptyNode *Node, applyBalancing bool) error { + collectionPrefix := v.Collection + "_" + if v.Collection == "" { + collectionPrefix = "" + } + fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) + if applyBalancing { + return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second) + } + return nil +} + +func (node *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) { + node.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage) + for _, v := range node.info.VolumeInfos { + if fn(v) { + node.selectedVolumes[v.Id] = v + } + } +} diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go new file mode 100644 index 000000000..ff976c345 --- /dev/null +++ b/weed/shell/command_volume_configure_replication.go @@ -0,0 +1,108 @@ +package shell + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" +) + +func init() { + Commands = append(Commands, &commandVolumeConfigureReplication{}) +} + +type commandVolumeConfigureReplication struct { +} + +func (c *commandVolumeConfigureReplication) Name() string { + return "volume.configure.replication" +} + +func (c *commandVolumeConfigureReplication) Help() string { + return `change volume replication value + + This command changes a volume replication value. It should be followed by volume.fix.replication. 
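For reference, the -replication value is a three-digit string xyz: x replicas on other data centers, y on other racks within the same data center, and z on other servers within the same rack, so a volume ends up with x+y+z+1 copies. A small illustrative parser (the real one is super_block.NewReplicaPlacementFromString; the struct below only mirrors the DiffDataCenterCount, DiffRackCount and SameRackCount fields used by satisfyReplicaPlacement later in this diff):

package main

import "fmt"

// replicaPlacement mirrors the three digits of a replication string like "001" or "210".
type replicaPlacement struct {
    DiffDataCenterCount int // replicas placed on other data centers
    DiffRackCount       int // replicas placed on other racks in the same data center
    SameRackCount       int // replicas placed on other servers in the same rack
}

func parseReplication(s string) (replicaPlacement, error) {
    var rp replicaPlacement
    if len(s) != 3 {
        return rp, fmt.Errorf("replication %q should have 3 digits", s)
    }
    digits := make([]int, 3)
    for i, c := range s {
        if c < '0' || c > '9' {
            return rp, fmt.Errorf("replication %q should be numeric", s)
        }
        digits[i] = int(c - '0')
    }
    rp.DiffDataCenterCount, rp.DiffRackCount, rp.SameRackCount = digits[0], digits[1], digits[2]
    return rp, nil
}

// CopyCount is the total number of copies, the original plus all replicas.
func (rp replicaPlacement) CopyCount() int {
    return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
}

func main() {
    rp, _ := parseReplication("210")
    fmt.Printf("%+v copies=%d\n", rp, rp.CopyCount())
}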
+ +` +} + +func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id") + replicationString := configureReplicationCommand.String("replication", "", "the intended replication value") + if err = configureReplicationCommand.Parse(args); err != nil { + return nil + } + + if *replicationString == "" { + return fmt.Errorf("empty replication value") + } + + replicaPlacement, err := super_block.NewReplicaPlacementFromString(*replicationString) + if err != nil { + return fmt.Errorf("replication format: %v", err) + } + replicaPlacementInt32 := uint32(replicaPlacement.Byte()) + + var resp *master_pb.VolumeListResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return err + } + + vid := needle.VolumeId(*volumeIdInt) + + // find all data nodes with volumes that needs replication change + var allLocations []location + eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + loc := newLocation(dc, string(rack), dn) + for _, v := range dn.VolumeInfos { + if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 { + allLocations = append(allLocations, loc) + continue + } + } + }) + + if len(allLocations) == 0 { + return fmt.Errorf("no volume needs change") + } + + for _, dst := range allLocations { + err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ + VolumeId: uint32(vid), + Replication: replicaPlacement.String(), + }) + if configureErr != nil { + return configureErr + } + if resp.Error != "" { + return errors.New(resp.Error) + } + return nil + }) + + if err != nil { + return err + } + + } + + return nil +} diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go new file mode 100644 index 000000000..cdd10863f --- /dev/null +++ b/weed/shell/command_volume_copy.go @@ -0,0 +1,55 @@ +package shell + +import ( + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandVolumeCopy{}) +} + +type commandVolumeCopy struct { +} + +func (c *commandVolumeCopy) Name() string { + return "volume.copy" +} + +func (c *commandVolumeCopy) Help() string { + return `copy a volume from one volume server to another volume server + + volume.copy <source volume server host:port> <target volume server host:port> <volume id> + + This command copies a volume from one volume server to another volume server. + Usually you will want to unmount the volume first before copying. 
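For example, a manual move of one volume can be a copy followed by a delete of the original, done while holding the cluster lock (the host:port values and the volume id below are made up):

    lock
    volume.copy 192.168.2.10:8080 192.168.2.11:8080 37
    volume.delete 192.168.2.10:8080 37
    unlock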
+ +` +} + +func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + if len(args) != 3 { + fmt.Fprintf(writer, "received args: %+v\n", args) + return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>") + } + sourceVolumeServer, targetVolumeServer, volumeIdString := args[0], args[1], args[2] + + volumeId, err := needle.NewVolumeId(volumeIdString) + if err != nil { + return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) + } + + if sourceVolumeServer == targetVolumeServer { + return fmt.Errorf("source and target volume servers are the same!") + } + + _, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer) + return +} diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go new file mode 100644 index 000000000..c5cc9e277 --- /dev/null +++ b/weed/shell/command_volume_delete.go @@ -0,0 +1,50 @@ +package shell + +import ( + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandVolumeDelete{}) +} + +type commandVolumeDelete struct { +} + +func (c *commandVolumeDelete) Name() string { + return "volume.delete" +} + +func (c *commandVolumeDelete) Help() string { + return `delete a live volume from one volume server + + volume.delete <volume server host:port> <volume id> + + This command deletes a volume from one volume server. + +` +} + +func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + if len(args) != 2 { + fmt.Fprintf(writer, "received args: %+v\n", args) + return fmt.Errorf("need 2 args of <volume server host:port> <volume id>") + } + sourceVolumeServer, volumeIdString := args[0], args[1] + + volumeId, err := needle.NewVolumeId(volumeIdString) + if err != nil { + return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) + } + + return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) + +} diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go new file mode 100644 index 000000000..6b5e4e735 --- /dev/null +++ b/weed/shell/command_volume_fix_replication.go @@ -0,0 +1,307 @@ +package shell + +import ( + "context" + "fmt" + "io" + "math/rand" + "sort" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" +) + +func init() { + Commands = append(Commands, &commandVolumeFixReplication{}) +} + +type commandVolumeFixReplication struct { +} + +func (c *commandVolumeFixReplication) Name() string { + return "volume.fix.replication" +} + +func (c *commandVolumeFixReplication) Help() string { + return `add replicas to volumes that are missing replicas + + This command file all under-replicated volumes, and find volume servers with free slots. + If the free slots satisfy the replication requirement, the volume content is copied over and mounted. + + volume.fix.replication -n # do not take action + volume.fix.replication # actually copying the volume files and mount the volume + + Note: + * each time this will only add back one replica for one volume id. If there are multiple replicas + are missing, e.g. 
multiple volume servers are new, you may need to run this multiple times. + * do not run this too quick within seconds, since the new volume replica may take a few seconds + to register itself to the master. + +` +} + +func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + takeAction := true + if len(args) > 0 && args[0] == "-n" { + takeAction = false + } + + var resp *master_pb.VolumeListResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return err + } + + // find all volumes that needs replication + // collect all data nodes + replicatedVolumeLocations := make(map[uint32][]location) + replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage) + var allLocations []location + eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + loc := newLocation(dc, string(rack), dn) + for _, v := range dn.VolumeInfos { + if v.ReplicaPlacement > 0 { + replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc) + replicatedVolumeInfo[v.Id] = v + } + } + allLocations = append(allLocations, loc) + }) + + // find all under replicated volumes + underReplicatedVolumeLocations := make(map[uint32][]location) + for vid, locations := range replicatedVolumeLocations { + volumeInfo := replicatedVolumeInfo[vid] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) + if replicaPlacement.GetCopyCount() > len(locations) { + underReplicatedVolumeLocations[vid] = locations + } + } + + if len(underReplicatedVolumeLocations) == 0 { + return fmt.Errorf("no under replicated volumes") + } + + if len(allLocations) == 0 { + return fmt.Errorf("no data nodes at all") + } + + // find the most under populated data nodes + keepDataNodesSorted(allLocations) + + for vid, locations := range underReplicatedVolumeLocations { + volumeInfo := replicatedVolumeInfo[vid] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) + foundNewLocation := false + for _, dst := range allLocations { + // check whether data nodes satisfy the constraints + if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) { + // ask the volume server to replicate the volume + sourceNodes := underReplicatedVolumeLocations[vid] + sourceNode := sourceNodes[rand.Intn(len(sourceNodes))] + foundNewLocation = true + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id) + + if !takeAction { + break + } + + err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ + VolumeId: volumeInfo.Id, + SourceDataNode: sourceNode.dataNode.Id, + }) + if replicateErr != nil { + return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr) + } + return nil + }) + + if err != nil { + return err + } + + // adjust free volume count + dst.dataNode.FreeVolumeCount-- + keepDataNodesSorted(allLocations) + break + } + } + if !foundNewLocation { + 
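// no candidate server was suitable: either no free volume slot, or the
// replica placement constraints checked by satisfyReplicaPlacement were
// not met, so the volume is reported and left for a later pass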
fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations) + } + + } + + return nil +} + +func keepDataNodesSorted(dataNodes []location) { + sort.Slice(dataNodes, func(i, j int) bool { + return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount + }) +} + +/* + if on an existing data node { + return false + } + if different from existing dcs { + if lack on different dcs { + return true + }else{ + return false + } + } + if not on primary dc { + return false + } + if different from existing racks { + if lack on different racks { + return true + }else{ + return false + } + } + if not on primary rack { + return false + } + if lacks on same rack { + return true + } else { + return false + } +*/ +func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool { + + existingDataNodes := make(map[string]int) + for _, loc := range existingLocations { + existingDataNodes[loc.String()] += 1 + } + sameDataNodeCount := existingDataNodes[possibleLocation.String()] + // avoid duplicated volume on the same data node + if sameDataNodeCount > 0 { + return false + } + + existingDataCenters := make(map[string]int) + for _, loc := range existingLocations { + existingDataCenters[loc.DataCenter()] += 1 + } + primaryDataCenters, _ := findTopKeys(existingDataCenters) + + // ensure data center count is within limit + if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found { + // different from existing dcs + if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 { + // lack on different dcs + return true + } else { + // adding this would go over the different dcs limit + return false + } + } + // now this is same as one of the existing data center + if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) { + // not on one of the primary dcs + return false + } + + // now this is one of the primary dcs + existingRacks := make(map[string]int) + for _, loc := range existingLocations { + if loc.DataCenter() != possibleLocation.DataCenter() { + continue + } + existingRacks[loc.Rack()] += 1 + } + primaryRacks, _ := findTopKeys(existingRacks) + sameRackCount := existingRacks[possibleLocation.Rack()] + + // ensure rack count is within limit + if _, found := existingRacks[possibleLocation.Rack()]; !found { + // different from existing racks + if len(existingRacks) < replicaPlacement.DiffRackCount+1 { + // lack on different racks + return true + } else { + // adding this would go over the different racks limit + return false + } + } + // now this is same as one of the existing racks + if !isAmong(possibleLocation.Rack(), primaryRacks) { + // not on the primary rack + return false + } + + // now this is on the primary rack + + // different from existing data nodes + if sameRackCount < replicaPlacement.SameRackCount+1 { + // lack on same rack + return true + } else { + // adding this would go over the same data node limit + return false + } + +} + +func findTopKeys(m map[string]int) (topKeys []string, max int) { + for k, c := range m { + if max < c { + topKeys = topKeys[:0] + topKeys = append(topKeys, k) + max = c + } else if max == c { + topKeys = append(topKeys, k) + } + } + return +} + +func isAmong(key string, keys []string) bool { + for _, k := range keys { + if k == key { + return true + } + } + return false +} + +type location struct { + dc string + rack string + dataNode *master_pb.DataNodeInfo +} + +func 
newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location { + return location{ + dc: dc, + rack: rack, + dataNode: dataNode, + } +} + +func (l location) String() string { + return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id) +} + +func (l location) Rack() string { + return fmt.Sprintf("%s %s", l.dc, l.rack) +} + +func (l location) DataCenter() string { + return l.dc +} diff --git a/weed/shell/command_volume_fix_replication_test.go b/weed/shell/command_volume_fix_replication_test.go new file mode 100644 index 000000000..4cfbd96aa --- /dev/null +++ b/weed/shell/command_volume_fix_replication_test.go @@ -0,0 +1,207 @@ +package shell + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" +) + +type testcase struct { + name string + replication string + existingLocations []location + possibleLocation location + expected bool +} + +func TestSatisfyReplicaPlacementComplicated(t *testing.T) { + + var tests = []testcase{ + { + name: "test 100 negative", + replication: "100", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + expected: false, + }, + { + name: "test 100 positive", + replication: "100", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + possibleLocation: location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + expected: true, + }, + { + name: "test 022 positive", + replication: "022", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, + possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}}, + expected: true, + }, + { + name: "test 022 negative", + replication: "022", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, + possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}}, + expected: false, + }, + { + name: "test 210 moved from 200 positive", + replication: "210", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, + possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}}, + expected: true, + }, + { + name: "test 210 moved from 200 negative extra dc", + replication: "210", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, + possibleLocation: location{"dc4", "r4", &master_pb.DataNodeInfo{Id: "dn4"}}, + expected: false, + }, + { + name: "test 210 moved from 200 negative extra data node", + replication: "210", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, + possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}}, + expected: false, + }, + } + + runTests(tests, t) + +} + +func TestSatisfyReplicaPlacement01x(t *testing.T) { + + var tests = []testcase{ + { + name: "test 011 same existing rack", + replication: "011", + 
existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}}, + expected: true, + }, + { + name: "test 011 negative", + replication: "011", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}}, + expected: false, + }, + { + name: "test 011 different existing racks", + replication: "011", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}}, + expected: true, + }, + { + name: "test 011 different existing racks negative", + replication: "011", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + possibleLocation: location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}}, + expected: false, + }, + } + + runTests(tests, t) + +} + +func TestSatisfyReplicaPlacement00x(t *testing.T) { + + var tests = []testcase{ + { + name: "test 001", + replication: "001", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + }, + possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + expected: true, + }, + { + name: "test 002 positive", + replication: "002", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}}, + expected: true, + }, + { + name: "test 002 negative, repeat the same node", + replication: "002", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + }, + possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + expected: false, + }, + { + name: "test 002 negative, enough node already", + replication: "002", + existingLocations: []location{ + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}}, + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}}, + {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}}, + }, + possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}}, + expected: false, + }, + } + + runTests(tests, t) + +} + +func runTests(tests []testcase, t *testing.T) { + for _, tt := range tests { + replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication) + println("replication:", tt.replication, "expected", tt.expected, "name:", tt.name) + if satisfyReplicaPlacement(replicaPlacement, tt.existingLocations, tt.possibleLocation) != tt.expected { + t.Errorf("%s: expect %v add %v to %s %+v", + tt.name, tt.expected, tt.possibleLocation, tt.replication, tt.existingLocations) + } + } +} diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go new file mode 100644 index 000000000..69a1a63b4 --- /dev/null +++ b/weed/shell/command_volume_fsck.go @@ -0,0 +1,361 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + "io/ioutil" + "math" + "os" + "path/filepath" + "sync" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + 
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func init() { + Commands = append(Commands, &commandVolumeFsck{}) +} + +type commandVolumeFsck struct { + env *CommandEnv +} + +func (c *commandVolumeFsck) Name() string { + return "volume.fsck" +} + +func (c *commandVolumeFsck) Help() string { + return `check all volumes to find entries not used by the filer + + Important assumption!!! + the system is all used by one filer. + + This command works this way: + 1. collect all file ids from all volumes, as set A + 2. collect all file ids from the filer, as set B + 3. find out the set A subtract B + +` +} + +func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + verbose := fsckCommand.Bool("v", false, "verbose mode") + applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer") + if err = fsckCommand.Parse(args); err != nil { + return nil + } + + c.env = commandEnv + + // create a temp folder + tempFolder, err := ioutil.TempDir("", "sw_fsck") + if err != nil { + return fmt.Errorf("failed to create temp folder: %v", err) + } + if *verbose { + fmt.Fprintf(writer, "working directory: %s\n", tempFolder) + } + defer os.RemoveAll(tempFolder) + + // collect all volume id locations + volumeIdToVInfo, err := c.collectVolumeIds(*verbose, writer) + if err != nil { + return fmt.Errorf("failed to collect all volume locations: %v", err) + } + + // collect each volume file ids + for volumeId, vinfo := range volumeIdToVInfo { + err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer) + if err != nil { + return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) + } + } + + // collect all filer file ids + if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil { + return fmt.Errorf("failed to collect file ids from filer: %v", err) + } + + // volume file ids substract filer file ids + var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64 + for volumeId, vinfo := range volumeIdToVInfo { + inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose) + if checkErr != nil { + return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr) + } + totalInUseCount += inUseCount + totalOrphanChunkCount += uint64(len(orphanFileIds)) + totalOrphanDataSize += orphanDataSize + + if *applyPurging && len(orphanFileIds) > 0 { + if vinfo.isEcVolume { + fmt.Fprintf(writer, "Skip purging for Erasure Coded volumes.\n") + } + if err = c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil { + return fmt.Errorf("purge for volume %d: %v\n", volumeId, err) + } + } + } + + if totalOrphanChunkCount == 0 { + fmt.Fprintf(writer, "no orphan data\n") + return nil + } + + if !*applyPurging { + pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount)) + fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n", + totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize) + + fmt.Fprintf(writer, "This could be 
normal if multiple filers or no filers are used.\n") + } + + return nil +} + +func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error { + + if verbose { + fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server) + } + + return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + + ext := ".idx" + if vinfo.isEcVolume { + ext = ".ecx" + } + + copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ + VolumeId: volumeId, + Ext: ext, + CompactionRevision: math.MaxUint32, + StopOffset: math.MaxInt64, + Collection: vinfo.collection, + IsEcVolume: vinfo.isEcVolume, + IgnoreSourceFileNotFound: false, + }) + if err != nil { + return fmt.Errorf("failed to start copying volume %d.idx: %v", volumeId, err) + } + + err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId)) + if err != nil { + return fmt.Errorf("failed to copy %d.idx from %s: %v", volumeId, vinfo.server, err) + } + + return nil + + }) + +} + +func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToServer map[uint32]VInfo, verbose bool, writer io.Writer) error { + + if verbose { + fmt.Fprintf(writer, "collecting file ids from filer ...\n") + } + + files := make(map[uint32]*os.File) + for vid := range volumeIdToServer { + dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr) + } + files[vid] = dst + } + defer func() { + for _, f := range files { + f.Close() + } + }() + + type Item struct { + vid uint32 + fileKey uint64 + } + return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) { + buffer := make([]byte, 8) + for item := range outputChan { + i := item.(*Item) + util.Uint64toBytes(buffer, i.fileKey) + files[i.vid].Write(buffer) + } + }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + for _, chunk := range entry.Entry.Chunks { + outputChan <- &Item{ + vid: chunk.Fid.VolumeId, + fileKey: chunk.Fid.FileKey, + } + } + return nil + }) +} + +func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { + + db := needle_map.NewMemDb() + defer db.Close() + + if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil { + return + } + + filerFileIdsData, err := ioutil.ReadFile(getFilerFileIdFile(tempFolder, volumeId)) + if err != nil { + return + } + + dataLen := len(filerFileIdsData) + if dataLen%8 != 0 { + return 0, nil, 0, fmt.Errorf("filer data is corrupted") + } + + for i := 0; i < len(filerFileIdsData); i += 8 { + fileKey := util.BytesToUint64(filerFileIdsData[i : i+8]) + db.Delete(types.NeedleId(fileKey)) + inUseCount++ + } + + var orphanFileCount uint64 + db.AscendingVisit(func(n needle_map.NeedleValue) error { + // fmt.Printf("%d,%x\n", volumeId, n.Key) + orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String())) + orphanFileCount++ + orphanDataSize += uint64(n.Size) + return nil + }) + + if orphanFileCount > 0 { + pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount)) + fmt.Fprintf(writer, 
"volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n", + volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize) + } + + return + +} + +type VInfo struct { + server string + collection string + isEcVolume bool +} + +func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) { + + if verbose { + fmt.Fprintf(writer, "collecting volume id and locations from master ...\n") + } + + volumeIdToServer = make(map[uint32]VInfo) + var resp *master_pb.VolumeListResponse + err = c.env.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return + } + + eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) { + for _, vi := range t.VolumeInfos { + volumeIdToServer[vi.Id] = VInfo{ + server: t.Id, + collection: vi.Collection, + isEcVolume: false, + } + } + for _, ecShardInfo := range t.EcShardInfos { + volumeIdToServer[ecShardInfo.Id] = VInfo{ + server: t.Id, + collection: ecShardInfo.Collection, + isEcVolume: true, + } + } + }) + + if verbose { + fmt.Fprintf(writer, "collected %d volumes and locations.\n", len(volumeIdToServer)) + } + return +} + +func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string, writer io.Writer) (err error) { + fmt.Fprintf(writer, "purging orphan data for volume %d...\n", volumeId) + locations, found := c.env.MasterClient.GetLocations(volumeId) + if !found { + return fmt.Errorf("failed to find volume %d locations", volumeId) + } + + resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations)) + var wg sync.WaitGroup + for _, location := range locations { + wg.Add(1) + go func(server string, fidList []string) { + defer wg.Done() + + if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil { + err = deleteErr + } else if deleteResults != nil { + resultChan <- deleteResults + } + + }(location.Url, fileIds) + } + wg.Wait() + close(resultChan) + + for results := range resultChan { + for _, result := range results { + if result.Error != "" { + fmt.Fprintf(writer, "purge error: %s\n", result.Error) + } + } + } + + return +} + +func getVolumeFileIdFile(tempFolder string, vid uint32) string { + return filepath.Join(tempFolder, fmt.Sprintf("%d.idx", vid)) +} + +func getFilerFileIdFile(tempFolder string, vid uint32) string { + return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid)) +} + +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error { + flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC + dst, err := os.OpenFile(fileName, flags, 0644) + if err != nil { + return nil + } + defer dst.Close() + + for { + resp, receiveErr := client.Recv() + if receiveErr == io.EOF { + break + } + if receiveErr != nil { + return fmt.Errorf("receiving %s: %v", fileName, receiveErr) + } + dst.Write(resp.FileContent) + } + return nil +} diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go new file mode 100644 index 000000000..c5a9388fa --- /dev/null +++ b/weed/shell/command_volume_list.go @@ -0,0 +1,133 @@ +package shell + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + + "io" + "sort" +) + +func init() { + Commands = append(Commands, &commandVolumeList{}) +} + 
+type commandVolumeList struct { +} + +func (c *commandVolumeList) Name() string { + return "volume.list" +} + +func (c *commandVolumeList) Help() string { + return `list all volumes + + This command list all volumes as a tree of dataCenter > rack > dataNode > volume. + +` +} + +func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + var resp *master_pb.VolumeListResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return err + } + + writeTopologyInfo(writer, resp.TopologyInfo, resp.VolumeSizeLimitMb) + return nil +} + +func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLimitMb uint64) statistics { + fmt.Fprintf(writer, "Topology volume:%d/%d active:%d free:%d remote:%d volumeSizeLimit:%d MB\n", t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount, volumeSizeLimitMb) + sort.Slice(t.DataCenterInfos, func(i, j int) bool { + return t.DataCenterInfos[i].Id < t.DataCenterInfos[j].Id + }) + var s statistics + for _, dc := range t.DataCenterInfos { + s = s.plus(writeDataCenterInfo(writer, dc)) + } + fmt.Fprintf(writer, "%+v \n", s) + return s +} +func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) statistics { + fmt.Fprintf(writer, " DataCenter %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount) + var s statistics + sort.Slice(t.RackInfos, func(i, j int) bool { + return t.RackInfos[i].Id < t.RackInfos[j].Id + }) + for _, r := range t.RackInfos { + s = s.plus(writeRackInfo(writer, r)) + } + fmt.Fprintf(writer, " DataCenter %s %+v \n", t.Id, s) + return s +} +func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) statistics { + fmt.Fprintf(writer, " Rack %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount) + var s statistics + sort.Slice(t.DataNodeInfos, func(i, j int) bool { + return t.DataNodeInfos[i].Id < t.DataNodeInfos[j].Id + }) + for _, dn := range t.DataNodeInfos { + s = s.plus(writeDataNodeInfo(writer, dn)) + } + fmt.Fprintf(writer, " Rack %s %+v \n", t.Id, s) + return s +} +func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics { + fmt.Fprintf(writer, " DataNode %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount) + var s statistics + sort.Slice(t.VolumeInfos, func(i, j int) bool { + return t.VolumeInfos[i].Id < t.VolumeInfos[j].Id + }) + for _, vi := range t.VolumeInfos { + s = s.plus(writeVolumeInformationMessage(writer, vi)) + } + for _, ecShardInfo := range t.EcShardInfos { + fmt.Fprintf(writer, " ec volume id:%v collection:%v shards:%v\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds()) + } + fmt.Fprintf(writer, " DataNode %s %+v \n", t.Id, s) + return s +} +func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) statistics { + fmt.Fprintf(writer, " volume %+v \n", t) + return newStatistics(t) +} + +type statistics struct { + Size uint64 + FileCount uint64 + DeletedFileCount uint64 + DeletedBytes uint64 +} + +func newStatistics(t *master_pb.VolumeInformationMessage) statistics { + return 
statistics{ + Size: t.Size, + FileCount: t.FileCount, + DeletedFileCount: t.DeleteCount, + DeletedBytes: t.DeletedByteCount, + } +} + +func (s statistics) plus(t statistics) statistics { + return statistics{ + Size: s.Size + t.Size, + FileCount: s.FileCount + t.FileCount, + DeletedFileCount: s.DeletedFileCount + t.DeletedFileCount, + DeletedBytes: s.DeletedBytes + t.DeletedBytes, + } +} + +func (s statistics) String() string { + if s.DeletedFileCount > 0 { + return fmt.Sprintf("total size:%d file_count:%d deleted_file:%d deleted_bytes:%d", s.Size, s.FileCount, s.DeletedFileCount, s.DeletedBytes) + } + return fmt.Sprintf("total size:%d file_count:%d", s.Size, s.FileCount) +} diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go new file mode 100644 index 000000000..ded7b7e66 --- /dev/null +++ b/weed/shell/command_volume_mount.go @@ -0,0 +1,63 @@ +package shell + +import ( + "context" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +func init() { + Commands = append(Commands, &commandVolumeMount{}) +} + +type commandVolumeMount struct { +} + +func (c *commandVolumeMount) Name() string { + return "volume.mount" +} + +func (c *commandVolumeMount) Help() string { + return `mount a volume from one volume server + + volume.mount <volume server host:port> <volume id> + + This command mounts a volume from one volume server. + +` +} + +func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + if len(args) != 2 { + fmt.Fprintf(writer, "received args: %+v\n", args) + return fmt.Errorf("need 2 args of <volume server host:port> <volume id>") + } + sourceVolumeServer, volumeIdString := args[0], args[1] + + volumeId, err := needle.NewVolumeId(volumeIdString) + if err != nil { + return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) + } + + return mountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) + +} + +func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ + VolumeId: uint32(volumeId), + }) + return mountErr + }) +} diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go new file mode 100644 index 000000000..392b947e7 --- /dev/null +++ b/weed/shell/command_volume_move.go @@ -0,0 +1,129 @@ +package shell + +import ( + "context" + "fmt" + "io" + "log" + "time" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +func init() { + Commands = append(Commands, &commandVolumeMove{}) +} + +type commandVolumeMove struct { +} + +func (c *commandVolumeMove) Name() string { + return "volume.move" +} + +func (c *commandVolumeMove) Help() string { + return `move a live volume from one volume server to another volume server + + volume.move <source volume server host:port> <target volume server host:port> <volume id> + + This command move a live volume from one volume server to another volume 
server. Here are the steps: + + 1. This command asks the target volume server to copy the source volume from source volume server, remember the last entry's timestamp. + 2. This command asks the target volume server to mount the new volume + Now the master will mark this volume id as readonly. + 3. This command asks the target volume server to tail the source volume for updates after the timestamp, for 1 minutes to drain the requests. + 4. This command asks the source volume server to unmount the source volume + Now the master will mark this volume id as writable. + 5. This command asks the source volume server to delete the source volume + +` +} + +func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + if len(args) != 3 { + fmt.Fprintf(writer, "received args: %+v\n", args) + return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>") + } + sourceVolumeServer, targetVolumeServer, volumeIdString := args[0], args[1], args[2] + + volumeId, err := needle.NewVolumeId(volumeIdString) + if err != nil { + return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) + } + + if sourceVolumeServer == targetVolumeServer { + return fmt.Errorf("source and target volume servers are the same!") + } + + return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second) +} + +// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests. +func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) { + + log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer) + lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer) + if err != nil { + return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err) + } + + log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer) + if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil { + return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err) + } + + log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer) + if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer); err != nil { + return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err) + } + + log.Printf("moved volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer) + return nil +} + +func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) { + + err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ + VolumeId: uint32(volumeId), + SourceDataNode: sourceVolumeServer, + }) + if replicateErr == nil { + lastAppendAtNs = resp.LastAppendAtNs + } + return replicateErr + }) + + return +} + +func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, 
sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { + + return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{ + VolumeId: uint32(volumeId), + SinceNs: lastAppendAtNs, + IdleTimeoutSeconds: uint32(idleTimeout.Seconds()), + SourceVolumeServer: sourceVolumeServer, + }) + return replicateErr + }) + +} + +func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ + VolumeId: uint32(volumeId), + }) + return deleteErr + }) +} diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go new file mode 100644 index 000000000..d31c8c031 --- /dev/null +++ b/weed/shell/command_volume_tier_download.go @@ -0,0 +1,170 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandVolumeTierDownload{}) +} + +type commandVolumeTierDownload struct { +} + +func (c *commandVolumeTierDownload) Name() string { + return "volume.tier.download" +} + +func (c *commandVolumeTierDownload) Help() string { + return `download the dat file of a volume from a remote tier + + volume.tier.download [-collection=""] + volume.tier.download [-collection=""] -volumeId=<volume_id> + + e.g.: + volume.tier.download -volumeId=7 + volume.tier.download -volumeId=7 + + This command will download the dat file of a volume from a remote tier to a volume server in local cluster. 
+ +` +} + +func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeId := tierCommand.Int("volumeId", 0, "the volume id") + collection := tierCommand.String("collection", "", "the collection name") + if err = tierCommand.Parse(args); err != nil { + return nil + } + + vid := needle.VolumeId(*volumeId) + + // collect topology information + topologyInfo, err := collectTopologyInfo(commandEnv) + if err != nil { + return err + } + + // volumeId is provided + if vid != 0 { + return doVolumeTierDownload(commandEnv, writer, *collection, vid) + } + + // apply to all volumes in the collection + // reusing collectVolumeIdsForEcEncode for now + volumeIds := collectRemoteVolumes(topologyInfo, *collection) + if err != nil { + return err + } + fmt.Printf("tier download volumes: %v\n", volumeIds) + for _, vid := range volumeIds { + if err = doVolumeTierDownload(commandEnv, writer, *collection, vid); err != nil { + return err + } + } + + return nil +} + +func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) { + + vidMap := make(map[uint32]bool) + eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, v := range dn.VolumeInfos { + if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" { + vidMap[v.Id] = true + } + } + }) + + for vid := range vidMap { + vids = append(vids, needle.VolumeId(vid)) + } + + return +} + +func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId) (err error) { + // find volume location + locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) + if !found { + return fmt.Errorf("volume %d not found", vid) + } + + // TODO parallelize this + for _, loc := range locations { + // copy the .dat file from remote tier to local + err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url) + if err != nil { + return fmt.Errorf("download dat file for volume %d to %s: %v", vid, loc.Url, err) + } + } + + return nil +} + +func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error { + + err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + }) + + var lastProcessed int64 + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + processingSpeed := float64(resp.Processed-lastProcessed) / 1024.0 / 1024.0 + + fmt.Fprintf(writer, "downloaded %.2f%%, %d bytes, %.2fMB/s\n", resp.ProcessedPercentage, resp.Processed, processingSpeed) + + lastProcessed = resp.Processed + } + if downloadErr != nil { + return downloadErr + } + + _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ + VolumeId: uint32(volumeId), + }) + if unmountErr != nil { + return unmountErr + } + + _, mountErr := volumeServerClient.VolumeMount(context.Background(), 
&volume_server_pb.VolumeMountRequest{ + VolumeId: uint32(volumeId), + }) + if mountErr != nil { + return mountErr + } + + return nil + }) + + return err + +} diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go new file mode 100644 index 000000000..f92cdc3e4 --- /dev/null +++ b/weed/shell/command_volume_tier_upload.go @@ -0,0 +1,151 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + "time" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandVolumeTierUpload{}) +} + +type commandVolumeTierUpload struct { +} + +func (c *commandVolumeTierUpload) Name() string { + return "volume.tier.upload" +} + +func (c *commandVolumeTierUpload) Help() string { + return `upload the dat file of a volume to a remote tier + + volume.tier.upload [-collection=""] [-fullPercent=95] [-quietFor=1h] + volume.tier.upload [-collection=""] -volumeId=<volume_id> -dest=<storage_backend> [-keepLocalDatFile] + + e.g.: + volume.tier.upload -volumeId=7 -dest=s3 + volume.tier.upload -volumeId=7 -dest=s3.default + + The <storage_backend> is defined in master.toml. + For example, "s3.default" in [storage.backend.s3.default] + + This command will move the dat file of a volume to a remote tier. + + SeaweedFS enables scalable and fast local access to lots of files, + and the cloud storage is slower by cost efficient. How to combine them together? + + Usually the data follows 80/20 rule: only 20% of data is frequently accessed. + We can offload the old volumes to the cloud. + + With this, SeaweedFS can be both fast and scalable, and infinite storage space. + Just add more local SeaweedFS volume servers to increase the throughput. + + The index file is still local, and the same O(1) disk read is applied to the remote file. 
+ +` +} + +func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeId := tierCommand.Int("volumeId", 0, "the volume id") + collection := tierCommand.String("collection", "", "the collection name") + fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size") + quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period") + dest := tierCommand.String("dest", "", "the target tier name") + keepLocalDatFile := tierCommand.Bool("keepLocalDatFile", false, "whether keep local dat file") + if err = tierCommand.Parse(args); err != nil { + return nil + } + + vid := needle.VolumeId(*volumeId) + + // volumeId is provided + if vid != 0 { + return doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile) + } + + // apply to all volumes in the collection + // reusing collectVolumeIdsForEcEncode for now + volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod) + if err != nil { + return err + } + fmt.Printf("tier upload volumes: %v\n", volumeIds) + for _, vid := range volumeIds { + if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil { + return err + } + } + + return nil +} + +func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) { + // find volume location + locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) + if !found { + return fmt.Errorf("volume %d not found", vid) + } + + err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) + if err != nil { + return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) + } + + // copy the .dat file to remote tier + err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile) + if err != nil { + return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, locations[0].Url, dest, err) + } + + return nil +} + +func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error { + + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + DestinationBackendName: dest, + KeepLocalDatFile: keepLocalDatFile, + }) + + var lastProcessed int64 + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + processingSpeed := float64(resp.Processed-lastProcessed) / 1024.0 / 1024.0 + + fmt.Fprintf(writer, "copied %.2f%%, %d bytes, %.2fMB/s\n", resp.ProcessedPercentage, resp.Processed, processingSpeed) + + lastProcessed = resp.Processed + } + + return copyErr + }) + + return err + +} diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go new file mode 100644 index 000000000..7596bb4c8 
--- /dev/null +++ b/weed/shell/command_volume_unmount.go @@ -0,0 +1,63 @@ +package shell + +import ( + "context" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" +) + +func init() { + Commands = append(Commands, &commandVolumeUnmount{}) +} + +type commandVolumeUnmount struct { +} + +func (c *commandVolumeUnmount) Name() string { + return "volume.unmount" +} + +func (c *commandVolumeUnmount) Help() string { + return `unmount a volume from one volume server + + volume.unmount <volume server host:port> <volume id> + + This command unmounts a volume from one volume server. + +` +} + +func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + if len(args) != 2 { + fmt.Fprintf(writer, "received args: %+v\n", args) + return fmt.Errorf("need 2 args of <volume server host:port> <volume id>") + } + sourceVolumeServer, volumeIdString := args[0], args[1] + + volumeId, err := needle.NewVolumeId(volumeIdString) + if err != nil { + return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) + } + + return unmountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) + +} + +func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ + VolumeId: uint32(volumeId), + }) + return unmountErr + }) +} diff --git a/weed/shell/commands.go b/weed/shell/commands.go new file mode 100644 index 000000000..f61ed9f82 --- /dev/null +++ b/weed/shell/commands.go @@ -0,0 +1,137 @@ +package shell + +import ( + "fmt" + "io" + "net/url" + "strconv" + "strings" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/chrislusf/seaweedfs/weed/wdclient/exclusive_locks" +) + +type ShellOptions struct { + Masters *string + GrpcDialOption grpc.DialOption + // shell transient context + FilerHost string + FilerPort int64 + Directory string +} + +type CommandEnv struct { + env map[string]string + MasterClient *wdclient.MasterClient + option ShellOptions + locker *exclusive_locks.ExclusiveLocker +} + +type command interface { + Name() string + Help() string + Do([]string, *CommandEnv, io.Writer) error +} + +var ( + Commands = []command{} +) + +func NewCommandEnv(options ShellOptions) *CommandEnv { + ce := &CommandEnv{ + env: make(map[string]string), + MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")), + option: options, + } + ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient) + return ce +} + +func (ce *CommandEnv) parseUrl(input string) (path string, err error) { + if strings.HasPrefix(input, "http") { + err = fmt.Errorf("http://<filer>:<port> prefix is not supported any more") + return + } + if !strings.HasPrefix(input, "/") { + input = util.Join(ce.option.Directory, input) + } + return input, err +} + +func (ce *CommandEnv) isDirectory(path string) bool { + + 
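	// A path is treated as a directory when the filer reports an existing entry for it;
	// the actual lookup is delegated to checkDirectory below.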
return ce.checkDirectory(path) == nil + +} + +func (ce *CommandEnv) confirmIsLocked() error { + + if ce.locker.IsLocking() { + return nil + } + + return fmt.Errorf("need to lock to continue") + +} + +func (ce *CommandEnv) checkDirectory(path string) error { + + dir, name := util.FullPath(path).DirAndName() + + exists, err := filer_pb.Exists(ce, dir, name, true) + + if !exists { + return fmt.Errorf("%s is not a directory", path) + } + + return err + +} + +var _ = filer_pb.FilerClient(&CommandEnv{}) + +func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + filerGrpcAddress := fmt.Sprintf("%s:%d", ce.option.FilerHost, ce.option.FilerPort+10000) + return pb.WithGrpcFilerClient(filerGrpcAddress, ce.option.GrpcDialOption, fn) + +} + +func (ce *CommandEnv) AdjustedUrl(hostAndPort string) string { + return hostAndPort +} + +func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) { + if strings.HasPrefix(entryPath, "http") { + var u *url.URL + u, err = url.Parse(entryPath) + if err != nil { + return + } + filerServer = u.Hostname() + portString := u.Port() + if portString != "" { + filerPort, err = strconv.ParseInt(portString, 10, 32) + } + path = u.Path + } else { + err = fmt.Errorf("path should have full url /path/to/dirOrFile : %s", entryPath) + } + return +} + +func findInputDirectory(args []string) (input string) { + input = "." + if len(args) > 0 { + input = args[len(args)-1] + if strings.HasPrefix(input, "-") { + input = "." + } + } + return input +} diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go new file mode 100644 index 000000000..4632a1fb0 --- /dev/null +++ b/weed/shell/shell_liner.go @@ -0,0 +1,154 @@ +package shell + +import ( + "fmt" + "io" + "os" + "path" + "regexp" + "sort" + "strings" + + "github.com/peterh/liner" +) + +var ( + line *liner.State + historyPath = path.Join(os.TempDir(), "weed-shell") +) + +func RunShell(options ShellOptions) { + + line = liner.NewLiner() + defer line.Close() + + line.SetCtrlCAborts(true) + + setCompletionHandler() + loadHistory() + + defer saveHistory() + + reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`) + + commandEnv := NewCommandEnv(options) + + go commandEnv.MasterClient.KeepConnectedToMaster() + commandEnv.MasterClient.WaitUntilConnected() + + for { + cmd, err := line.Prompt("> ") + if err != nil { + if err != io.EOF { + fmt.Printf("%v\n", err) + } + return + } + + for _, c := range strings.Split(cmd, ";") { + if processEachCmd(reg, c, commandEnv) { + return + } + } + } +} + +func processEachCmd(reg *regexp.Regexp, cmd string, commandEnv *CommandEnv) bool { + cmds := reg.FindAllString(cmd, -1) + if len(cmds) == 0 { + return false + } else { + line.AppendHistory(cmd) + + args := make([]string, len(cmds[1:])) + + for i := range args { + args[i] = strings.Trim(string(cmds[1+i]), "\"'") + } + + cmd := strings.ToLower(cmds[0]) + if cmd == "help" || cmd == "?" 
{ + printHelp(cmds) + } else if cmd == "exit" || cmd == "quit" { + return true + } else { + foundCommand := false + for _, c := range Commands { + if c.Name() == cmd || c.Name() == "fs."+cmd { + if err := c.Do(args, commandEnv, os.Stdout); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + } + foundCommand = true + } + } + if !foundCommand { + fmt.Fprintf(os.Stderr, "unknown command: %v\n", cmd) + } + } + + } + return false +} + +func printGenericHelp() { + msg := + `Type: "help <command>" for help on <command> +` + fmt.Print(msg) + + sort.Slice(Commands, func(i, j int) bool { + return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0 + }) + for _, c := range Commands { + helpTexts := strings.SplitN(c.Help(), "\n", 2) + fmt.Printf(" %-30s\t# %s \n", c.Name(), helpTexts[0]) + } +} + +func printHelp(cmds []string) { + args := cmds[1:] + if len(args) == 0 { + printGenericHelp() + } else if len(args) > 1 { + fmt.Println() + } else { + cmd := strings.ToLower(args[0]) + + sort.Slice(Commands, func(i, j int) bool { + return strings.Compare(Commands[i].Name(), Commands[j].Name()) < 0 + }) + + for _, c := range Commands { + if c.Name() == cmd { + fmt.Printf(" %s\t# %s\n", c.Name(), c.Help()) + } + } + } +} + +func setCompletionHandler() { + line.SetCompleter(func(line string) (c []string) { + for _, i := range Commands { + if strings.HasPrefix(i.Name(), strings.ToLower(line)) { + c = append(c, i.Name()) + } + } + return + }) +} + +func loadHistory() { + if f, err := os.Open(historyPath); err == nil { + line.ReadHistory(f) + f.Close() + } +} + +func saveHistory() { + if f, err := os.Create(historyPath); err != nil { + fmt.Printf("Error writing history file: %v\n", err) + } else { + line.WriteHistory(f) + f.Close() + } +} |
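All of the shell commands added in this change follow the same registration pattern defined in commands.go: implement the `command` interface (Name, Help, Do) and append an instance to the package-level `Commands` slice from an `init` function. As a rough illustration only — the command name, flag, and output below are hypothetical and not part of SeaweedFS — a new command could be wired in like this:

package shell

import (
	"flag"
	"fmt"
	"io"
)

func init() {
	// self-register, exactly like the built-in commands in this change
	Commands = append(Commands, &commandVolumeExample{})
}

type commandVolumeExample struct {
}

func (c *commandVolumeExample) Name() string {
	return "volume.example"
}

func (c *commandVolumeExample) Help() string {
	return `a hypothetical command showing the shell command pattern

	volume.example -message=<text>
`
}

func (c *commandVolumeExample) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	// commands that mutate cluster state first confirm the shell holds the exclusive lock
	if err = commandEnv.confirmIsLocked(); err != nil {
		return
	}

	exampleCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	message := exampleCommand.String("message", "hello", "text to echo back")
	if err = exampleCommand.Parse(args); err != nil {
		return nil
	}

	fmt.Fprintf(writer, "volume.example says: %s\n", *message)
	return nil
}

Because printGenericHelp and setCompletionHandler in shell_liner.go both iterate over the same Commands slice, a command registered this way automatically shows up in the `help` listing and in tab completion without any further wiring.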
