path: root/weed/shell
Diffstat (limited to 'weed/shell')
-rw-r--r--  weed/shell/command_bucket_create.go | 88
-rw-r--r--  weed/shell/command_bucket_delete.go | 71
-rw-r--r--  weed/shell/command_bucket_list.go | 81
-rw-r--r--  weed/shell/command_collection_delete.go | 5
-rw-r--r--  weed/shell/command_collection_list.go | 5
-rw-r--r--  weed/shell/command_ec_balance.go | 53
-rw-r--r--  weed/shell/command_ec_common.go | 43
-rw-r--r--  weed/shell/command_ec_decode.go | 264
-rw-r--r--  weed/shell/command_ec_encode.go | 92
-rw-r--r--  weed/shell/command_ec_rebuild.go | 27
-rw-r--r--  weed/shell/command_ec_test.go | 16
-rw-r--r--  weed/shell/command_fs_cat.go | 13
-rw-r--r--  weed/shell/command_fs_cd.go | 5
-rw-r--r--  weed/shell/command_fs_du.go | 110
-rw-r--r--  weed/shell/command_fs_ls.go | 104
-rw-r--r--  weed/shell/command_fs_meta_cat.go | 72
-rw-r--r--  weed/shell/command_fs_meta_load.go | 10
-rw-r--r--  weed/shell/command_fs_meta_notify.go | 50
-rw-r--r--  weed/shell/command_fs_meta_save.go | 181
-rw-r--r--  weed/shell/command_fs_mv.go | 8
-rw-r--r--  weed/shell/command_fs_tree.go | 87
-rw-r--r--  weed/shell/command_volume_balance.go | 49
-rw-r--r--  weed/shell/command_volume_configure_replication.go | 104
-rw-r--r--  weed/shell/command_volume_copy.go | 4
-rw-r--r--  weed/shell/command_volume_delete.go | 4
-rw-r--r--  weed/shell/command_volume_fix_replication.go | 22
-rw-r--r--  weed/shell/command_volume_list.go | 13
-rw-r--r--  weed/shell/command_volume_mount.go | 7
-rw-r--r--  weed/shell/command_volume_move.go | 25
-rw-r--r--  weed/shell/command_volume_tier_download.go | 166
-rw-r--r--  weed/shell/command_volume_tier_upload.go | 147
-rw-r--r--  weed/shell/command_volume_unmount.go | 7
-rw-r--r--  weed/shell/commands.go | 42
33 files changed, 1434 insertions, 541 deletions
diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_bucket_create.go
new file mode 100644
index 000000000..3546528aa
--- /dev/null
+++ b/weed/shell/command_bucket_create.go
@@ -0,0 +1,88 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandBucketCreate{})
+}
+
+type commandBucketCreate struct {
+}
+
+func (c *commandBucketCreate) Name() string {
+ return "bucket.create"
+}
+
+func (c *commandBucketCreate) Help() string {
+ return `create a bucket with a given name
+
+ Example:
+ bucket.create -name <bucket_name> -replication 001
+`
+}
+
+func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ bucketName := bucketCommand.String("name", "", "bucket name")
+ replication := bucketCommand.String("replication", "", "replication setting for the bucket")
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *bucketName == "" {
+ return fmt.Errorf("empty bucket name")
+ }
+
+ filerServer, filerPort, _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args()))
+ if parseErr != nil {
+ return parseErr
+ }
+
+ err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err)
+ }
+ filerBucketsPath := resp.DirBuckets
+
+ println("create bucket under", filerBucketsPath)
+
+ entry := &filer_pb.Entry{
+ Name: *bucketName,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(0777 | os.ModeDir),
+ Collection: *bucketName,
+ Replication: *replication,
+ },
+ }
+
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: filerBucketsPath,
+ Entry: entry,
+ }); err != nil {
+ return err
+ }
+
+ println("created bucket", *bucketName)
+
+ return nil
+
+ })
+
+ return err
+
+}
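
The new command boils down to a single CreateEntry RPC under the filer's bucket directory. A minimal standalone sketch of that call, assuming an already-connected filer client; the helper name createBucketEntry and its arguments are illustrative, not part of this change:

package example

import (
	"os"
	"time"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// createBucketEntry mirrors what bucket.create does: create one directory
// entry whose collection and replication are carried in its attributes.
func createBucketEntry(client filer_pb.SeaweedFilerClient, bucketsPath, name, replication string) error {
	return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
		Directory: bucketsPath,
		Entry: &filer_pb.Entry{
			Name:        name,
			IsDirectory: true,
			Attributes: &filer_pb.FuseAttributes{
				Mtime:       time.Now().Unix(),
				Crtime:      time.Now().Unix(),
				FileMode:    uint32(0777 | os.ModeDir),
				Collection:  name,
				Replication: replication,
			},
		},
	})
}
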
diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go
new file mode 100644
index 000000000..c57ce7221
--- /dev/null
+++ b/weed/shell/command_bucket_delete.go
@@ -0,0 +1,71 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandBucketDelete{})
+}
+
+type commandBucketDelete struct {
+}
+
+func (c *commandBucketDelete) Name() string {
+ return "bucket.delete"
+}
+
+func (c *commandBucketDelete) Help() string {
+ return `delete a bucket by a given name
+
+ bucket.delete -name <bucket_name>
+`
+}
+
+func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ bucketName := bucketCommand.String("name", "", "bucket name")
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *bucketName == "" {
+ return fmt.Errorf("empty bucket name")
+ }
+
+ filerServer, filerPort, _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args()))
+ if parseErr != nil {
+ return parseErr
+ }
+
+ err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err)
+ }
+ filerBucketsPath := resp.DirBuckets
+
+ if _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
+ Directory: filerBucketsPath,
+ Name: *bucketName,
+ IsDeleteData: false,
+ IsRecursive: true,
+ IgnoreRecursiveError: true,
+ }); err != nil {
+ return err
+ }
+
+ return nil
+
+ })
+
+ return err
+
+}
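
Symmetrically, bucket.delete wraps a single DeleteEntry RPC. A minimal sketch under the same assumptions (the helper name deleteBucketEntry is illustrative); IsDeleteData stays false, as in the command above, so only the directory entry tree is removed:

package example

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// deleteBucketEntry removes the bucket directory recursively, matching the
// flags used by bucket.delete above.
func deleteBucketEntry(client filer_pb.SeaweedFilerClient, bucketsPath, name string) error {
	_, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
		Directory:            bucketsPath,
		Name:                 name,
		IsDeleteData:         false,
		IsRecursive:          true,
		IgnoreRecursiveError: true,
	})
	return err
}
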
diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go
new file mode 100644
index 000000000..5eb5972ce
--- /dev/null
+++ b/weed/shell/command_bucket_list.go
@@ -0,0 +1,81 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "math"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandBucketList{})
+}
+
+type commandBucketList struct {
+}
+
+func (c *commandBucketList) Name() string {
+ return "bucket.list"
+}
+
+func (c *commandBucketList) Help() string {
+ return `list all buckets
+
+`
+}
+
+func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ filerServer, filerPort, _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args()))
+ if parseErr != nil {
+ return parseErr
+ }
+
+ err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err)
+ }
+ filerBucketsPath := resp.DirBuckets
+
+ stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
+ Directory: filerBucketsPath,
+ Limit: math.MaxUint32,
+ })
+ if err != nil {
+ return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
+ }
+
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
+ if resp.Entry.Attributes.Replication == "" || resp.Entry.Attributes.Replication == "000" {
+ fmt.Fprintf(writer, " %s\n", resp.Entry.Name)
+ } else {
+ fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", resp.Entry.Name, resp.Entry.Attributes.Replication)
+ }
+ }
+
+ return nil
+
+ })
+
+ return err
+
+}
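
Taken together, the three new commands can be exercised from weed shell roughly as follows; the bucket name and replication value are illustrative, and the list output format follows the Fprintf calls above:

> bucket.create -name photos -replication 001
> bucket.list
  photos            replication: 001
> bucket.delete -name photos
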
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
index fbaddcd51..4b3d7f0be 100644
--- a/weed/shell/command_collection_delete.go
+++ b/weed/shell/command_collection_delete.go
@@ -34,9 +34,8 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ
collectionName := args[0]
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- _, err = client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: collectionName,
})
return err
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
index c4325c66f..2a114e61b 100644
--- a/weed/shell/command_collection_list.go
+++ b/weed/shell/command_collection_list.go
@@ -41,9 +41,8 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer
func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
var resp *master_pb.CollectionListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.CollectionList(ctx, &master_pb.CollectionListRequest{
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
IncludeNormalVolumes: includeNormalVolumes,
IncludeEcVolumes: includeEcVolumes,
})
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 96599372e..299d44fed 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"flag"
"fmt"
"io"
@@ -107,10 +106,8 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
return nil
}
- ctx := context.Background()
-
// collect all ec nodes
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, *dc)
+ allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, *dc)
if err != nil {
return err
}
@@ -138,7 +135,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
}
}
- if err := balanceEcRacks(ctx, commandEnv, racks, *applyBalancing); err != nil {
+ if err := balanceEcRacks(commandEnv, racks, *applyBalancing); err != nil {
return fmt.Errorf("balance ec racks: %v", err)
}
@@ -162,38 +159,36 @@ func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
- ctx := context.Background()
-
fmt.Printf("balanceEcVolumes %s\n", collection)
- if err := deleteDuplicatedEcShards(ctx, commandEnv, allEcNodes, collection, applyBalancing); err != nil {
+ if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
}
- if err := balanceEcShardsAcrossRacks(ctx, commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
+ if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
}
- if err := balanceEcShardsWithinRacks(ctx, commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
+ if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
}
return nil
}
-func deleteDuplicatedEcShards(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
+func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
// vid => []ecNode
vidLocations := collectVolumeIdToEcNodes(allEcNodes)
// deduplicate ec shards
for vid, locations := range vidLocations {
- if err := doDeduplicateEcShards(ctx, commandEnv, collection, vid, locations, applyBalancing); err != nil {
+ if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
return err
}
}
return nil
}
-func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
+func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
// check whether this volume has ecNodes that are over average
shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
@@ -215,10 +210,10 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collecti
duplicatedShardIds := []uint32{uint32(shardId)}
for _, ecNode := range ecNodes[1:] {
- if err := unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
return err
}
- if err := sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
return err
}
ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
@@ -227,19 +222,19 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collecti
return nil
}
-func balanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
+func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := collectVolumeIdToEcNodes(allEcNodes)
// spread the ec shards evenly
for vid, locations := range vidLocations {
- if err := doBalanceEcShardsAcrossRacks(ctx, commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
+ if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
return err
}
}
return nil
}
-func doBalanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
+func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
// calculate average number of shards an ec rack should have for one volume
averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
@@ -274,7 +269,7 @@ func doBalanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, c
for _, n := range racks[rackId].ecNodes {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
- err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+ err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
if err != nil {
return err
}
@@ -306,7 +301,7 @@ func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]i
return ""
}
-func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
+func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := collectVolumeIdToEcNodes(allEcNodes)
@@ -330,7 +325,7 @@ func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, all
}
sourceEcNodes := rackEcNodesWithVid[rackId]
averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
- if err := doBalanceEcShardsWithinOneRack(ctx, commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
+ if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
return err
}
}
@@ -338,7 +333,7 @@ func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, all
return nil
}
-func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
for _, ecNode := range existingLocations {
@@ -353,7 +348,7 @@ func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv,
fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
- err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+ err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
if err != nil {
return err
}
@@ -365,18 +360,18 @@ func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv,
return nil
}
-func balanceEcRacks(ctx context.Context, commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
+func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
// balance one rack for all ec shards
for _, ecRack := range racks {
- if err := doBalanceEcRack(ctx, commandEnv, ecRack, applyBalancing); err != nil {
+ if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
return err
}
}
return nil
}
-func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
+func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
if len(ecRack.ecNodes) <= 1 {
return nil
@@ -421,7 +416,7 @@ func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack
fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
- err := moveMountedShardToEcNode(ctx, commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
+ err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
if err != nil {
return err
}
@@ -440,7 +435,7 @@ func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack
return nil
}
-func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
sortEcNodesByFreeslotsDecending(possibleDestinationEcNodes)
@@ -458,7 +453,7 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *CommandEnv, a
fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
- err := moveMountedShardToEcNode(ctx, commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
+ err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
if err != nil {
return err
}
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index cfe14fed5..0db119d3c 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -15,26 +15,26 @@ import (
"google.golang.org/grpc"
)
-func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
copiedShardIds := []uint32{uint32(shardId)}
if applyBalancing {
// ask destination node to copy shard and the ecx file from source node, and mount it
- copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
if err != nil {
return err
}
// unmount the to be deleted shards
- err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
if err != nil {
return err
}
// ask source node to delete the shard, and maybe the ecx file
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
if err != nil {
return err
}
@@ -50,14 +50,10 @@ func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, exist
}
-func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
- targetServer *EcNode, startFromShardId uint32, shardCount int,
+func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
+ targetServer *EcNode, shardIdsToCopy []uint32,
volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
- var shardIdsToCopy []uint32
- for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
- shardIdsToCopy = append(shardIdsToCopy, shardId)
- }
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -65,11 +61,13 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
if targetServer.info.Id != existingLocation {
fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
- _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
CopyEcxFile: true,
+ CopyEcjFile: true,
+ CopyVifFile: true,
SourceDataNode: existingLocation,
})
if copyErr != nil {
@@ -78,7 +76,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
}
fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
- _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
@@ -180,12 +178,12 @@ type EcRack struct {
freeEcSlot int
}
-func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -213,13 +211,12 @@ func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCen
return
}
-func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
+func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
+ _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeDeletedShardIds,
@@ -229,13 +226,12 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt
}
-func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
+func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
+ _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
VolumeId: uint32(volumeId),
ShardIds: toBeUnmountedhardIds,
})
@@ -243,13 +239,12 @@ func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
})
}
-func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
+func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeMountedhardIds,
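
Note the signature change in this file: callers of oneServerCopyAndMountEcShardsFromSource now pass the exact shard ids to copy instead of a contiguous (startFromShardId, shardCount) range, so a non-contiguous shard set can be moved in one call. A sketch of the new call shape from inside this package; the helper and the shard ids are illustrative:

// copyTwoShards is a hypothetical helper; it only illustrates the new
// explicit-shard-id parameter of oneServerCopyAndMountEcShardsFromSource.
func copyTwoShards(commandEnv *CommandEnv, dest *EcNode, vid needle.VolumeId, collection, sourceLocation string) ([]uint32, error) {
	return oneServerCopyAndMountEcShardsFromSource(
		commandEnv.option.GrpcDialOption, dest,
		[]uint32{3, 11}, // any subset of shard ids, no longer a contiguous range
		vid, collection, sourceLocation)
}
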
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
new file mode 100644
index 000000000..b69e403cb
--- /dev/null
+++ b/weed/shell/command_ec_decode.go
@@ -0,0 +1,264 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandEcDecode{})
+}
+
+type commandEcDecode struct {
+}
+
+func (c *commandEcDecode) Name() string {
+ return "ec.decode"
+}
+
+func (c *commandEcDecode) Help() string {
+ return `decode an erasure coded volume into a normal volume
+
+ ec.decode [-collection=""] [-volumeId=<volume_id>]
+
+`
+}
+
+func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
+ collection := encodeCommand.String("collection", "", "the collection name")
+ if err = encodeCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ vid := needle.VolumeId(*volumeId)
+
+ // collect topology information
+ topologyInfo, err := collectTopologyInfo(commandEnv)
+ if err != nil {
+ return err
+ }
+
+ // volumeId is provided
+ if vid != 0 {
+ return doEcDecode(commandEnv, topologyInfo, *collection, vid)
+ }
+
+ // apply to all volumes in the collection
+ volumeIds := collectEcShardIds(topologyInfo, *collection)
+ fmt.Printf("ec encode volumes: %v\n", volumeIds)
+ for _, vid := range volumeIds {
+ if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
+ // find volume location
+ nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
+
+ fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
+
+ // collect ec shards to the server with most space
+ targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid)
+ if err != nil {
+ return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
+ }
+
+ // generate a normal volume
+ err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
+ if err != nil {
+ return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
+ }
+
+ // delete the previous ec shards
+ err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
+ if err != nil {
+ return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
+ }
+
+ return nil
+}
+
+func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
+
+ // mount volume
+ if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
+ VolumeId: uint32(vid),
+ })
+ return mountErr
+ }); err != nil {
+ return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
+ }
+
+ // unmount ec shards
+ for location, ecIndexBits := range nodeToEcIndexBits {
+ fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
+ err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
+ if err != nil {
+ return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
+ }
+ }
+ // delete ec shards
+ for location, ecIndexBits := range nodeToEcIndexBits {
+ fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
+ err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
+ if err != nil {
+ return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
+ }
+ }
+
+ return nil
+}
+
+func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
+
+ fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
+
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
+ VolumeId: uint32(vid),
+ Collection: collection,
+ })
+ return genErr
+ })
+
+ return err
+
+}
+
+func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
+
+ maxShardCount := 0
+ var existingEcIndexBits erasure_coding.ShardBits
+ for loc, ecIndexBits := range nodeToEcIndexBits {
+ toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount()
+ if toBeCopiedShardCount > maxShardCount {
+ maxShardCount = toBeCopiedShardCount
+ targetNodeLocation = loc
+ existingEcIndexBits = ecIndexBits
+ }
+ }
+
+ fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits)
+
+ var copiedEcIndexBits erasure_coding.ShardBits
+ for loc, ecIndexBits := range nodeToEcIndexBits {
+ if loc == targetNodeLocation {
+ continue
+ }
+
+ needToCopyEcIndexBits := ecIndexBits.Minus(existingEcIndexBits).MinusParityShards()
+ if needToCopyEcIndexBits.ShardIdCount() == 0 {
+ continue
+ }
+
+ err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+
+ fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
+
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
+ VolumeId: uint32(vid),
+ Collection: collection,
+ ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
+ CopyEcxFile: false,
+ CopyEcjFile: true,
+ CopyVifFile: true,
+ SourceDataNode: loc,
+ })
+ if copyErr != nil {
+ return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ break
+ }
+
+ copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits)
+
+ }
+
+ nodeToEcIndexBits[targetNodeLocation] = existingEcIndexBits.Plus(copiedEcIndexBits)
+
+ return targetNodeLocation, err
+
+}
+
+func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return
+ }
+
+ return resp.TopologyInfo, nil
+
+}
+
+func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
+
+ eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, v := range dn.EcShardInfos {
+ if v.Collection == selectedCollection && v.Id == uint32(vid) {
+ ecShardInfos = append(ecShardInfos, v)
+ }
+ }
+ })
+
+ return
+}
+
+func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
+
+ vidMap := make(map[uint32]bool)
+ eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, v := range dn.EcShardInfos {
+ if v.Collection == selectedCollection {
+ vidMap[v.Id] = true
+ }
+ }
+ })
+
+ for vid := range vidMap {
+ vids = append(vids, needle.VolumeId(vid))
+ }
+
+ return
+}
+
+func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits {
+
+ nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
+ eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, v := range dn.EcShardInfos {
+ if v.Id == uint32(vid) {
+ nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
+ }
+ }
+ })
+
+ return nodeToEcIndexBits
+}
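
The shard bookkeeping in collectEcShards above is plain bit-set arithmetic on erasure_coding.ShardBits; a small standalone sketch of the operations used in this file (the literal bit masks are illustrative):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
)

func main() {
	// the target node already holds shards 0-3
	have := erasure_coding.ShardBits(0x00F)
	// a remote node holds data shards 2-6 plus parity shard 11
	remote := erasure_coding.ShardBits(0x87C)

	// shards still missing on the target, ignoring parity shards
	need := remote.Minus(have).MinusParityShards()
	fmt.Println(need.ShardIds(), need.ShardIdCount()) // shards 4, 5 and 6 still need copying

	// after copying, fold them into the target's shard set
	have = have.Plus(need)
	fmt.Println(have.ToUint32Slice()) // target now covers shards 0-6
}
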
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index f07cb93f9..6efb05488 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -8,13 +8,14 @@ import (
"sync"
"time"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "google.golang.org/grpc"
)
func init() {
@@ -62,22 +63,21 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
- ctx := context.Background()
vid := needle.VolumeId(*volumeId)
// volumeId is provided
if vid != 0 {
- return doEcEncode(ctx, commandEnv, *collection, vid)
+ return doEcEncode(commandEnv, *collection, vid)
}
// apply to all volumes in the collection
- volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod)
+ volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
fmt.Printf("ec encode volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
- if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
+ if err = doEcEncode(commandEnv, *collection, vid); err != nil {
return err
}
}
@@ -85,27 +85,29 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
-func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
+func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
return fmt.Errorf("volume %d not found", vid)
}
+ // fmt.Printf("found ec %d shards on %v\n", vid, locations)
+
// mark the volume as readonly
- err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+ err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
if err != nil {
- return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
+ return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
// generate ec shards
- err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
+ err = generateEcShards(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
if err != nil {
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
}
// balance the ec shards to current cluster
- err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
+ err = spreadEcShards(commandEnv, vid, collection, locations)
if err != nil {
return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
}
@@ -113,12 +115,12 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string,
return nil
}
-func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
+func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
for _, location := range locations {
err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
+ _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
})
return markErr
@@ -133,10 +135,10 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol
return nil
}
-func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
+ _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: uint32(volumeId),
Collection: collection,
})
@@ -147,9 +149,9 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum
}
-func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
+func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, "")
+ allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
if err != nil {
return err
}
@@ -163,29 +165,29 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle
}
// calculate how many shards to allocate for these servers
- allocated := balancedEcDistribution(allocatedDataNodes)
+ allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
// ask the data nodes to copy from the source volume server
- copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocated, volumeId, collection, existingLocations[0])
+ copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
if err != nil {
return err
}
// unmount the to be deleted shards
- err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
return err
}
// ask the source volume server to clean up copied ec shards
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
}
// ask the source volume server to delete the original volume
for _, location := range existingLocations {
- err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
+ err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url)
if err != nil {
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
}
@@ -195,32 +197,28 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle
}
-func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
- targetServers []*EcNode, allocated []int,
- volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
+func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
// parallelize
shardIdChan := make(chan []uint32, len(targetServers))
var wg sync.WaitGroup
- startFromShardId := uint32(0)
for i, server := range targetServers {
- if allocated[i] <= 0 {
+ if len(allocatedEcIds[i]) <= 0 {
continue
}
wg.Add(1)
- go func(server *EcNode, startFromShardId uint32, shardCount int) {
+ go func(server *EcNode, allocatedEcShardIds []uint32) {
defer wg.Done()
- copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server,
- startFromShardId, shardCount, volumeId, collection, existingLocation.Url)
+ copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
+ allocatedEcShardIds, volumeId, collection, existingLocation.Url)
if copyErr != nil {
err = copyErr
} else {
shardIdChan <- copiedShardIds
server.addEcVolumeShards(volumeId, collection, copiedShardIds)
}
- }(server, startFromShardId, allocated[i])
- startFromShardId += uint32(allocated[i])
+ }(server, allocatedEcIds[i])
}
wg.Wait()
close(shardIdChan)
@@ -236,29 +234,29 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
return
}
-func balancedEcDistribution(servers []*EcNode) (allocated []int) {
- allocated = make([]int, len(servers))
- allocatedCount := 0
- for allocatedCount < erasure_coding.TotalShardsCount {
- for i, server := range servers {
- if server.freeEcSlot-allocated[i] > 0 {
- allocated[i] += 1
- allocatedCount += 1
- }
- if allocatedCount >= erasure_coding.TotalShardsCount {
- break
- }
+func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
+ allocated = make([][]uint32, len(servers))
+ allocatedShardIdIndex := uint32(0)
+ serverIndex := 0
+ for allocatedShardIdIndex < erasure_coding.TotalShardsCount {
+ if servers[serverIndex].freeEcSlot > 0 {
+ allocated[serverIndex] = append(allocated[serverIndex], allocatedShardIdIndex)
+ allocatedShardIdIndex++
+ }
+ serverIndex++
+ if serverIndex >= len(servers) {
+ serverIndex = 0
}
}
return allocated
}
-func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -281,7 +279,7 @@ func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, se
}
})
- for vid, _ := range vidMap {
+ for vid := range vidMap {
vids = append(vids, needle.VolumeId(vid))
}
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index b43aed599..d9d943e6d 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -64,7 +64,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
}
// collect all ec nodes
- allEcNodes, _, err := collectEcNodes(context.Background(), commandEnv, "")
+ allEcNodes, _, err := collectEcNodes(commandEnv, "")
if err != nil {
return err
}
@@ -92,8 +92,6 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error {
- ctx := context.Background()
-
fmt.Printf("rebuildEcVolumes %s\n", collection)
// collect vid => each shard locations, similar to ecShardMap in topology.go
@@ -117,7 +115,7 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s
return fmt.Errorf("disk space is not enough")
}
- if err := rebuildOneEcVolume(ctx, commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil {
+ if err := rebuildOneEcVolume(commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil {
return err
}
}
@@ -125,13 +123,13 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s
return nil
}
-func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
+func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
// collect shard files to rebuilder local disk
var generatedShardIds []uint32
- copiedShardIds, _, err := prepareDataToRecover(ctx, commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges)
+ copiedShardIds, _, err := prepareDataToRecover(commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges)
if err != nil {
return err
}
@@ -139,7 +137,7 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
// clean up working files
// ask the rebuilder to delete the copied shards
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds)
if err != nil {
fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds)
}
@@ -151,13 +149,13 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
}
// generate ec shards, and maybe ecx file
- generatedShardIds, err = generateMissingShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id)
+ generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id)
if err != nil {
return err
}
// mount the generated shards
- err = mountEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
+ err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
if err != nil {
return err
}
@@ -167,11 +165,10 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
return nil
}
-func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
+func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{
+ resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
VolumeId: uint32(volumeId),
Collection: collection,
})
@@ -183,7 +180,7 @@ func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption,
return
}
-func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
+func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
needEcxFile := true
var localShardBits erasure_coding.ShardBits
@@ -210,11 +207,13 @@ func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder
var copyErr error
if applyBalancing {
copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: []uint32{uint32(shardId)},
CopyEcxFile: needEcxFile,
+ CopyEcjFile: needEcxFile,
+ CopyVifFile: needEcxFile,
SourceDataNode: ecNodes[0].info.Id,
})
return copyErr
diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go
index 9e578ed28..4fddcbea5 100644
--- a/weed/shell/command_ec_test.go
+++ b/weed/shell/command_ec_test.go
@@ -1,13 +1,25 @@
package shell
import (
- "context"
+ "fmt"
"testing"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
+func TestCommandEcDistribution(t *testing.T) {
+
+ allEcNodes := []*EcNode{
+ newEcNode("dc1", "rack1", "dn1", 100),
+ newEcNode("dc1", "rack2", "dn2", 100),
+ }
+
+ allocated := balancedEcDistribution(allEcNodes)
+
+ fmt.Printf("allocated: %+v", allocated)
+}
+
func TestCommandEcBalanceSmall(t *testing.T) {
allEcNodes := []*EcNode{
@@ -108,7 +120,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
racks := collectRacks(allEcNodes)
balanceEcVolumes(nil, "c1", allEcNodes, racks, false)
- balanceEcRacks(context.Background(), nil, racks, false)
+ balanceEcRacks(nil, racks, false)
}
func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNode {
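
For reference, with the default 10+4 erasure coding layout (TotalShardsCount = 14) and nodes that all have free slots, the rewritten round-robin allocator hands out interleaved shard id lists. A sketch of an additional check the new test could make, using the same helpers; node names are illustrative and this assertion is not part of this change:

func TestBalancedEcDistributionThreeNodes(t *testing.T) {
	nodes := []*EcNode{
		newEcNode("dc1", "rack1", "dn1", 100),
		newEcNode("dc1", "rack1", "dn2", 100),
		newEcNode("dc1", "rack1", "dn3", 100),
	}

	allocated := balancedEcDistribution(nodes)

	// expected round-robin result for 14 shards across 3 nodes:
	// allocated[0] => [0 3 6 9 12]
	// allocated[1] => [1 4 7 10 13]
	// allocated[2] => [2 5 8 11]
	fmt.Printf("allocated: %+v\n", allocated)
}
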
diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go
index 66ced46c5..3db487979 100644
--- a/weed/shell/command_fs_cat.go
+++ b/weed/shell/command_fs_cat.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
"math"
@@ -24,12 +23,8 @@ func (c *commandFsCat) Name() string {
func (c *commandFsCat) Help() string {
return `stream the file content on to the screen
- fs.cat /dir/
fs.cat /dir/file_name
- fs.cat /dir/file_prefix
- fs.cat http://<filer_server>:<port>/dir/
fs.cat http://<filer_server>:<port>/dir/file_name
- fs.cat http://<filer_server>:<port>/dir/file_prefix
`
}
@@ -42,21 +37,19 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write
return err
}
- ctx := context.Background()
-
- if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
+ if commandEnv.isDirectory(filerServer, filerPort, path) {
return fmt.Errorf("%s is a directory", path)
}
dir, name := filer2.FullPath(path).DirAndName()
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
Directory: dir,
}
- respLookupEntry, err := client.LookupDirectoryEntry(ctx, request)
+ respLookupEntry, err := filer_pb.LookupEntry(client, request)
if err != nil {
return err
}
diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go
index 408ec86c8..df42cd516 100644
--- a/weed/shell/command_fs_cd.go
+++ b/weed/shell/command_fs_cd.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"io"
)
@@ -45,9 +44,7 @@ func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return nil
}
- ctx := context.Background()
-
- err = commandEnv.checkDirectory(ctx, filerServer, filerPort, path)
+ err = commandEnv.checkDirectory(filerServer, filerPort, path)
if err == nil {
commandEnv.option.FilerHost = filerServer
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go
index 5e634c82a..ca2f22b57 100644
--- a/weed/shell/command_fs_du.go
+++ b/weed/shell/command_fs_du.go
@@ -1,13 +1,12 @@
package shell
import (
- "context"
"fmt"
+ "io"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "io"
)
func init() {
@@ -37,81 +36,70 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return err
}
- ctx := context.Background()
-
- if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
+ if commandEnv.isDirectory(filerServer, filerPort, path) {
path = path + "/"
}
+ var blockCount, byteCount uint64
dir, name := filer2.FullPath(path).DirAndName()
+ blockCount, byteCount, err = duTraverseDirectory(writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name)
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
-
- _, _, err = paginateDirectory(ctx, writer, client, dir, name, 1000)
-
- return err
+ if name == "" && err == nil {
+ fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir)
+ }
- })
+ return
}
-func paginateDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, paginateSize int) (blockCount uint64, byteCount uint64, err error) {
-
- paginatedCount := -1
- startFromFileName := ""
-
- for paginatedCount == -1 || paginatedCount == paginateSize {
- resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
- Directory: dir,
- Prefix: name,
- StartFromFileName: startFromFileName,
- InclusiveStartFrom: false,
- Limit: uint32(paginateSize),
- })
- if listErr != nil {
- err = listErr
- return
- }
+func duTraverseDirectory(writer io.Writer, filerClient filer2.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) {
- paginatedCount = len(resp.Entries)
-
- for _, entry := range resp.Entries {
- if entry.IsDirectory {
- subDir := fmt.Sprintf("%s/%s", dir, entry.Name)
- if dir == "/" {
- subDir = "/" + entry.Name
- }
- numBlock, numByte, err := paginateDirectory(ctx, writer, client, subDir, "", paginateSize)
- if err == nil {
- blockCount += numBlock
- byteCount += numByte
- }
- } else {
- blockCount += uint64(len(entry.Chunks))
- byteCount += filer2.TotalSize(entry.Chunks)
+ err = filer2.ReadDirAllEntries(filerClient, filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
+ if entry.IsDirectory {
+ subDir := fmt.Sprintf("%s/%s", dir, entry.Name)
+ if dir == "/" {
+ subDir = "/" + entry.Name
}
- startFromFileName = entry.Name
-
- if name != "" && !entry.IsDirectory {
- fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", blockCount, byteCount, dir, name)
+ numBlock, numByte, err := duTraverseDirectory(writer, filerClient, subDir, "")
+ if err == nil {
+ blockCount += numBlock
+ byteCount += numByte
}
+ } else {
+ blockCount += uint64(len(entry.Chunks))
+ byteCount += filer2.TotalSize(entry.Chunks)
}
- }
-
- if name == "" {
- fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir)
- }
+ if name != "" && !entry.IsDirectory {
+ fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", blockCount, byteCount, dir, name)
+ }
+ })
return
-
}
-func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (env *CommandEnv) withFilerClient(filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error {
filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000)
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, filerGrpcAddress, env.option.GrpcDialOption)
+ return pb.WithGrpcFilerClient(filerGrpcAddress, env.option.GrpcDialOption, fn)
}
+
+type commandFilerClient struct {
+ env *CommandEnv
+ filerServer string
+ filerPort int64
+}
+
+func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *commandFilerClient {
+ return &commandFilerClient{
+ env: env,
+ filerServer: filerServer,
+ filerPort: filerPort,
+ }
+}
+func (c *commandFilerClient) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+ return c.env.withFilerClient(c.filerServer, c.filerPort, fn)
+}
+func (c *commandFilerClient) AdjustedUrl(hostAndPort string) string {
+ return hostAndPort
+}
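
The fs.du change above (and fs.ls / fs.tree below) drops the hand-rolled ListEntries pagination in favor of a per-entry callback walk through filer2.ReadDirAllEntries, reached via the small commandFilerClient adapter that satisfies the filer2.FilerClient interface. A minimal, self-contained sketch of that callback-over-pagination pattern; the types and the helper name here are illustrative stand-ins, not the actual filer2 signatures.

package main

import "fmt"

// Entry is a stand-in for filer_pb.Entry with only the fields used here.
type Entry struct {
	Name        string
	IsDirectory bool
}

// lister stands in for the filer's ListEntries RPC: it returns up to limit
// entries whose names sort strictly after startFrom.
type lister func(dir, prefix, startFrom string, limit int) ([]*Entry, error)

// readDirAllEntries hides pagination behind a per-entry callback, which is
// the shape the refactored du/ls/tree commands now rely on.
func readDirAllEntries(list lister, dir, prefix string, fn func(e *Entry, isLast bool)) error {
	const pageSize = 1000
	startFrom := ""
	for {
		entries, err := list(dir, prefix, startFrom, pageSize)
		if err != nil {
			return err
		}
		lastPage := len(entries) < pageSize
		for i, e := range entries {
			fn(e, lastPage && i == len(entries)-1)
			startFrom = e.Name
		}
		if lastPage {
			return nil
		}
	}
}

func main() {
	// A fake lister over three fixed entries, enough to exercise the callback.
	fake := func(dir, prefix, startFrom string, limit int) ([]*Entry, error) {
		var out []*Entry
		for _, e := range []*Entry{{Name: "a.txt"}, {Name: "b.txt"}, {Name: "sub", IsDirectory: true}} {
			if e.Name > startFrom {
				out = append(out, e)
			}
		}
		return out, nil
	}
	_ = readDirAllEntries(fake, "/dir", "", func(e *Entry, isLast bool) {
		fmt.Println(e.Name, e.IsDirectory, isLast)
	})
}

The adapter itself carries no state beyond the filer address, so the shared helpers can obtain a filer connection on demand through WithFilerClient.
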
diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go
index 6979635e1..69ebe1b30 100644
--- a/weed/shell/command_fs_ls.go
+++ b/weed/shell/command_fs_ls.go
@@ -1,15 +1,15 @@
package shell
import (
- "context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"os"
"os/user"
"strconv"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func init() {
@@ -59,90 +59,56 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return err
}
- ctx := context.Background()
-
- if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
+ if commandEnv.isDirectory(filerServer, filerPort, path) {
path = path + "/"
}
dir, name := filer2.FullPath(path).DirAndName()
+ entryCount := 0
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
-
- return paginateOneDirectory(ctx, writer, client, dir, name, 1000, isLongFormat, showHidden)
-
- })
-
-}
-
-func paginateOneDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, paginateSize int, isLongFormat, showHidden bool) (err error) {
+ err = filer2.ReadDirAllEntries(commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
- entryCount := 0
- paginatedCount := -1
- startFromFileName := ""
-
- for paginatedCount == -1 || paginatedCount == paginateSize {
- resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
- Directory: dir,
- Prefix: name,
- StartFromFileName: startFromFileName,
- InclusiveStartFrom: false,
- Limit: uint32(paginateSize),
- })
- if listErr != nil {
- err = listErr
+ if !showHidden && strings.HasPrefix(entry.Name, ".") {
return
}
- paginatedCount = len(resp.Entries)
-
- for _, entry := range resp.Entries {
+ entryCount++
- if !showHidden && strings.HasPrefix(entry.Name, ".") {
- continue
- }
-
- entryCount++
-
- if isLongFormat {
- fileMode := os.FileMode(entry.Attributes.FileMode)
- userName, groupNames := entry.Attributes.UserName, entry.Attributes.GroupName
- if userName == "" {
- if user, userErr := user.LookupId(strconv.Itoa(int(entry.Attributes.Uid))); userErr == nil {
- userName = user.Username
- }
- }
- groupName := ""
- if len(groupNames) > 0 {
- groupName = groupNames[0]
+ if isLongFormat {
+ fileMode := os.FileMode(entry.Attributes.FileMode)
+ userName, groupNames := entry.Attributes.UserName, entry.Attributes.GroupName
+ if userName == "" {
+ if user, userErr := user.LookupId(strconv.Itoa(int(entry.Attributes.Uid))); userErr == nil {
+ userName = user.Username
}
- if groupName == "" {
- if group, groupErr := user.LookupGroupId(strconv.Itoa(int(entry.Attributes.Gid))); groupErr == nil {
- groupName = group.Name
- }
- }
-
- if dir == "/" {
- // just for printing
- dir = ""
+ }
+ groupName := ""
+ if len(groupNames) > 0 {
+ groupName = groupNames[0]
+ }
+ if groupName == "" {
+ if group, groupErr := user.LookupGroupId(strconv.Itoa(int(entry.Attributes.Gid))); groupErr == nil {
+ groupName = group.Name
}
- fmt.Fprintf(writer, "%s %3d %s %s %6d %s/%s\n",
- fileMode, len(entry.Chunks),
- userName, groupName,
- filer2.TotalSize(entry.Chunks), dir, entry.Name)
- } else {
- fmt.Fprintf(writer, "%s\n", entry.Name)
}
- startFromFileName = entry.Name
-
+ if dir == "/" {
+ // just for printing
+ dir = ""
+ }
+ fmt.Fprintf(writer, "%s %3d %s %s %6d %s/%s\n",
+ fileMode, len(entry.Chunks),
+ userName, groupName,
+ filer2.TotalSize(entry.Chunks), dir, entry.Name)
+ } else {
+ fmt.Fprintf(writer, "%s\n", entry.Name)
}
- }
- if isLongFormat {
+ })
+
+ if isLongFormat && err == nil {
fmt.Fprintf(writer, "total %d\n", entryCount)
}
return
-
}
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
new file mode 100644
index 000000000..cd1ffb6fd
--- /dev/null
+++ b/weed/shell/command_fs_meta_cat.go
@@ -0,0 +1,72 @@
+package shell
+
+import (
+ "fmt"
+ "io"
+
+ "github.com/golang/protobuf/jsonpb"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsMetaCat{})
+}
+
+type commandFsMetaCat struct {
+}
+
+func (c *commandFsMetaCat) Name() string {
+ return "fs.meta.cat"
+}
+
+func (c *commandFsMetaCat) Help() string {
+ return `print out the meta data content for a file or directory
+
+ fs.meta.cat /dir/
+ fs.meta.cat /dir/file_name
+ fs.meta.cat http://<filer_server>:<port>/dir/
+ fs.meta.cat http://<filer_server>:<port>/dir/file_name
+`
+}
+
+func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ input := findInputDirectory(args)
+
+ filerServer, filerPort, path, err := commandEnv.parseUrl(input)
+ if err != nil {
+ return err
+ }
+
+ dir, name := filer2.FullPath(path).DirAndName()
+
+ return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Name: name,
+ Directory: dir,
+ }
+ respLookupEntry, err := filer_pb.LookupEntry(client, request)
+ if err != nil {
+ return err
+ }
+
+ m := jsonpb.Marshaler{
+ EmitDefaults: true,
+ Indent: " ",
+ }
+
+ text, marshalErr := m.MarshalToString(respLookupEntry.Entry)
+ if marshalErr != nil {
+ return fmt.Errorf("marshal meta: %v", marshalErr)
+ }
+
+ fmt.Fprintf(writer, "%s\n", text)
+
+ return nil
+
+ })
+
+}
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
index 5ea8de9f5..ed92d8011 100644
--- a/weed/shell/command_fs_meta_load.go
+++ b/weed/shell/command_fs_meta_load.go
@@ -1,15 +1,15 @@
package shell
import (
- "context"
"fmt"
"io"
"os"
+ "github.com/golang/protobuf/proto"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
)
func init() {
@@ -53,9 +53,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
var dirCount, fileCount uint64
- ctx := context.Background()
-
- err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
sizeBuf := make([]byte, 4)
@@ -80,7 +78,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
return err
}
- if _, err = client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: fullEntry.Dir,
Entry: fullEntry.Entry,
}); err != nil {
diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go
index 13b272fbf..099e04506 100644
--- a/weed/shell/command_fs_meta_notify.go
+++ b/weed/shell/command_fs_meta_notify.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
@@ -9,7 +8,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/notification"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/spf13/viper"
)
func init() {
@@ -41,38 +39,36 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i
}
util.LoadConfiguration("notification", true)
- v := viper.GetViper()
- notification.LoadConfiguration(v.Sub("notification"))
+ v := util.GetViper()
+ notification.LoadConfiguration(v, "notification.")
- ctx := context.Background()
+ var dirCount, fileCount uint64
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+ err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) {
- var dirCount, fileCount uint64
-
- err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error {
-
- if entry.IsDirectory {
- dirCount++
- } else {
- fileCount++
- }
-
- return notification.Queue.SendMessage(
- string(parentPath.Child(entry.Name)),
- &filer_pb.EventNotification{
- NewEntry: entry,
- },
- )
+ if entry.IsDirectory {
+ dirCount++
+ } else {
+ fileCount++
+ }
- })
+ notifyErr := notification.Queue.SendMessage(
+ string(parentPath.Child(entry.Name)),
+ &filer_pb.EventNotification{
+ NewEntry: entry,
+ },
+ )
- if err == nil {
- fmt.Fprintf(writer, "\ntotal notified %d directories, %d files\n", dirCount, fileCount)
+ if notifyErr != nil {
+ fmt.Fprintf(writer, "fail to notify new entry event for %s: %v\n", parentPath.Child(entry.Name), notifyErr)
}
- return err
-
})
+ if err == nil {
+ fmt.Fprintf(writer, "\ntotal notified %d directories, %d files\n", dirCount, fileCount)
+ }
+
+ return err
+
}
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
index e710fe297..b51fdd0f6 100644
--- a/weed/shell/command_fs_meta_save.go
+++ b/weed/shell/command_fs_meta_save.go
@@ -1,17 +1,19 @@
package shell
import (
- "context"
"flag"
"fmt"
"io"
"os"
+ "sync"
+ "sync/atomic"
"time"
+ "github.com/golang/protobuf/proto"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
)
func init() {
@@ -51,114 +53,127 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
}
- filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args()))
- if err != nil {
- return err
+ filerServer, filerPort, path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args()))
+ if parseErr != nil {
+ return parseErr
}
- ctx := context.Background()
+ t := time.Now()
+ fileName := *outputFileName
+ if fileName == "" {
+ fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta",
+ filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
+ }
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+ dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if openErr != nil {
+ return fmt.Errorf("failed to create file %s: %v", fileName, openErr)
+ }
+ defer dst.Close()
- t := time.Now()
- fileName := *outputFileName
- if fileName == "" {
- fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta",
- filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
+ var wg sync.WaitGroup
+ wg.Add(1)
+ outputChan := make(chan []byte, 1024)
+ go func() {
+ sizeBuf := make([]byte, 4)
+ for b := range outputChan {
+ util.Uint32toBytes(sizeBuf, uint32(len(b)))
+ dst.Write(sizeBuf)
+ dst.Write(b)
}
+ wg.Done()
+ }()
- dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- return nil
- }
- defer dst.Close()
+ var dirCount, fileCount uint64
- var dirCount, fileCount uint64
+ err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) {
- sizeBuf := make([]byte, 4)
+ protoMessage := &filer_pb.FullEntry{
+ Dir: string(parentPath),
+ Entry: entry,
+ }
- err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error {
+ bytes, err := proto.Marshal(protoMessage)
+ if err != nil {
+ fmt.Fprintf(writer, "marshall error: %v\n", err)
+ return
+ }
- protoMessage := &filer_pb.FullEntry{
- Dir: string(parentPath),
- Entry: entry,
- }
+ outputChan <- bytes
- bytes, err := proto.Marshal(protoMessage)
- if err != nil {
- return fmt.Errorf("marshall error: %v", err)
- }
+ if entry.IsDirectory {
+ atomic.AddUint64(&dirCount, 1)
+ } else {
+ atomic.AddUint64(&fileCount, 1)
+ }
- util.Uint32toBytes(sizeBuf, uint32(len(bytes)))
+ if *verbose {
+ println(parentPath.Child(entry.Name))
+ }
- dst.Write(sizeBuf)
- dst.Write(bytes)
+ })
- if entry.IsDirectory {
- dirCount++
- } else {
- fileCount++
- }
+ close(outputChan)
- if *verbose {
- println(parentPath.Child(entry.Name))
- }
+ wg.Wait()
- return nil
+ if err == nil {
+ fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount)
+ fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName)
+ }
- })
+ return err
- if err == nil {
- fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount)
- fmt.Fprintf(writer, "\nmeta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName)
- }
+}
+func doTraverseBFS(writer io.Writer, filerClient filer2.FilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) {
- return err
+ K := 5
- })
+ var jobQueueWg sync.WaitGroup
+ queue := util.NewQueue()
+ jobQueueWg.Add(1)
+ queue.Enqueue(parentPath)
+ var isTerminating bool
+ for i := 0; i < K; i++ {
+ go func() {
+ for {
+ if isTerminating {
+ break
+ }
+ t := queue.Dequeue()
+ if t == nil {
+ time.Sleep(329 * time.Millisecond)
+ continue
+ }
+ dir := t.(filer2.FullPath)
+ processErr := processOneDirectory(writer, filerClient, dir, queue, &jobQueueWg, fn)
+ if processErr != nil {
+ err = processErr
+ }
+ jobQueueWg.Done()
+ }
+ }()
+ }
+ jobQueueWg.Wait()
+ isTerminating = true
+ return
}
-func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) {
-
- paginatedCount := -1
- startFromFileName := ""
- paginateSize := 1000
-
- for paginatedCount == -1 || paginatedCount == paginateSize {
- resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
- Directory: string(parentPath),
- Prefix: "",
- StartFromFileName: startFromFileName,
- InclusiveStartFrom: false,
- Limit: uint32(paginateSize),
- })
- if listErr != nil {
- err = listErr
- return
- }
- paginatedCount = len(resp.Entries)
+func processOneDirectory(writer io.Writer, filerClient filer2.FilerClient, parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) {
- for _, entry := range resp.Entries {
+ return filer2.ReadDirAllEntries(filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) {
- if err = fn(parentPath, entry); err != nil {
- return err
- }
+ fn(parentPath, entry)
- if entry.IsDirectory {
- subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name)
- if parentPath == "/" {
- subDir = "/" + entry.Name
- }
- if err = doTraverse(ctx, writer, client, filer2.FullPath(subDir), fn); err != nil {
- return err
- }
+ if entry.IsDirectory {
+ subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name)
+ if parentPath == "/" {
+ subDir = "/" + entry.Name
}
- startFromFileName = entry.Name
-
+ jobQueueWg.Add(1)
+ queue.Enqueue(filer2.FullPath(subDir))
}
- }
-
- return
+ })
}
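
fs.meta.save now walks the tree breadth-first with K worker goroutines and a WaitGroup that counts outstanding directories rather than workers: each directory is Add()-ed before it is enqueued and Done()-d only after its children have been enqueued, so Wait() cannot return while work is still being discovered. Below is a compact, runnable sketch of that accounting over an in-memory tree; the worker count, idle poll, and shutdown channel are illustrative and do not reflect the util.Queue API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// fake directory tree: parent -> child directories
var tree = map[string][]string{
	"/":    {"/a", "/b"},
	"/a":   {"/a/x"},
	"/b":   {},
	"/a/x": {},
}

// traverseBFS mirrors the doTraverseBFS pattern: a shared queue of directories,
// a WaitGroup counting outstanding directories, and k workers polling the queue.
func traverseBFS(root string, k int, fn func(dir string)) {
	var mu sync.Mutex
	var queue []string
	var pending sync.WaitGroup

	enqueue := func(dir string) {
		pending.Add(1) // count the directory before workers can see it
		mu.Lock()
		queue = append(queue, dir)
		mu.Unlock()
	}
	dequeue := func() (string, bool) {
		mu.Lock()
		defer mu.Unlock()
		if len(queue) == 0 {
			return "", false
		}
		d := queue[0]
		queue = queue[1:]
		return d, true
	}

	enqueue(root)
	done := make(chan struct{})
	for i := 0; i < k; i++ {
		go func() {
			for {
				select {
				case <-done:
					return
				default:
				}
				dir, ok := dequeue()
				if !ok {
					time.Sleep(10 * time.Millisecond) // idle poll, like the 329ms sleep above
					continue
				}
				fn(dir)
				for _, sub := range tree[dir] {
					enqueue(sub) // children are counted before this directory is marked done
				}
				pending.Done()
			}
		}()
	}
	pending.Wait() // every enqueued directory has been processed
	close(done)    // let the workers exit
}

func main() {
	traverseBFS("/", 3, func(dir string) { fmt.Println("visited", dir) })
}
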
diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go
index 67606ab53..85275058e 100644
--- a/weed/shell/command_fs_mv.go
+++ b/weed/shell/command_fs_mv.go
@@ -47,20 +47,18 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return err
}
- ctx := context.Background()
-
sourceDir, sourceName := filer2.FullPath(sourcePath).DirAndName()
destinationDir, destinationName := filer2.FullPath(destinationPath).DirAndName()
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
// collect destination entry info
destinationRequest := &filer_pb.LookupDirectoryEntryRequest{
Name: destinationDir,
Directory: destinationName,
}
- respDestinationLookupEntry, err := client.LookupDirectoryEntry(ctx, destinationRequest)
+ respDestinationLookupEntry, err := filer_pb.LookupEntry(client, destinationRequest)
var targetDir, targetName string
@@ -82,7 +80,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer
NewName: targetName,
}
- _, err = client.AtomicRenameEntry(ctx, request)
+ _, err = client.AtomicRenameEntry(context.Background(), request)
fmt.Fprintf(writer, "move: %s => %s\n", sourcePath, filer2.NewFullPath(targetDir, targetName))
diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go
index 8474e43ea..04530571c 100644
--- a/weed/shell/command_fs_tree.go
+++ b/weed/shell/command_fs_tree.go
@@ -1,12 +1,12 @@
package shell
import (
- "context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func init() {
@@ -36,77 +36,42 @@ func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writ
dir, name := filer2.FullPath(path).DirAndName()
- ctx := context.Background()
+ dirCount, fCount, terr := treeTraverseDirectory(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, newPrefix(), -1)
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
-
- dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, client, dir, name, newPrefix(), -1)
-
- if terr == nil {
- fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount)
- }
+ if terr == nil {
+ fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount)
+ }
- return terr
-
- })
+ return terr
}
-func treeTraverseDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) {
-
- paginatedCount := -1
- startFromFileName := ""
- paginateSize := 1000
-
- for paginatedCount == -1 || paginatedCount == paginateSize {
- resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
- Directory: dir,
- Prefix: name,
- StartFromFileName: startFromFileName,
- InclusiveStartFrom: false,
- Limit: uint32(paginateSize),
- })
- if listErr != nil {
- err = listErr
- return
- }
- paginatedCount = len(resp.Entries)
- if paginatedCount > 0 {
- prefix.addMarker(level)
- }
+func treeTraverseDirectory(writer io.Writer, filerClient filer2.FilerClient, dir filer2.FullPath, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) {
- for i, entry := range resp.Entries {
+ prefix.addMarker(level)
- if level < 0 && name != "" {
- if entry.Name != name {
- break
- }
+ err = filer2.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) {
+ if level < 0 && name != "" {
+ if entry.Name != name {
+ return
}
+ }
- // 0.1% wrong prefix here, but fixing it would need to paginate to the next batch first
- isLast := paginatedCount < paginateSize && i == paginatedCount-1
- fmt.Fprintf(writer, "%s%s\n", prefix.getPrefix(level, isLast), entry.Name)
-
- if entry.IsDirectory {
- directoryCount++
- subDir := fmt.Sprintf("%s/%s", dir, entry.Name)
- if dir == "/" {
- subDir = "/" + entry.Name
- }
- dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, client, subDir, "", prefix, level+1)
- directoryCount += dirCount
- fileCount += fCount
- err = terr
- } else {
- fileCount++
- }
- startFromFileName = entry.Name
+ fmt.Fprintf(writer, "%s%s\n", prefix.getPrefix(level, isLast), entry.Name)
+ if entry.IsDirectory {
+ directoryCount++
+ subDir := dir.Child(entry.Name)
+ dirCount, fCount, terr := treeTraverseDirectory(writer, filerClient, subDir, "", prefix, level+1)
+ directoryCount += dirCount
+ fileCount += fCount
+ err = terr
+ } else {
+ fileCount++
}
- }
+ })
return
-
}
type Prefix struct {
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index d7ef0d005..349f52f1c 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -27,7 +27,7 @@ func (c *commandVolumeBalance) Name() string {
func (c *commandVolumeBalance) Help() string {
return `balance all volumes among volume servers
- volume.balance [-c ALL|EACH_COLLECTION|<collection_name>] [-force] [-dataCenter=<data_center_name>]
+ volume.balance [-collection ALL|EACH_COLLECTION|<collection_name>] [-force] [-dataCenter=<data_center_name>]
Algorithm:
@@ -69,9 +69,8 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
}
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -79,8 +78,10 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
}
typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc)
- for _, volumeServers := range typeToNodes {
+
+ for maxVolumeCount, volumeServers := range typeToNodes {
if len(volumeServers) < 2 {
+ fmt.Printf("only 1 node is configured max %d volumes, skipping balancing\n", maxVolumeCount)
continue
}
if *collection == "EACH_COLLECTION" {
@@ -93,8 +94,8 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return err
}
}
- } else if *collection == "ALL" {
- if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL", *applyBalancing); err != nil {
+ } else if *collection == "ALL_COLLECTIONS" {
+ if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
return err
}
} else {
@@ -107,18 +108,12 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
-func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.DataNodeInfo, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
- var nodes []*Node
- for _, dn := range dataNodeInfos {
- nodes = append(nodes, &Node{
- info: dn,
- })
- }
+func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
// balance writable volumes
for _, n := range nodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
- if collection != "ALL" {
+ if collection != "ALL_COLLECTIONS" {
if v.Collection != collection {
return false
}
@@ -133,7 +128,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.Dat
// balance readable volumes
for _, n := range nodes {
n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
- if collection != "ALL" {
+ if collection != "ALL_COLLECTIONS" {
if v.Collection != collection {
return false
}
@@ -148,15 +143,19 @@ func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.Dat
return nil
}
-func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*master_pb.DataNodeInfo) {
- typeToNodes = make(map[uint64][]*master_pb.DataNodeInfo)
+func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) {
+ typeToNodes = make(map[uint64][]*Node)
for _, dc := range t.DataCenterInfos {
if selectedDataCenter != "" && dc.Id != selectedDataCenter {
continue
}
for _, r := range dc.RackInfos {
for _, dn := range r.DataNodeInfos {
- typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], dn)
+ typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{
+ info: dn,
+ dc: dc.Id,
+ rack: r.Id,
+ })
}
}
}
@@ -166,6 +165,8 @@ func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter st
type Node struct {
info *master_pb.DataNodeInfo
selectedVolumes map[uint32]*master_pb.VolumeInformationMessage
+ dc string
+ rack string
}
func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
@@ -207,6 +208,13 @@ func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidates
sortCandidatesFn(candidateVolumes)
for _, v := range candidateVolumes {
+ if v.ReplicaPlacement > 0 {
+ if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack {
+ // TODO this logic is too simple, but should work most of the time
+ // Need a correct algorithm to handle all different cases
+ continue
+ }
+ }
if _, found := emptyNode.selectedVolumes[v.Id]; !found {
if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil {
delete(fullNode.selectedVolumes, v.Id)
@@ -230,8 +238,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
}
fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyBalancing {
- ctx := context.Background()
- return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
}
return nil
}
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
new file mode 100644
index 000000000..133ec62c6
--- /dev/null
+++ b/weed/shell/command_volume_configure_replication.go
@@ -0,0 +1,104 @@
+package shell
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeConfigureReplication{})
+}
+
+type commandVolumeConfigureReplication struct {
+}
+
+func (c *commandVolumeConfigureReplication) Name() string {
+ return "volume.configure.replication"
+}
+
+func (c *commandVolumeConfigureReplication) Help() string {
+ return `change volume replication value
+
+ This command changes the replication value of a volume. It should be followed by volume.fix.replication.
+
+`
+}
+
+func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
+ replicationString := configureReplicationCommand.String("replication", "", "the intended replication value")
+ if err = configureReplicationCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *replicationString == "" {
+ return fmt.Errorf("empty replication value")
+ }
+
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(*replicationString)
+ if err != nil {
+ return fmt.Errorf("replication format: %v", err)
+ }
+ replicaPlacementInt32 := uint32(replicaPlacement.Byte())
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return err
+ }
+
+ vid := needle.VolumeId(*volumeIdInt)
+
+ // find all data nodes with volumes that need a replication change
+ var allLocations []location
+ eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ loc := newLocation(dc, string(rack), dn)
+ for _, v := range dn.VolumeInfos {
+ if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 {
+ allLocations = append(allLocations, loc)
+ continue
+ }
+ }
+ })
+
+ if len(allLocations) == 0 {
+ return fmt.Errorf("no volume needs change")
+ }
+
+ for _, dst := range allLocations {
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
+ VolumeId: uint32(vid),
+ Replication: replicaPlacement.String(),
+ })
+ if configureErr != nil {
+ return configureErr
+ }
+ if resp.Error != "" {
+ return errors.New(resp.Error)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return err
+ }
+
+ }
+
+ return nil
+}
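
For context on the replication value handled above: the three digits of a replication string such as "001" or "210" count copies on other data centers, on other racks, and on the same rack, and super_block packs them into the single byte compared against v.ReplicaPlacement. A hedged sketch of that encoding, assuming the packing is dataCenters*100 + racks*10 + sameRack as the digit order suggests; the names below are illustrative, not the super_block API.

package main

import (
	"fmt"
	"strconv"
)

// replicaPlacement mirrors the three digits of a replication string such as
// "001" or "210": copies in other data centers, other racks, and the same rack.
type replicaPlacement struct {
	diffDataCenters int
	diffRacks       int
	sameRack        int
}

func parseReplication(s string) (replicaPlacement, error) {
	var rp replicaPlacement
	if len(s) != 3 {
		return rp, fmt.Errorf("replication %q should have 3 digits", s)
	}
	digits := make([]int, 3)
	for i := 0; i < 3; i++ {
		d, err := strconv.Atoi(string(s[i]))
		if err != nil {
			return rp, fmt.Errorf("replication %q: %v", s, err)
		}
		digits[i] = d
	}
	rp.diffDataCenters, rp.diffRacks, rp.sameRack = digits[0], digits[1], digits[2]
	return rp, nil
}

// byteValue packs the placement into one byte, the form the master reports in
// VolumeInformationMessage.ReplicaPlacement and the command compares against.
func (rp replicaPlacement) byteValue() uint8 {
	return uint8(rp.diffDataCenters*100 + rp.diffRacks*10 + rp.sameRack)
}

func main() {
	for _, s := range []string{"000", "001", "010", "200"} {
		rp, _ := parseReplication(s)
		fmt.Printf("%s -> byte %d, total copies %d\n",
			s, rp.byteValue(), 1+rp.diffDataCenters+rp.diffRacks+rp.sameRack)
	}
}
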
diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go
index 1c83ba655..aecc071ad 100644
--- a/weed/shell/command_volume_copy.go
+++ b/weed/shell/command_volume_copy.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
@@ -47,7 +46,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
- ctx := context.Background()
- _, err = copyVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
+ _, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
return
}
diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go
index 17d27ea3a..5869b1621 100644
--- a/weed/shell/command_volume_delete.go
+++ b/weed/shell/command_volume_delete.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
@@ -42,7 +41,6 @@ func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer i
return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
}
- ctx := context.Background()
- return deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+ return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
}
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 4c7a794c0..210f4819d 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -3,13 +3,14 @@ package shell
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
"io"
"math/rand"
"sort"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func init() {
@@ -49,9 +50,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -78,7 +78,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
underReplicatedVolumeLocations := make(map[uint32][]location)
for vid, locations := range replicatedVolumeLocations {
volumeInfo := replicatedVolumeInfo[vid]
- replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
if replicaPlacement.GetCopyCount() > len(locations) {
underReplicatedVolumeLocations[vid] = locations
}
@@ -97,7 +97,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
for vid, locations := range underReplicatedVolumeLocations {
volumeInfo := replicatedVolumeInfo[vid]
- replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
foundNewLocation := false
for _, dst := range allLocations {
// check whether data nodes satisfy the constraints
@@ -113,7 +113,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
+ _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: volumeInfo.Id,
SourceDataNode: sourceNode.dataNode.Id,
})
@@ -145,7 +145,7 @@ func keepDataNodesSorted(dataNodes []location) {
})
}
-func satisfyReplicaPlacement(replicaPlacement *storage.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
+func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
existingDataCenters := make(map[string]bool)
existingRacks := make(map[string]bool)
diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go
index 91b5a0d32..c5a9388fa 100644
--- a/weed/shell/command_volume_list.go
+++ b/weed/shell/command_volume_list.go
@@ -32,9 +32,8 @@ func (c *commandVolumeList) Help() string {
func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -46,7 +45,7 @@ func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.
}
func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLimitMb uint64) statistics {
- fmt.Fprintf(writer, "Topology volume:%d/%d active:%d free:%d volumeSizeLimit:%d MB\n", t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, volumeSizeLimitMb)
+ fmt.Fprintf(writer, "Topology volume:%d/%d active:%d free:%d remote:%d volumeSizeLimit:%d MB\n", t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount, volumeSizeLimitMb)
sort.Slice(t.DataCenterInfos, func(i, j int) bool {
return t.DataCenterInfos[i].Id < t.DataCenterInfos[j].Id
})
@@ -58,7 +57,7 @@ func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLi
return s
}
func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) statistics {
- fmt.Fprintf(writer, " DataCenter %s volume:%d/%d active:%d free:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount)
+ fmt.Fprintf(writer, " DataCenter %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount)
var s statistics
sort.Slice(t.RackInfos, func(i, j int) bool {
return t.RackInfos[i].Id < t.RackInfos[j].Id
@@ -70,7 +69,7 @@ func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) statisti
return s
}
func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) statistics {
- fmt.Fprintf(writer, " Rack %s volume:%d/%d active:%d free:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount)
+ fmt.Fprintf(writer, " Rack %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount)
var s statistics
sort.Slice(t.DataNodeInfos, func(i, j int) bool {
return t.DataNodeInfos[i].Id < t.DataNodeInfos[j].Id
@@ -82,7 +81,7 @@ func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) statistics {
return s
}
func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics {
- fmt.Fprintf(writer, " DataNode %s volume:%d/%d active:%d free:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount)
+ fmt.Fprintf(writer, " DataNode %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount)
var s statistics
sort.Slice(t.VolumeInfos, func(i, j int) bool {
return t.VolumeInfos[i].Id < t.VolumeInfos[j].Id
diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go
index 50a307492..cffc7136b 100644
--- a/weed/shell/command_volume_mount.go
+++ b/weed/shell/command_volume_mount.go
@@ -45,14 +45,13 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
}
- ctx := context.Background()
- return mountVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+ return mountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
}
-func mountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),
})
return mountErr
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 08d87c988..c25b953a5 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -25,7 +25,7 @@ func (c *commandVolumeMove) Name() string {
}
func (c *commandVolumeMove) Help() string {
- return `<experimental> move a live volume from one volume server to another volume server
+ return `move a live volume from one volume server to another volume server
volume.move <source volume server host:port> <target volume server host:port> <volume id>
@@ -59,26 +59,25 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
- ctx := context.Background()
- return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second)
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second)
}
// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
-func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) {
+func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
- lastAppendAtNs, err := copyVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
+ lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
if err != nil {
return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
- if err = tailVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
+ if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
- if err = deleteVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer); err != nil {
+ if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer); err != nil {
return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
}
@@ -86,10 +85,10 @@ func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeI
return nil
}
-func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
+func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
+ resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: sourceVolumeServer,
})
@@ -102,10 +101,10 @@ func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne
return
}
-func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
+func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{
+ _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
VolumeId: uint32(volumeId),
SinceNs: lastAppendAtNs,
IdleTimeoutSeconds: uint32(idleTimeout.Seconds()),
@@ -116,9 +115,9 @@ func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne
}
-func deleteVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
+ _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(volumeId),
})
return deleteErr
diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go
new file mode 100644
index 000000000..756dc4686
--- /dev/null
+++ b/weed/shell/command_volume_tier_download.go
@@ -0,0 +1,166 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeTierDownload{})
+}
+
+type commandVolumeTierDownload struct {
+}
+
+func (c *commandVolumeTierDownload) Name() string {
+ return "volume.tier.download"
+}
+
+func (c *commandVolumeTierDownload) Help() string {
+ return `download the dat file of a volume from a remote tier
+
+ volume.tier.download [-collection=""]
+ volume.tier.download [-collection=""] -volumeId=<volume_id>
+
+ e.g.:
+ volume.tier.download -volumeId=7
+ volume.tier.download -collection=""
+
+ This command will download the dat file of a volume from a remote tier to a volume server in the local cluster.
+
+`
+}
+
+func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeId := tierCommand.Int("volumeId", 0, "the volume id")
+ collection := tierCommand.String("collection", "", "the collection name")
+ if err = tierCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ vid := needle.VolumeId(*volumeId)
+
+ // collect topology information
+ topologyInfo, err := collectTopologyInfo(commandEnv)
+ if err != nil {
+ return err
+ }
+
+ // volumeId is provided
+ if vid != 0 {
+ return doVolumeTierDownload(commandEnv, writer, *collection, vid)
+ }
+
+ // apply to all volumes in the collection
+ // collect volumes that have been uploaded to a remote tier
+ volumeIds := collectRemoteVolumes(topologyInfo, *collection)
+ if err != nil {
+ return err
+ }
+ fmt.Printf("tier download volumes: %v\n", volumeIds)
+ for _, vid := range volumeIds {
+ if err = doVolumeTierDownload(commandEnv, writer, *collection, vid); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
+
+ vidMap := make(map[uint32]bool)
+ eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ for _, v := range dn.VolumeInfos {
+ if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" {
+ vidMap[v.Id] = true
+ }
+ }
+ })
+
+ for vid := range vidMap {
+ vids = append(vids, needle.VolumeId(vid))
+ }
+
+ return
+}
+
+func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId) (err error) {
+ // find volume location
+ locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
+ if !found {
+ return fmt.Errorf("volume %d not found", vid)
+ }
+
+ // TODO parallelize this
+ for _, loc := range locations {
+ // copy the .dat file from remote tier to local
+ err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url)
+ if err != nil {
+ return fmt.Errorf("download dat file for volume %d to %s: %v", vid, loc.Url, err)
+ }
+ }
+
+ return nil
+}
+
+func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error {
+
+ err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ })
+
+ var lastProcessed int64
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
+ processingSpeed := float64(resp.Processed-lastProcessed) / 1024.0 / 1024.0
+
+ fmt.Fprintf(writer, "downloaded %.2f%%, %d bytes, %.2fMB/s\n", resp.ProcessedPercentage, resp.Processed, processingSpeed)
+
+ lastProcessed = resp.Processed
+ }
+ if downloadErr != nil {
+ return downloadErr
+ }
+
+ _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
+ VolumeId: uint32(volumeId),
+ })
+ if unmountErr != nil {
+ return unmountErr
+ }
+
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
+ VolumeId: uint32(volumeId),
+ })
+ if mountErr != nil {
+ return mountErr
+ }
+
+ return nil
+ })
+
+ return err
+
+}
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
new file mode 100644
index 000000000..5131e8f85
--- /dev/null
+++ b/weed/shell/command_volume_tier_upload.go
@@ -0,0 +1,147 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeTierUpload{})
+}
+
+type commandVolumeTierUpload struct {
+}
+
+func (c *commandVolumeTierUpload) Name() string {
+ return "volume.tier.upload"
+}
+
+func (c *commandVolumeTierUpload) Help() string {
+ return `upload the dat file of a volume to a remote tier
+
+ volume.tier.upload [-collection=""] [-fullPercent=95] [-quietFor=1h]
+ volume.tier.upload [-collection=""] -volumeId=<volume_id> -dest=<storage_backend> [-keepLocalDatFile]
+
+ e.g.:
+ volume.tier.upload -volumeId=7 -dest=s3
+ volume.tier.upload -volumeId=7 -dest=s3.default
+
+ The <storage_backend> is defined in master.toml.
+ For example, "s3.default" in [storage.backend.s3.default]
+
+ This command will move the dat file of a volume to a remote tier.
+
+ SeaweedFS enables scalable and fast local access to lots of files,
+ and cloud storage is slower but more cost-efficient. How can we combine them?
+
+ Usually the data follows the 80/20 rule: only 20% of the data is frequently accessed.
+ We can offload the old volumes to the cloud.
+
+ With this, SeaweedFS can be both fast and scalable, with virtually infinite storage space.
+ Just add more local SeaweedFS volume servers to increase the throughput.
+
+ The index file is still local, and the same O(1) disk read is applied to the remote file.
+
+`
+}
+
+func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeId := tierCommand.Int("volumeId", 0, "the volume id")
+ collection := tierCommand.String("collection", "", "the collection name")
+ fullPercentage := tierCommand.Float64("fullPercent", 95, "select volumes that have reached this percentage of the max volume size")
+ quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without writes during this period")
+ dest := tierCommand.String("dest", "", "the target tier name")
+ keepLocalDatFile := tierCommand.Bool("keepLocalDatFile", false, "whether to keep the local dat file")
+ if err = tierCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ vid := needle.VolumeId(*volumeId)
+
+ // volumeId is provided
+ if vid != 0 {
+ return doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile)
+ }
+
+ // apply to all volumes in the collection
+ // reusing collectVolumeIdsForEcEncode for now
+ volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
+ if err != nil {
+ return err
+ }
+ fmt.Printf("tier upload volumes: %v\n", volumeIds)
+ for _, vid := range volumeIds {
+ if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {
+ // find volume location
+ locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
+ if !found {
+ return fmt.Errorf("volume %d not found", vid)
+ }
+
+ err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+ if err != nil {
+ return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
+ }
+
+ // copy the .dat file to remote tier
+ err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile)
+ if err != nil {
+ return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, locations[0].Url, dest, err)
+ }
+
+ return nil
+}
+
+func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error {
+
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ DestinationBackendName: dest,
+ KeepLocalDatFile: keepLocalDatFile,
+		})
+		if copyErr != nil {
+			return copyErr
+		}
+
+		var lastProcessed int64
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+
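+			// the volume server is expected to send a progress report roughly once per second,
+			// so the per-report byte delta approximates throughput in MB/s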
+ processingSpeed := float64(resp.Processed-lastProcessed) / 1024.0 / 1024.0
+
+ fmt.Fprintf(writer, "copied %.2f%%, %d bytes, %.2fMB/s\n", resp.ProcessedPercentage, resp.Processed, processingSpeed)
+
+ lastProcessed = resp.Processed
+ }
+
+		return nil
+ })
+
+ return err
+
+}
diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go
index 8096f34d8..6e5bef485 100644
--- a/weed/shell/command_volume_unmount.go
+++ b/weed/shell/command_volume_unmount.go
@@ -45,14 +45,13 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
}
- ctx := context.Background()
- return unmountVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+ return unmountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
}
-func unmountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{
+ _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),
})
return unmountErr
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index b642ec253..b8832ad93 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
"net/url"
@@ -9,10 +8,11 @@ import (
"strconv"
"strings"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "google.golang.org/grpc"
)
type ShellOptions struct {
@@ -42,10 +42,9 @@ var (
func NewCommandEnv(options ShellOptions) *CommandEnv {
return &CommandEnv{
- env: make(map[string]string),
- MasterClient: wdclient.NewMasterClient(context.Background(),
- options.GrpcDialOption, "shell", strings.Split(*options.Masters, ",")),
- option: options,
+ env: make(map[string]string),
+ MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, "shell", 0, strings.Split(*options.Masters, ",")),
+ option: options,
}
}
@@ -59,38 +58,27 @@ func (ce *CommandEnv) parseUrl(input string) (filerServer string, filerPort int6
return ce.option.FilerHost, ce.option.FilerPort, input, err
}
-func (ce *CommandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool {
+func (ce *CommandEnv) isDirectory(filerServer string, filerPort int64, path string) bool {
- return ce.checkDirectory(ctx, filerServer, filerPort, path) == nil
+ return ce.checkDirectory(filerServer, filerPort, path) == nil
}
-func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error {
+func (ce *CommandEnv) checkDirectory(filerServer string, filerPort int64, path string) error {
dir, name := filer2.FullPath(path).DirAndName()
- return ce.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+ return ce.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
- resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
- Directory: dir,
- Prefix: name,
- StartFromFileName: name,
- InclusiveStartFrom: true,
- Limit: 1,
+ resp, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: dir,
+ Name: name,
})
- if listErr != nil {
- return listErr
- }
-
- if len(resp.Entries) == 0 {
- return fmt.Errorf("entry not found")
- }
-
- if resp.Entries[0].Name != name {
- return fmt.Errorf("not a valid directory, found %s", resp.Entries[0].Name)
+ if lookupErr != nil {
+ return lookupErr
}
- if !resp.Entries[0].IsDirectory {
+ if !resp.Entry.IsDirectory {
return fmt.Errorf("not a directory")
}