path: root/weed/shell
author      shibinbin <shibinbin@megvii.com>   2020-06-04 17:24:18 +0800
committer   shibinbin <shibinbin@megvii.com>   2020-06-04 17:24:18 +0800
commit      40334bc28d3fa694ce59b4e65077efb845264d20 (patch)
tree        a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/shell
parent      d892cad15d748327c2b7c649f6398ff35d8dce0b (diff)
parent      fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff)
download    seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz
            seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/shell')
-rw-r--r--  weed/shell/command_bucket_create.go                 |  83
-rw-r--r--  weed/shell/command_bucket_delete.go                 |  54
-rw-r--r--  weed/shell/command_bucket_list.go                   |  78
-rw-r--r--  weed/shell/command_collection_delete.go             |   5
-rw-r--r--  weed/shell/command_collection_list.go               |   5
-rw-r--r--  weed/shell/command_ec_balance.go                    |  57
-rw-r--r--  weed/shell/command_ec_common.go                     |  43
-rw-r--r--  weed/shell/command_ec_decode.go                     |  47
-rw-r--r--  weed/shell/command_ec_encode.go                     |  56
-rw-r--r--  weed/shell/command_ec_rebuild.go                    |  33
-rw-r--r--  weed/shell/command_ec_test.go                       |   3
-rw-r--r--  weed/shell/command_fs_cat.go                        |  19
-rw-r--r--  weed/shell/command_fs_cd.go                         |  19
-rw-r--r--  weed/shell/command_fs_du.go                         |  60
-rw-r--r--  weed/shell/command_fs_lock_unlock.go                |  54
-rw-r--r--  weed/shell/command_fs_ls.go                         |  24
-rw-r--r--  weed/shell/command_fs_meta_cat.go                   |  17
-rw-r--r--  weed/shell/command_fs_meta_load.go                  |  20
-rw-r--r--  weed/shell/command_fs_meta_notify.go                |   8
-rw-r--r--  weed/shell/command_fs_meta_save.go                  | 116
-rw-r--r--  weed/shell/command_fs_mv.go                         |  23
-rw-r--r--  weed/shell/command_fs_pwd.go                        |   6
-rw-r--r--  weed/shell/command_fs_tree.go                       |  24
-rw-r--r--  weed/shell/command_volume_balance.go                |  40
-rw-r--r--  weed/shell/command_volume_configure_replication.go  |  13
-rw-r--r--  weed/shell/command_volume_copy.go                   |   8
-rw-r--r--  weed/shell/command_volume_delete.go                 |   8
-rw-r--r--  weed/shell/command_volume_fix_replication.go        | 151
-rw-r--r--  weed/shell/command_volume_fix_replication_test.go   | 207
-rw-r--r--  weed/shell/command_volume_fsck.go                   | 361
-rw-r--r--  weed/shell/command_volume_list.go                   |   5
-rw-r--r--  weed/shell/command_volume_mount.go                  |  13
-rw-r--r--  weed/shell/command_volume_move.go                   |  33
-rw-r--r--  weed/shell/command_volume_tier_download.go          |  25
-rw-r--r--  weed/shell/command_volume_tier_upload.go            |  23
-rw-r--r--  weed/shell/command_volume_unmount.go                |  13
-rw-r--r--  weed/shell/commands.go                              |  79
-rw-r--r--  weed/shell/shell_liner.go                           |  64
38 files changed, 1385 insertions, 512 deletions
diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_bucket_create.go
new file mode 100644
index 000000000..52d96e4c3
--- /dev/null
+++ b/weed/shell/command_bucket_create.go
@@ -0,0 +1,83 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandBucketCreate{})
+}
+
+type commandBucketCreate struct {
+}
+
+func (c *commandBucketCreate) Name() string {
+ return "bucket.create"
+}
+
+func (c *commandBucketCreate) Help() string {
+ return `create a bucket with a given name
+
+ Example:
+ bucket.create -name <bucket_name> -replication 001
+`
+}
+
+func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ bucketName := bucketCommand.String("name", "", "bucket name")
+ replication := bucketCommand.String("replication", "", "replication setting for the bucket")
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *bucketName == "" {
+ return fmt.Errorf("empty bucket name")
+ }
+
+ err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer configuration: %v", err)
+ }
+ filerBucketsPath := resp.DirBuckets
+
+ println("create bucket under", filerBucketsPath)
+
+ entry := &filer_pb.Entry{
+ Name: *bucketName,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(0777 | os.ModeDir),
+ Collection: *bucketName,
+ Replication: *replication,
+ },
+ }
+
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: filerBucketsPath,
+ Entry: entry,
+ }); err != nil {
+ return err
+ }
+
+ println("created bucket", *bucketName)
+
+ return nil
+
+ })
+
+ return err
+
+}
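
The new command above treats a bucket as nothing more than a directory entry under the filer's bucket root (resp.DirBuckets) whose attributes record a collection and a replication setting. The fragment below is not part of the diff; it condenses the entry construction from Do() with explanatory comments, using an illustrative bucket name and replication value:

    // A bucket is just a directory entry under the filer's DirBuckets path.
    // Files later written into this directory are meant to use the Collection
    // and Replication recorded on it.
    entry := &filer_pb.Entry{
        Name:        "books", // illustrative bucket name
        IsDirectory: true,
        Attributes: &filer_pb.FuseAttributes{
            Mtime:       time.Now().Unix(),
            Crtime:      time.Now().Unix(),
            FileMode:    uint32(0777 | os.ModeDir), // a plain, world-accessible directory
            Collection:  "books", // volumes for this bucket belong to this collection
            Replication: "001",   // replication to use for those volumes, when set
        },
    }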
diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go
new file mode 100644
index 000000000..07c2e74ac
--- /dev/null
+++ b/weed/shell/command_bucket_delete.go
@@ -0,0 +1,54 @@
+package shell
+
+import (
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandBucketDelete{})
+}
+
+type commandBucketDelete struct {
+}
+
+func (c *commandBucketDelete) Name() string {
+ return "bucket.delete"
+}
+
+func (c *commandBucketDelete) Help() string {
+ return `delete a bucket by a given name
+
+ bucket.delete -name <bucket_name>
+`
+}
+
+func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ bucketName := bucketCommand.String("name", "", "bucket name")
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ if *bucketName == "" {
+ return fmt.Errorf("empty bucket name")
+ }
+
+ _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args()))
+ if parseErr != nil {
+ return parseErr
+ }
+
+ var filerBucketsPath string
+ filerBucketsPath, err = readFilerBucketsPath(commandEnv)
+ if err != nil {
+ return fmt.Errorf("read buckets: %v", err)
+ }
+
+ return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true)
+
+}
diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go
new file mode 100644
index 000000000..2e446b6b2
--- /dev/null
+++ b/weed/shell/command_bucket_list.go
@@ -0,0 +1,78 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "math"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandBucketList{})
+}
+
+type commandBucketList struct {
+}
+
+func (c *commandBucketList) Name() string {
+ return "bucket.list"
+}
+
+func (c *commandBucketList) Help() string {
+ return `list all buckets
+
+`
+}
+
+func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args()))
+ if parseErr != nil {
+ return parseErr
+ }
+
+ var filerBucketsPath string
+ filerBucketsPath, err = readFilerBucketsPath(commandEnv)
+ if err != nil {
+ return fmt.Errorf("read buckets: %v", err)
+ }
+
+ err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
+ if entry.Attributes.Replication == "" || entry.Attributes.Replication == "000" {
+ fmt.Fprintf(writer, " %s\n", entry.Name)
+ } else {
+ fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", entry.Name, entry.Attributes.Replication)
+ }
+ return nil
+ }, "", false, math.MaxUint32)
+ if err != nil {
+ return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
+ }
+
+ return err
+
+}
+
+func readFilerBucketsPath(filerClient filer_pb.FilerClient) (filerBucketsPath string, err error) {
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer configuration: %v", err)
+ }
+ filerBucketsPath = resp.DirBuckets
+
+ return nil
+
+ })
+
+ return filerBucketsPath, err
+}
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
index fbaddcd51..4b3d7f0be 100644
--- a/weed/shell/command_collection_delete.go
+++ b/weed/shell/command_collection_delete.go
@@ -34,9 +34,8 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ
collectionName := args[0]
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- _, err = client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: collectionName,
})
return err
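
The hunk above shows the refactoring pattern this merge applies throughout the shell package, and that recurs in the hunks below: helper functions and client callbacks stop threading a ctx context.Context parameter, and each RPC call site creates context.Background() inline instead. A condensed before/after sketch, taken directly from this hunk with error handling trimmed:

    // before: the caller builds a ctx and passes it through the callback
    ctx := context.Background()
    err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
        _, err = client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{Name: collectionName})
        return err
    })

    // after: no ctx plumbing; the context is created where the RPC is issued
    err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
        _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{Name: collectionName})
        return err
    })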
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
index c4325c66f..2a114e61b 100644
--- a/weed/shell/command_collection_list.go
+++ b/weed/shell/command_collection_list.go
@@ -41,9 +41,8 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer
func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
var resp *master_pb.CollectionListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.CollectionList(ctx, &master_pb.CollectionListRequest{
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
IncludeNormalVolumes: includeNormalVolumes,
IncludeEcVolumes: includeEcVolumes,
})
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 96599372e..1ddb6a490 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"flag"
"fmt"
"io"
@@ -99,6 +98,10 @@ func (c *commandEcBalance) Help() string {
func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
@@ -107,10 +110,8 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
return nil
}
- ctx := context.Background()
-
// collect all ec nodes
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, *dc)
+ allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, *dc)
if err != nil {
return err
}
@@ -138,7 +139,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
}
}
- if err := balanceEcRacks(ctx, commandEnv, racks, *applyBalancing); err != nil {
+ if err := balanceEcRacks(commandEnv, racks, *applyBalancing); err != nil {
return fmt.Errorf("balance ec racks: %v", err)
}
@@ -162,38 +163,36 @@ func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
- ctx := context.Background()
-
fmt.Printf("balanceEcVolumes %s\n", collection)
- if err := deleteDuplicatedEcShards(ctx, commandEnv, allEcNodes, collection, applyBalancing); err != nil {
+ if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
}
- if err := balanceEcShardsAcrossRacks(ctx, commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
+ if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
}
- if err := balanceEcShardsWithinRacks(ctx, commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
+ if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
}
return nil
}
-func deleteDuplicatedEcShards(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
+func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
// vid => []ecNode
vidLocations := collectVolumeIdToEcNodes(allEcNodes)
// deduplicate ec shards
for vid, locations := range vidLocations {
- if err := doDeduplicateEcShards(ctx, commandEnv, collection, vid, locations, applyBalancing); err != nil {
+ if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
return err
}
}
return nil
}
-func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
+func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
// check whether this volume has ecNodes that are over average
shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
@@ -215,10 +214,10 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collecti
duplicatedShardIds := []uint32{uint32(shardId)}
for _, ecNode := range ecNodes[1:] {
- if err := unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
return err
}
- if err := sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
return err
}
ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
@@ -227,19 +226,19 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collecti
return nil
}
-func balanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
+func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := collectVolumeIdToEcNodes(allEcNodes)
// spread the ec shards evenly
for vid, locations := range vidLocations {
- if err := doBalanceEcShardsAcrossRacks(ctx, commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
+ if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
return err
}
}
return nil
}
-func doBalanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
+func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
// calculate average number of shards an ec rack should have for one volume
averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
@@ -274,7 +273,7 @@ func doBalanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, c
for _, n := range racks[rackId].ecNodes {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
- err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+ err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
if err != nil {
return err
}
@@ -306,7 +305,7 @@ func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]i
return ""
}
-func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
+func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := collectVolumeIdToEcNodes(allEcNodes)
@@ -330,7 +329,7 @@ func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, all
}
sourceEcNodes := rackEcNodesWithVid[rackId]
averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
- if err := doBalanceEcShardsWithinOneRack(ctx, commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
+ if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
return err
}
}
@@ -338,7 +337,7 @@ func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, all
return nil
}
-func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
for _, ecNode := range existingLocations {
@@ -353,7 +352,7 @@ func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv,
fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
- err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+ err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
if err != nil {
return err
}
@@ -365,18 +364,18 @@ func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv,
return nil
}
-func balanceEcRacks(ctx context.Context, commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
+func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
// balance one rack for all ec shards
for _, ecRack := range racks {
- if err := doBalanceEcRack(ctx, commandEnv, ecRack, applyBalancing); err != nil {
+ if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
return err
}
}
return nil
}
-func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
+func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
if len(ecRack.ecNodes) <= 1 {
return nil
@@ -421,7 +420,7 @@ func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack
fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
- err := moveMountedShardToEcNode(ctx, commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
+ err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
if err != nil {
return err
}
@@ -440,7 +439,7 @@ func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack
return nil
}
-func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
sortEcNodesByFreeslotsDecending(possibleDestinationEcNodes)
@@ -458,7 +457,7 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *CommandEnv, a
fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
- err := moveMountedShardToEcNode(ctx, commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
+ err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
if err != nil {
return err
}
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index e187d5a3b..0db119d3c 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -15,26 +15,26 @@ import (
"google.golang.org/grpc"
)
-func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
copiedShardIds := []uint32{uint32(shardId)}
if applyBalancing {
// ask destination node to copy shard and the ecx file from source node, and mount it
- copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
if err != nil {
return err
}
// unmount the to be deleted shards
- err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
if err != nil {
return err
}
// ask source node to delete the shard, and maybe the ecx file
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
if err != nil {
return err
}
@@ -50,18 +50,18 @@ func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, exist
}
-func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
+func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
- err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if targetServer.info.Id != existingLocation {
fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
- _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
@@ -76,7 +76,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
}
fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
- _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
@@ -178,12 +178,12 @@ type EcRack struct {
freeEcSlot int
}
-func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -211,13 +211,12 @@ func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCen
return
}
-func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
+func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeDeletedShardIds,
@@ -227,13 +226,12 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt
}
-func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
+func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
VolumeId: uint32(volumeId),
ShardIds: toBeUnmountedhardIds,
})
@@ -241,13 +239,12 @@ func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
})
}
-func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
+func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeMountedhardIds,
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index 8a705a5ae..5f03df58c 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -36,6 +36,10 @@ func (c *commandEcDecode) Help() string {
func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
collection := encodeCommand.String("collection", "", "the collection name")
@@ -43,25 +47,24 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
- ctx := context.Background()
vid := needle.VolumeId(*volumeId)
// collect topology information
- topologyInfo, err := collectTopologyInfo(ctx, commandEnv)
+ topologyInfo, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
// volumeId is provided
if vid != 0 {
- return doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid)
+ return doEcDecode(commandEnv, topologyInfo, *collection, vid)
}
// apply to all volumes in the collection
volumeIds := collectEcShardIds(topologyInfo, *collection)
fmt.Printf("ec encode volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
- if err = doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid); err != nil {
+ if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
return err
}
}
@@ -69,26 +72,26 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
-func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
+func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
// find volume location
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
// collect ec shards to the server with most space
- targetNodeLocation, err := collectEcShards(ctx, commandEnv, nodeToEcIndexBits, collection, vid)
+ targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid)
if err != nil {
return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
}
// generate a normal volume
- err = generateNormalVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
+ err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
if err != nil {
return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
}
// delete the previous ec shards
- err = mountVolumeAndDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
+ err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
if err != nil {
return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
}
@@ -96,11 +99,11 @@ func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb
return nil
}
-func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
+func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume
- if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
+ if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(vid),
})
return mountErr
@@ -111,7 +114,7 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO
// unmount ec shards
for location, ecIndexBits := range nodeToEcIndexBits {
fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
- err := unmountEcShards(ctx, grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
+ err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
if err != nil {
return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
}
@@ -119,7 +122,7 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO
// delete ec shards
for location, ecIndexBits := range nodeToEcIndexBits {
fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
- err := sourceServerDeleteEcShards(ctx, grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
+ err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
if err != nil {
return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
}
@@ -128,12 +131,12 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO
return nil
}
-func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
VolumeId: uint32(vid),
Collection: collection,
})
@@ -144,7 +147,7 @@ func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, v
}
-func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
+func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
maxShardCount := 0
var exisitngEcIndexBits erasure_coding.ShardBits
@@ -170,11 +173,11 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB
continue
}
- err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
- _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(vid),
Collection: collection,
ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
@@ -204,11 +207,11 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB
}
-func collectTopologyInfo(ctx context.Context, commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
+func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 587b59388..165809d05 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -54,6 +54,10 @@ func (c *commandEcEncode) Help() string {
func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
collection := encodeCommand.String("collection", "", "the collection name")
@@ -63,22 +67,21 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
- ctx := context.Background()
vid := needle.VolumeId(*volumeId)
// volumeId is provided
if vid != 0 {
- return doEcEncode(ctx, commandEnv, *collection, vid)
+ return doEcEncode(commandEnv, *collection, vid)
}
// apply to all volumes in the collection
- volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod)
+ volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
fmt.Printf("ec encode volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
- if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
+ if err = doEcEncode(commandEnv, *collection, vid); err != nil {
return err
}
}
@@ -86,7 +89,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
-func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
+func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
@@ -96,19 +99,19 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string,
// fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly
- err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+ err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
// generate ec shards
- err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
+ err = generateEcShards(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
if err != nil {
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
}
// balance the ec shards to current cluster
- err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
+ err = spreadEcShards(commandEnv, vid, collection, locations)
if err != nil {
return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
}
@@ -116,12 +119,12 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string,
return nil
}
-func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
+func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
for _, location := range locations {
- err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
+ err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
})
return markErr
@@ -136,10 +139,10 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol
return nil
}
-func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: uint32(volumeId),
Collection: collection,
})
@@ -150,9 +153,9 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum
}
-func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
+func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, "")
+ allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
if err != nil {
return err
}
@@ -169,26 +172,27 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle
allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
// ask the data nodes to copy from the source volume server
- copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
+ copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
if err != nil {
return err
}
// unmount the to be deleted shards
- err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
return err
}
// ask the source volume server to clean up copied ec shards
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
}
// ask the source volume server to delete the original volume
for _, location := range existingLocations {
- err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
+ fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
+ err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url)
if err != nil {
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
}
@@ -198,9 +202,7 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle
}
-func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
- targetServers []*EcNode, allocatedEcIds [][]uint32,
- volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
+func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
// parallelize
shardIdChan := make(chan []uint32, len(targetServers))
@@ -213,7 +215,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
wg.Add(1)
go func(server *EcNode, allocatedEcShardIds []uint32) {
defer wg.Done()
- copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server,
+ copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
allocatedEcShardIds, volumeId, collection, existingLocation.Url)
if copyErr != nil {
err = copyErr
@@ -255,11 +257,11 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
return allocated
}
-func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 600a8cb45..df28681fe 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -56,6 +56,10 @@ func (c *commandEcRebuild) Help() string {
func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
fixCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collection := fixCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
applyChanges := fixCommand.Bool("force", false, "apply the changes")
@@ -64,7 +68,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
}
// collect all ec nodes
- allEcNodes, _, err := collectEcNodes(context.Background(), commandEnv, "")
+ allEcNodes, _, err := collectEcNodes(commandEnv, "")
if err != nil {
return err
}
@@ -92,8 +96,6 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error {
- ctx := context.Background()
-
fmt.Printf("rebuildEcVolumes %s\n", collection)
// collect vid => each shard locations, similar to ecShardMap in topology.go
@@ -117,7 +119,7 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s
return fmt.Errorf("disk space is not enough")
}
- if err := rebuildOneEcVolume(ctx, commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil {
+ if err := rebuildOneEcVolume(commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil {
return err
}
}
@@ -125,13 +127,13 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s
return nil
}
-func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
+func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
// collect shard files to rebuilder local disk
var generatedShardIds []uint32
- copiedShardIds, _, err := prepareDataToRecover(ctx, commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges)
+ copiedShardIds, _, err := prepareDataToRecover(commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges)
if err != nil {
return err
}
@@ -139,7 +141,7 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
// clean up working files
// ask the rebuilder to delete the copied shards
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds)
if err != nil {
fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds)
}
@@ -151,13 +153,13 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
}
// generate ec shards, and maybe ecx file
- generatedShardIds, err = generateMissingShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id)
+ generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id)
if err != nil {
return err
}
// mount the generated shards
- err = mountEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
+ err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
if err != nil {
return err
}
@@ -167,11 +169,10 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
return nil
}
-func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
+func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
- err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{
+ err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
VolumeId: uint32(volumeId),
Collection: collection,
})
@@ -183,7 +184,7 @@ func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption,
return
}
-func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
+func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
needEcxFile := true
var localShardBits erasure_coding.ShardBits
@@ -209,8 +210,8 @@ func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder
var copyErr error
if applyBalancing {
- copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+ copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: []uint32{uint32(shardId)},
diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go
index c233d25d0..4fddcbea5 100644
--- a/weed/shell/command_ec_test.go
+++ b/weed/shell/command_ec_test.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"testing"
@@ -121,7 +120,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
racks := collectRacks(allEcNodes)
balanceEcVolumes(nil, "c1", allEcNodes, racks, false)
- balanceEcRacks(context.Background(), nil, racks, false)
+ balanceEcRacks(nil, racks, false)
}
func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNode {
diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go
index 238dee7f9..7177d8ac3 100644
--- a/weed/shell/command_fs_cat.go
+++ b/weed/shell/command_fs_cat.go
@@ -1,13 +1,13 @@
package shell
import (
- "context"
"fmt"
"io"
"math"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -25,39 +25,34 @@ func (c *commandFsCat) Help() string {
return `stream the file content on to the screen
fs.cat /dir/file_name
- fs.cat http://<filer_server>:<port>/dir/file_name
`
}
func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- input := findInputDirectory(args)
-
- filerServer, filerPort, path, err := commandEnv.parseUrl(input)
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
return err
}
- ctx := context.Background()
-
- if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
+ if commandEnv.isDirectory(path) {
return fmt.Errorf("%s is a directory", path)
}
- dir, name := filer2.FullPath(path).DirAndName()
+ dir, name := util.FullPath(path).DirAndName()
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
Directory: dir,
}
- respLookupEntry, err := client.LookupDirectoryEntry(ctx, request)
+ respLookupEntry, err := filer_pb.LookupEntry(client, request)
if err != nil {
return err
}
- return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt32)
+ return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
})
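
The fs.cat hunk above also shows the second recurring change in the fs.* commands: paths are plain /dir/file strings rather than http://<filer_server>:<port>/... URLs, and lookups go through the shared commandEnv.WithFilerClient. A condensed sketch of the new shape, assembled only from the hunk above with error handling trimmed:

    path, err := commandEnv.parseUrl(findInputDirectory(args)) // returns just the path now
    if err != nil {
        return err
    }
    dir, name := util.FullPath(path).DirAndName()
    return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
        resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
            Directory: dir,
            Name:      name,
        })
        if err != nil {
            return err
        }
        // stream the looked-up file's chunks to the writer
        return filer2.StreamContent(commandEnv.MasterClient, writer, resp.Entry.Chunks, 0, math.MaxInt64)
    })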
diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go
index 408ec86c8..2cc28f7a2 100644
--- a/weed/shell/command_fs_cd.go
+++ b/weed/shell/command_fs_cd.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"io"
)
@@ -17,41 +16,33 @@ func (c *commandFsCd) Name() string {
}
func (c *commandFsCd) Help() string {
- return `change directory to http://<filer_server>:<port>/dir/
+ return `change directory to a directory /path/to/dir
The full path can be too long to type. For example,
- fs.ls http://<filer_server>:<port>/some/path/to/file_name
+ fs.ls /some/path/to/file_name
can be simplified as
- fs.cd http://<filer_server>:<port>/some/path
+ fs.cd /some/path
fs.ls to/file_name
`
}
func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- input := findInputDirectory(args)
-
- filerServer, filerPort, path, err := commandEnv.parseUrl(input)
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
return err
}
if path == "/" {
- commandEnv.option.FilerHost = filerServer
- commandEnv.option.FilerPort = filerPort
commandEnv.option.Directory = "/"
return nil
}
- ctx := context.Background()
-
- err = commandEnv.checkDirectory(ctx, filerServer, filerPort, path)
+ err = commandEnv.checkDirectory(path)
if err == nil {
- commandEnv.option.FilerHost = filerServer
- commandEnv.option.FilerPort = filerPort
commandEnv.option.Directory = path
}
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go
index d6ea51d0c..96551dd5a 100644
--- a/weed/shell/command_fs_du.go
+++ b/weed/shell/command_fs_du.go
@@ -1,12 +1,9 @@
package shell
import (
- "context"
"fmt"
"io"
- "google.golang.org/grpc"
-
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -26,28 +23,26 @@ func (c *commandFsDu) Name() string {
func (c *commandFsDu) Help() string {
return `show disk usage
- fs.du http://<filer_server>:<port>/dir
- fs.du http://<filer_server>:<port>/dir/file_name
- fs.du http://<filer_server>:<port>/dir/file_prefix
+ fs.du /dir
+ fs.du /dir/file_name
+ fs.du /dir/file_prefix
`
}
func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
return err
}
- ctx := context.Background()
-
- if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
+ if commandEnv.isDirectory(path) {
path = path + "/"
}
var blockCount, byteCount uint64
- dir, name := filer2.FullPath(path).DirAndName()
- blockCount, byteCount, err = duTraverseDirectory(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name)
+ dir, name := util.FullPath(path).DirAndName()
+ blockCount, byteCount, err = duTraverseDirectory(writer, commandEnv, dir, name)
if name == "" && err == nil {
fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir)
@@ -57,54 +52,33 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer
}
-func duTraverseDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, dir, name string) (blockCount uint64, byteCount uint64, err error) {
+func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) {
+
+ err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error {
+
+ var fileBlockCount, fileByteCount uint64
- err = filer2.ReadDirAllEntries(ctx, filerClient, filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
if entry.IsDirectory {
subDir := fmt.Sprintf("%s/%s", dir, entry.Name)
if dir == "/" {
subDir = "/" + entry.Name
}
- numBlock, numByte, err := duTraverseDirectory(ctx, writer, filerClient, subDir, "")
+ numBlock, numByte, err := duTraverseDirectory(writer, filerClient, subDir, "")
if err == nil {
blockCount += numBlock
byteCount += numByte
}
} else {
+ fileBlockCount = uint64(len(entry.Chunks))
+ fileByteCount = filer2.TotalSize(entry.Chunks)
blockCount += uint64(len(entry.Chunks))
byteCount += filer2.TotalSize(entry.Chunks)
}
if name != "" && !entry.IsDirectory {
- fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", blockCount, byteCount, dir, name)
+ fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", fileBlockCount, fileByteCount, dir, entry.Name)
}
+ return nil
})
return
}
-
-func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
-
- filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000)
- return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(ctx2, client)
- }, filerGrpcAddress, env.option.GrpcDialOption)
-
-}
-
-type commandFilerClient struct {
- env *CommandEnv
- filerServer string
- filerPort int64
-}
-
-func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *commandFilerClient {
- return &commandFilerClient{
- env: env,
- filerServer: filerServer,
- filerPort: filerPort,
- }
-}
-func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
- return c.env.withFilerClient(ctx, c.filerServer, c.filerPort, fn)
-}
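For orientation, a minimal sketch of the call pattern that replaces the removed withFilerClient/getFilerClient helpers above: the filer address now lives in CommandEnv, so a command only passes a callback. The exampleLookup function below is illustrative and not part of this change; WithFilerClient, filer_pb.LookupEntry, and util.FullPath are the helpers used throughout these diffs.

package shell

import (
	"fmt"
	"io"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// exampleLookup shows the new filer access pattern: no host/port plumbing,
// just a callback against the already-configured filer connection.
func exampleLookup(commandEnv *CommandEnv, path string, writer io.Writer) error {
	dir, name := util.FullPath(path).DirAndName()
	return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
			Directory: dir,
			Name:      name,
		})
		if err != nil {
			return err
		}
		fmt.Fprintf(writer, "%s has %d chunks\n", resp.Entry.Name, len(resp.Entry.Chunks))
		return nil
	})
}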
diff --git a/weed/shell/command_fs_lock_unlock.go b/weed/shell/command_fs_lock_unlock.go
new file mode 100644
index 000000000..8a6e8f71b
--- /dev/null
+++ b/weed/shell/command_fs_lock_unlock.go
@@ -0,0 +1,54 @@
+package shell
+
+import (
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandUnlock{})
+ Commands = append(Commands, &commandLock{})
+}
+
+// =========== Lock ==============
+type commandLock struct {
+}
+
+func (c *commandLock) Name() string {
+ return "lock"
+}
+
+func (c *commandLock) Help() string {
+ return `lock in order to exclusively manage the cluster
+
+ This is a blocking operation if there is already another lock.
+`
+}
+
+func (c *commandLock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ commandEnv.locker.RequestLock()
+
+ return nil
+}
+
+// =========== Unlock ==============
+
+type commandUnlock struct {
+}
+
+func (c *commandUnlock) Name() string {
+ return "unlock"
+}
+
+func (c *commandUnlock) Help() string {
+ return `unlock the cluster-wide lock
+
+`
+}
+
+func (c *commandUnlock) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ commandEnv.locker.ReleaseLock()
+
+ return nil
+}
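Taken together with the confirmIsLocked() checks added to the volume commands below, a typical maintenance session in the weed shell would look roughly like the following. The lock, unlock, and volume.fsck names and the -n/-v flags appear in this change; the volume.fix.replication command name is assumed to follow the file naming used here.

> lock
> volume.fix.replication -n
> volume.fsck -v
> unlock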
diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go
index 0c63f71fa..36133992f 100644
--- a/weed/shell/command_fs_ls.go
+++ b/weed/shell/command_fs_ls.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
"os"
@@ -11,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -30,9 +30,6 @@ func (c *commandFsLs) Help() string {
fs.ls [-l] [-a] /dir/
fs.ls [-l] [-a] /dir/file_name
fs.ls [-l] [-a] /dir/file_prefix
- fs.ls [-l] [-a] http://<filer_server>:<port>/dir/
- fs.ls [-l] [-a] http://<filer_server>:<port>/dir/file_name
- fs.ls [-l] [-a] http://<filer_server>:<port>/dir/file_prefix
`
}
@@ -53,26 +50,22 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer
}
}
- input := findInputDirectory(args)
-
- filerServer, filerPort, path, err := commandEnv.parseUrl(input)
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
return err
}
- ctx := context.Background()
-
- if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
+ if commandEnv.isDirectory(path) {
path = path + "/"
}
- dir, name := filer2.FullPath(path).DirAndName()
+ dir, name := util.FullPath(path).DirAndName()
entryCount := 0
- err = filer2.ReadDirAllEntries(ctx, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
+ err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error {
if !showHidden && strings.HasPrefix(entry.Name, ".") {
- return
+ return nil
}
entryCount++
@@ -95,9 +88,9 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer
}
}
- if dir == "/" {
+ if strings.HasSuffix(dir, "/") {
// just for printing
- dir = ""
+ dir = dir[:len(dir)-1]
}
fmt.Fprintf(writer, "%s %3d %s %s %6d %s/%s\n",
fileMode, len(entry.Chunks),
@@ -107,6 +100,7 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer
fmt.Fprintf(writer, "%s\n", entry.Name)
}
+ return nil
})
if isLongFormat && err == nil {
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index 9980f67a2..0679ec075 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -1,14 +1,13 @@
package shell
import (
- "context"
"fmt"
"io"
"github.com/golang/protobuf/jsonpb"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -27,31 +26,25 @@ func (c *commandFsMetaCat) Help() string {
fs.meta.cat /dir/
fs.meta.cat /dir/file_name
- fs.meta.cat http://<filer_server>:<port>/dir/
- fs.meta.cat http://<filer_server>:<port>/dir/file_name
`
}
func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- input := findInputDirectory(args)
-
- filerServer, filerPort, path, err := commandEnv.parseUrl(input)
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
return err
}
- ctx := context.Background()
-
- dir, name := filer2.FullPath(path).DirAndName()
+ dir, name := util.FullPath(path).DirAndName()
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
Directory: dir,
}
- respLookupEntry, err := client.LookupDirectoryEntry(ctx, request)
+ respLookupEntry, err := filer_pb.LookupEntry(client, request)
if err != nil {
return err
}
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
index 8f2ef95e3..69ae9454c 100644
--- a/weed/shell/command_fs_meta_load.go
+++ b/weed/shell/command_fs_meta_load.go
@@ -1,15 +1,14 @@
package shell
import (
- "context"
"fmt"
"io"
"os"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/golang/protobuf/proto"
+
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
)
func init() {
@@ -38,11 +37,6 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
}
- filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(nil))
- if err != nil {
- return err
- }
-
fileName := args[len(args)-1]
dst, err := os.OpenFile(fileName, os.O_RDONLY, 0644)
@@ -53,9 +47,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
var dirCount, fileCount uint64
- ctx := context.Background()
-
- err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
sizeBuf := make([]byte, 4)
@@ -80,14 +72,14 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
return err
}
- if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: fullEntry.Dir,
Entry: fullEntry.Entry,
}); err != nil {
return err
}
- fmt.Fprintf(writer, "load %s\n", filer2.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name))
+ fmt.Fprintf(writer, "load %s\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name))
if fullEntry.Entry.IsDirectory {
dirCount++
@@ -101,7 +93,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
if err == nil {
fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount)
- fmt.Fprintf(writer, "\n%s is loaded to http://%s:%d%s\n", fileName, filerServer, filerPort, path)
+ fmt.Fprintf(writer, "\n%s is loaded.\n", fileName)
}
return err
diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go
index e2b2d22cc..4342fa81d 100644
--- a/weed/shell/command_fs_meta_notify.go
+++ b/weed/shell/command_fs_meta_notify.go
@@ -1,11 +1,9 @@
package shell
import (
- "context"
"fmt"
"io"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/notification"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -34,7 +32,7 @@ func (c *commandFsMetaNotify) Help() string {
func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
return err
}
@@ -43,11 +41,9 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i
v := util.GetViper()
notification.LoadConfiguration(v, "notification.")
- ctx := context.Background()
-
var dirCount, fileCount uint64
- err = doTraverseBFS(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) {
+ err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
if entry.IsDirectory {
dirCount++
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
index 178c826d5..ed19e3d01 100644
--- a/weed/shell/command_fs_meta_save.go
+++ b/weed/shell/command_fs_meta_save.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"flag"
"fmt"
"io"
@@ -12,7 +11,6 @@ import (
"github.com/golang/protobuf/proto"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -40,8 +38,6 @@ func (c *commandFsMetaSave) Help() string {
The meta data will be saved into a local <filer_host>-<port>-<time>.meta file.
These meta data can be later loaded by fs.meta.load command,
- This assumes there are no deletions, so this is different from taking a snapshot.
-
`
}
@@ -50,22 +46,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
fsMetaSaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
verbose := fsMetaSaveCommand.Bool("v", false, "print out each processed files")
outputFileName := fsMetaSaveCommand.String("o", "", "output the meta data to this file")
+ // chunksFileName := fsMetaSaveCommand.String("chunks", "", "output all the chunks to this file")
if err = fsMetaSaveCommand.Parse(args); err != nil {
return nil
}
- filerServer, filerPort, path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args()))
+ path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args()))
if parseErr != nil {
return parseErr
}
- ctx := context.Background()
-
- t := time.Now()
fileName := *outputFileName
if fileName == "" {
+ t := time.Now()
fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta",
- filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
+ commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second())
}
dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
@@ -74,43 +69,64 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
}
defer dst.Close()
- var wg sync.WaitGroup
- wg.Add(1)
- outputChan := make(chan []byte, 1024)
- go func() {
+ err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(outputChan chan interface{}) {
sizeBuf := make([]byte, 4)
- for b := range outputChan {
+ for item := range outputChan {
+ b := item.([]byte)
util.Uint32toBytes(sizeBuf, uint32(len(b)))
dst.Write(sizeBuf)
dst.Write(b)
}
+ }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ bytes, err := proto.Marshal(entry)
+ if err != nil {
+ fmt.Fprintf(writer, "marshall error: %v\n", err)
+ return
+ }
+
+ outputChan <- bytes
+ return nil
+ })
+
+ if err == nil {
+ fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", commandEnv.option.FilerHost, commandEnv.option.FilerPort, path, fileName)
+ }
+
+ return err
+
+}
+
+func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, saveFn func(outputChan chan interface{}), genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error) error {
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ outputChan := make(chan interface{}, 1024)
+ go func() {
+ saveFn(outputChan)
wg.Done()
}()
var dirCount, fileCount uint64
- err = doTraverseBFS(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) {
+ err := filer_pb.TraverseBfs(filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
protoMessage := &filer_pb.FullEntry{
Dir: string(parentPath),
Entry: entry,
}
- bytes, err := proto.Marshal(protoMessage)
- if err != nil {
+ if err := genFn(protoMessage, outputChan); err != nil {
fmt.Fprintf(writer, "marshall error: %v\n", err)
return
}
- outputChan <- bytes
-
if entry.IsDirectory {
atomic.AddUint64(&dirCount, 1)
} else {
atomic.AddUint64(&fileCount, 1)
}
- if *verbose {
+ if verbose {
println(parentPath.Child(entry.Name))
}
@@ -120,66 +136,8 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
wg.Wait()
- if err == nil {
+ if err == nil && writer != nil {
fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount)
- fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName)
}
-
return err
-
-}
-func doTraverseBFS(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient,
- parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) {
-
- K := 5
-
- var jobQueueWg sync.WaitGroup
- queue := util.NewQueue()
- jobQueueWg.Add(1)
- queue.Enqueue(parentPath)
- var isTerminating bool
-
- for i := 0; i < K; i++ {
- go func() {
- for {
- if isTerminating {
- break
- }
- t := queue.Dequeue()
- if t == nil {
- time.Sleep(329 * time.Millisecond)
- continue
- }
- dir := t.(filer2.FullPath)
- processErr := processOneDirectory(ctx, writer, filerClient, dir, queue, &jobQueueWg, fn)
- if processErr != nil {
- err = processErr
- }
- jobQueueWg.Done()
- }
- }()
- }
- jobQueueWg.Wait()
- isTerminating = true
- return
-}
-
-func processOneDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient,
- parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup,
- fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) {
-
- return filer2.ReadDirAllEntries(ctx, filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) {
-
- fn(parentPath, entry)
-
- if entry.IsDirectory {
- subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name)
- if parentPath == "/" {
- subDir = "/" + entry.Name
- }
- jobQueueWg.Add(1)
- queue.Enqueue(filer2.FullPath(subDir))
- }
- })
-
}
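The traverse-and-save split extracted here is reused later by volume.fsck: genFn runs for every entry found by the BFS traversal and pushes work onto outputChan, while saveFn drains the channel in a single goroutine and can therefore aggregate without locks. A minimal caller sketch, assuming (as this change does) that CommandEnv satisfies filer_pb.FilerClient; the chunkByteTotal name and the byte-counting purpose are illustrative only.

package shell

import (
	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// chunkByteTotal sums the chunk sizes of every entry under "/".
// saveFn (first callback) consumes; genFn (second callback) produces per entry.
func chunkByteTotal(commandEnv *CommandEnv) (total uint64, err error) {
	err = doTraverseBfsAndSaving(commandEnv, nil, "/", false,
		func(outputChan chan interface{}) {
			for item := range outputChan {
				total += item.(uint64)
			}
		},
		func(entry *filer_pb.FullEntry, outputChan chan interface{}) error {
			outputChan <- filer2.TotalSize(entry.Entry.Chunks)
			return nil
		})
	return
}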
diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go
index e77755921..0a7eed02d 100644
--- a/weed/shell/command_fs_mv.go
+++ b/weed/shell/command_fs_mv.go
@@ -4,10 +4,9 @@ import (
"context"
"fmt"
"io"
- "path/filepath"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -37,37 +36,35 @@ func (c *commandFsMv) Help() string {
func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- filerServer, filerPort, sourcePath, err := commandEnv.parseUrl(args[0])
+ sourcePath, err := commandEnv.parseUrl(args[0])
if err != nil {
return err
}
- _, _, destinationPath, err := commandEnv.parseUrl(args[1])
+ destinationPath, err := commandEnv.parseUrl(args[1])
if err != nil {
return err
}
- ctx := context.Background()
+ sourceDir, sourceName := util.FullPath(sourcePath).DirAndName()
- sourceDir, sourceName := filer2.FullPath(sourcePath).DirAndName()
+ destinationDir, destinationName := util.FullPath(destinationPath).DirAndName()
- destinationDir, destinationName := filer2.FullPath(destinationPath).DirAndName()
-
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// collect destination entry info
destinationRequest := &filer_pb.LookupDirectoryEntryRequest{
Name: destinationDir,
Directory: destinationName,
}
- respDestinationLookupEntry, err := client.LookupDirectoryEntry(ctx, destinationRequest)
+ respDestinationLookupEntry, err := filer_pb.LookupEntry(client, destinationRequest)
var targetDir, targetName string
// moving a file or folder
if err == nil && respDestinationLookupEntry.Entry.IsDirectory {
// to a directory
- targetDir = filepath.ToSlash(filepath.Join(destinationDir, destinationName))
+ targetDir = util.Join(destinationDir, destinationName)
targetName = sourceName
} else {
// to a file or folder
@@ -82,9 +79,9 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer
NewName: targetName,
}
- _, err = client.AtomicRenameEntry(ctx, request)
+ _, err = client.AtomicRenameEntry(context.Background(), request)
- fmt.Fprintf(writer, "move: %s => %s\n", sourcePath, filer2.NewFullPath(targetDir, targetName))
+ fmt.Fprintf(writer, "move: %s => %s\n", sourcePath, util.NewFullPath(targetDir, targetName))
return err
diff --git a/weed/shell/command_fs_pwd.go b/weed/shell/command_fs_pwd.go
index 084a5e90a..d7d9819c8 100644
--- a/weed/shell/command_fs_pwd.go
+++ b/weed/shell/command_fs_pwd.go
@@ -22,11 +22,7 @@ func (c *commandFsPwd) Help() string {
func (c *commandFsPwd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- fmt.Fprintf(writer, "http://%s:%d%s\n",
- commandEnv.option.FilerHost,
- commandEnv.option.FilerPort,
- commandEnv.option.Directory,
- )
+ fmt.Fprintf(writer, "%s\n", commandEnv.option.Directory)
return nil
}
diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go
index 8660030e3..a8c5b2018 100644
--- a/weed/shell/command_fs_tree.go
+++ b/weed/shell/command_fs_tree.go
@@ -1,13 +1,12 @@
package shell
import (
- "context"
"fmt"
"io"
"strings"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -24,22 +23,21 @@ func (c *commandFsTree) Name() string {
func (c *commandFsTree) Help() string {
return `recursively list all files under a directory
- fs.tree http://<filer_server>:<port>/dir/
+ fs.tree /some/dir
+
`
}
func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
+ path, err := commandEnv.parseUrl(findInputDirectory(args))
if err != nil {
return err
}
- dir, name := filer2.FullPath(path).DirAndName()
-
- ctx := context.Background()
+ dir, name := util.FullPath(path).DirAndName()
- dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, newPrefix(), -1)
+ dirCount, fCount, terr := treeTraverseDirectory(writer, commandEnv, util.FullPath(dir), name, newPrefix(), -1)
if terr == nil {
fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount)
@@ -49,14 +47,14 @@ func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writ
}
-func treeTraverseDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, dir filer2.FullPath, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) {
+func treeTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir util.FullPath, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) {
prefix.addMarker(level)
- err = filer2.ReadDirAllEntries(ctx, filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) {
+ err = filer_pb.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) error {
if level < 0 && name != "" {
if entry.Name != name {
- return
+ return nil
}
}
@@ -65,14 +63,14 @@ func treeTraverseDirectory(ctx context.Context, writer io.Writer, filerClient fi
if entry.IsDirectory {
directoryCount++
subDir := dir.Child(entry.Name)
- dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, filerClient, subDir, "", prefix, level+1)
+ dirCount, fCount, terr := treeTraverseDirectory(writer, filerClient, subDir, "", prefix, level+1)
directoryCount += dirCount
fileCount += fCount
err = terr
} else {
fileCount++
}
-
+ return nil
})
return
}
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index bed4f4306..69e3c7fd9 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -60,6 +60,10 @@ func (c *commandVolumeBalance) Help() string {
func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or use \"ALL_COLLECTIONS\" across collections, \"EACH_COLLECTION\" for each collection")
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
@@ -69,9 +73,8 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
}
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -109,14 +112,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
-func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.DataNodeInfo, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
-
- var nodes []*Node
- for _, dn := range dataNodeInfos {
- nodes = append(nodes, &Node{
- info: dn,
- })
- }
+func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
// balance writable volumes
for _, n := range nodes {
@@ -151,15 +147,19 @@ func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.Dat
return nil
}
-func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*master_pb.DataNodeInfo) {
- typeToNodes = make(map[uint64][]*master_pb.DataNodeInfo)
+func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) {
+ typeToNodes = make(map[uint64][]*Node)
for _, dc := range t.DataCenterInfos {
if selectedDataCenter != "" && dc.Id != selectedDataCenter {
continue
}
for _, r := range dc.RackInfos {
for _, dn := range r.DataNodeInfos {
- typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], dn)
+ typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{
+ info: dn,
+ dc: dc.Id,
+ rack: r.Id,
+ })
}
}
}
@@ -169,6 +169,8 @@ func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter st
type Node struct {
info *master_pb.DataNodeInfo
selectedVolumes map[uint32]*master_pb.VolumeInformationMessage
+ dc string
+ rack string
}
func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) {
@@ -210,6 +212,13 @@ func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidates
sortCandidatesFn(candidateVolumes)
for _, v := range candidateVolumes {
+ if v.ReplicaPlacement > 0 {
+ if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack {
+ // TODO this logic is too simple, but should work most of the time
+ // Need a correct algorithm to handle all different cases
+ continue
+ }
+ }
if _, found := emptyNode.selectedVolumes[v.Id]; !found {
if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil {
delete(fullNode.selectedVolumes, v.Id)
@@ -233,8 +242,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
}
fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyBalancing {
- ctx := context.Background()
- return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
}
return nil
}
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
index 6000d0de0..ff976c345 100644
--- a/weed/shell/command_volume_configure_replication.go
+++ b/weed/shell/command_volume_configure_replication.go
@@ -35,6 +35,10 @@ func (c *commandVolumeConfigureReplication) Help() string {
func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
configureReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeIdInt := configureReplicationCommand.Int("volumeId", 0, "the volume id")
replicationString := configureReplicationCommand.String("replication", "", "the intended replication value")
@@ -53,9 +57,8 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -81,8 +84,8 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
}
for _, dst := range allLocations {
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, configureErr := volumeServerClient.VolumeConfigure(ctx, &volume_server_pb.VolumeConfigureRequest{
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: replicaPlacement.String(),
})
diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go
index 1c83ba655..cdd10863f 100644
--- a/weed/shell/command_volume_copy.go
+++ b/weed/shell/command_volume_copy.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
@@ -32,6 +31,10 @@ func (c *commandVolumeCopy) Help() string {
func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
if len(args) != 3 {
fmt.Fprintf(writer, "received args: %+v\n", args)
return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>")
@@ -47,7 +50,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
- ctx := context.Background()
- _, err = copyVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
+ _, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
return
}
diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go
index 17d27ea3a..c5cc9e277 100644
--- a/weed/shell/command_volume_delete.go
+++ b/weed/shell/command_volume_delete.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
@@ -31,6 +30,10 @@ func (c *commandVolumeDelete) Help() string {
func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
if len(args) != 2 {
fmt.Fprintf(writer, "received args: %+v\n", args)
return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
@@ -42,7 +45,6 @@ func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer i
return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
}
- ctx := context.Background()
- return deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+ return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
}
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 7a1a77cbe..19da89b67 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -44,15 +44,18 @@ func (c *commandVolumeFixReplication) Help() string {
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
takeAction := true
if len(args) > 0 && args[0] == "-n" {
takeAction = false
}
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -113,12 +116,12 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
break
}
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: volumeInfo.Id,
SourceDataNode: sourceNode.dataNode.Id,
})
- return replicateErr
+ return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr)
})
if err != nil {
@@ -146,31 +149,131 @@ func keepDataNodesSorted(dataNodes []location) {
})
}
+/*
+ if on an existing data node {
+ return false
+ }
+ if different from existing dcs {
+ if lack on different dcs {
+ return true
+ }else{
+ return false
+ }
+ }
+ if not on primary dc {
+ return false
+ }
+ if different from existing racks {
+ if lack on different racks {
+ return true
+ }else{
+ return false
+ }
+ }
+ if not on primary rack {
+ return false
+ }
+ if lacks on same rack {
+ return true
+ } else {
+ return false
+ }
+*/
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
- existingDataCenters := make(map[string]bool)
- existingRacks := make(map[string]bool)
- existingDataNodes := make(map[string]bool)
+ existingDataNodes := make(map[string]int)
for _, loc := range existingLocations {
- existingDataCenters[loc.DataCenter()] = true
- existingRacks[loc.Rack()] = true
- existingDataNodes[loc.String()] = true
+ existingDataNodes[loc.String()] += 1
+ }
+ sameDataNodeCount := existingDataNodes[possibleLocation.String()]
+ // avoid duplicated volume on the same data node
+ if sameDataNodeCount > 0 {
+ return false
}
- if replicaPlacement.DiffDataCenterCount >= len(existingDataCenters) {
- // check dc, good if different from any existing data centers
- _, found := existingDataCenters[possibleLocation.DataCenter()]
- return !found
- } else if replicaPlacement.DiffRackCount >= len(existingRacks) {
- // check rack, good if different from any existing racks
- _, found := existingRacks[possibleLocation.Rack()]
- return !found
- } else if replicaPlacement.SameRackCount >= len(existingDataNodes) {
- // check data node, good if different from any existing data nodes
- _, found := existingDataNodes[possibleLocation.String()]
- return !found
+ existingDataCenters := make(map[string]int)
+ for _, loc := range existingLocations {
+ existingDataCenters[loc.DataCenter()] += 1
+ }
+ primaryDataCenters, _ := findTopKeys(existingDataCenters)
+
+ // ensure data center count is within limit
+ if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
+ // different from existing dcs
+ if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
+ // lack on different dcs
+ return true
+ } else {
+ // adding this would go over the different dcs limit
+ return false
+ }
+ }
+ // now this is same as one of the existing data center
+ if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
+ // not on one of the primary dcs
+ return false
}
+ // now this is one of the primary dcs
+ existingRacks := make(map[string]int)
+ for _, loc := range existingLocations {
+ if loc.DataCenter() != possibleLocation.DataCenter() {
+ continue
+ }
+ existingRacks[loc.Rack()] += 1
+ }
+ primaryRacks, _ := findTopKeys(existingRacks)
+ sameRackCount := existingRacks[possibleLocation.Rack()]
+
+ // ensure rack count is within limit
+ if _, found := existingRacks[possibleLocation.Rack()]; !found {
+ // different from existing racks
+ if len(existingRacks) < replicaPlacement.DiffRackCount+1 {
+ // lack on different racks
+ return true
+ } else {
+ // adding this would go over the different racks limit
+ return false
+ }
+ }
+ // now this is same as one of the existing racks
+ if !isAmong(possibleLocation.Rack(), primaryRacks) {
+ // not on the primary rack
+ return false
+ }
+
+ // now this is on the primary rack
+
+ // different from existing data nodes
+ if sameRackCount < replicaPlacement.SameRackCount+1 {
+ // lack on same rack
+ return true
+ } else {
+ // adding this would go over the same data node limit
+ return false
+ }
+
+}
+
+func findTopKeys(m map[string]int) (topKeys []string, max int) {
+ for k, c := range m {
+ if max < c {
+ topKeys = topKeys[:0]
+ topKeys = append(topKeys, k)
+ max = c
+ } else if max == c {
+ topKeys = append(topKeys, k)
+ }
+ }
+ return
+}
+
+func isAmong(key string, keys []string) bool {
+ for _, k := range keys {
+ if k == key {
+ return true
+ }
+ }
return false
}
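The unit tests added below exercise this placement logic case by case; for quick orientation, here is a minimal call sketch. The exampleCheck function and the chosen replication value are illustrative only; the location struct and helpers are the ones used in the tests.

package shell

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

// exampleCheck evaluates one candidate against replication "010":
// one extra copy on a different rack within the same data center.
func exampleCheck() {
	rp, _ := super_block.NewReplicaPlacementFromString("010")
	existing := []location{
		{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
	}
	candidate := location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}}
	fmt.Println(satisfyReplicaPlacement(rp, existing, candidate)) // prints: true
}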
diff --git a/weed/shell/command_volume_fix_replication_test.go b/weed/shell/command_volume_fix_replication_test.go
new file mode 100644
index 000000000..4cfbd96aa
--- /dev/null
+++ b/weed/shell/command_volume_fix_replication_test.go
@@ -0,0 +1,207 @@
+package shell
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+type testcase struct {
+ name string
+ replication string
+ existingLocations []location
+ possibleLocation location
+ expected bool
+}
+
+func TestSatisfyReplicaPlacementComplicated(t *testing.T) {
+
+ var tests = []testcase{
+ {
+ name: "test 100 negative",
+ replication: "100",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ expected: false,
+ },
+ {
+ name: "test 100 positive",
+ replication: "100",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ possibleLocation: location{"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ expected: true,
+ },
+ {
+ name: "test 022 positive",
+ replication: "022",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: true,
+ },
+ {
+ name: "test 022 negative",
+ replication: "022",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: false,
+ },
+ {
+ name: "test 210 moved from 200 positive",
+ replication: "210",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc1", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: true,
+ },
+ {
+ name: "test 210 moved from 200 negative extra dc",
+ replication: "210",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc4", "r4", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: false,
+ },
+ {
+ name: "test 210 moved from 200 negative extra data node",
+ replication: "210",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc2", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc3", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: false,
+ },
+ }
+
+ runTests(tests, t)
+
+}
+
+func TestSatisfyReplicaPlacement01x(t *testing.T) {
+
+ var tests = []testcase{
+ {
+ name: "test 011 same existing rack",
+ replication: "011",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+ {
+ name: "test 011 negative",
+ replication: "011",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: false,
+ },
+ {
+ name: "test 011 different existing racks",
+ replication: "011",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+ {
+ name: "test 011 different existing racks negative",
+ replication: "011",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r2", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r3", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: false,
+ },
+ }
+
+ runTests(tests, t)
+
+}
+
+func TestSatisfyReplicaPlacement00x(t *testing.T) {
+
+ var tests = []testcase{
+ {
+ name: "test 001",
+ replication: "001",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ expected: true,
+ },
+ {
+ name: "test 002 positive",
+ replication: "002",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ expected: true,
+ },
+ {
+ name: "test 002 negative, repeat the same node",
+ replication: "002",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ expected: false,
+ },
+ {
+ name: "test 002 negative, enough node already",
+ replication: "002",
+ existingLocations: []location{
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn1"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn2"}},
+ {"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn3"}},
+ },
+ possibleLocation: location{"dc1", "r1", &master_pb.DataNodeInfo{Id: "dn4"}},
+ expected: false,
+ },
+ }
+
+ runTests(tests, t)
+
+}
+
+func runTests(tests []testcase, t *testing.T) {
+ for _, tt := range tests {
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromString(tt.replication)
+ println("replication:", tt.replication, "expected", tt.expected, "name:", tt.name)
+ if satisfyReplicaPlacement(replicaPlacement, tt.existingLocations, tt.possibleLocation) != tt.expected {
+ t.Errorf("%s: expect %v add %v to %s %+v",
+ tt.name, tt.expected, tt.possibleLocation, tt.replication, tt.existingLocations)
+ }
+ }
+}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
new file mode 100644
index 000000000..69a1a63b4
--- /dev/null
+++ b/weed/shell/command_volume_fsck.go
@@ -0,0 +1,361 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "math"
+ "os"
+ "path/filepath"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeFsck{})
+}
+
+type commandVolumeFsck struct {
+ env *CommandEnv
+}
+
+func (c *commandVolumeFsck) Name() string {
+ return "volume.fsck"
+}
+
+func (c *commandVolumeFsck) Help() string {
+ return `check all volumes to find entries not used by the filer
+
+ Important assumption!!!
+ all data in the cluster is written through a single filer.
+
+ This command works this way:
+ 1. collect all file ids from all volumes, as set A
+ 2. collect all file ids from the filer, as set B
+ 3. compute the set difference A - B, i.e. entries on volumes that the filer does not reference
+
+`
+}
+
+func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ verbose := fsckCommand.Bool("v", false, "verbose mode")
+ applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
+ if err = fsckCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ c.env = commandEnv
+
+ // create a temp folder
+ tempFolder, err := ioutil.TempDir("", "sw_fsck")
+ if err != nil {
+ return fmt.Errorf("failed to create temp folder: %v", err)
+ }
+ if *verbose {
+ fmt.Fprintf(writer, "working directory: %s\n", tempFolder)
+ }
+ defer os.RemoveAll(tempFolder)
+
+ // collect all volume id locations
+ volumeIdToVInfo, err := c.collectVolumeIds(*verbose, writer)
+ if err != nil {
+ return fmt.Errorf("failed to collect all volume locations: %v", err)
+ }
+
+ // collect each volume file ids
+ for volumeId, vinfo := range volumeIdToVInfo {
+ err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer)
+ if err != nil {
+ return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
+ }
+ }
+
+ // collect all filer file ids
+ if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil {
+ return fmt.Errorf("failed to collect file ids from filer: %v", err)
+ }
+
+ // subtract filer file ids from volume file ids
+ var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
+ for volumeId, vinfo := range volumeIdToVInfo {
+ inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose)
+ if checkErr != nil {
+ return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
+ }
+ totalInUseCount += inUseCount
+ totalOrphanChunkCount += uint64(len(orphanFileIds))
+ totalOrphanDataSize += orphanDataSize
+
+ if *applyPurging && len(orphanFileIds) > 0 {
+ if vinfo.isEcVolume {
+ fmt.Fprintf(writer, "Skip purging for Erasure Coded volumes.\n")
+ continue
+ }
+ if err = c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
+ return fmt.Errorf("purge for volume %d: %v\n", volumeId, err)
+ }
+ }
+ }
+
+ if totalOrphanChunkCount == 0 {
+ fmt.Fprintf(writer, "no orphan data\n")
+ return nil
+ }
+
+ if !*applyPurging {
+ pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount))
+ fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
+ totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize)
+
+ fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n")
+ }
+
+ return nil
+}
+
+func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
+
+ if verbose {
+ fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
+ }
+
+ return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+
+ ext := ".idx"
+ if vinfo.isEcVolume {
+ ext = ".ecx"
+ }
+
+ copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+ VolumeId: volumeId,
+ Ext: ext,
+ CompactionRevision: math.MaxUint32,
+ StopOffset: math.MaxInt64,
+ Collection: vinfo.collection,
+ IsEcVolume: vinfo.isEcVolume,
+ IgnoreSourceFileNotFound: false,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to start copying volume %d.idx: %v", volumeId, err)
+ }
+
+ err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
+ if err != nil {
+ return fmt.Errorf("failed to copy %d.idx from %s: %v", volumeId, vinfo.server, err)
+ }
+
+ return nil
+
+ })
+
+}
+
+func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToServer map[uint32]VInfo, verbose bool, writer io.Writer) error {
+
+ if verbose {
+ fmt.Fprintf(writer, "collecting file ids from filer ...\n")
+ }
+
+ files := make(map[uint32]*os.File)
+ for vid := range volumeIdToServer {
+ dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if openErr != nil {
+ return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
+ }
+ files[vid] = dst
+ }
+ defer func() {
+ for _, f := range files {
+ f.Close()
+ }
+ }()
+
+ type Item struct {
+ vid uint32
+ fileKey uint64
+ }
+ return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
+ buffer := make([]byte, 8)
+ for item := range outputChan {
+ i := item.(*Item)
+ util.Uint64toBytes(buffer, i.fileKey)
+ files[i.vid].Write(buffer)
+ }
+ }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ for _, chunk := range entry.Entry.Chunks {
+ outputChan <- &Item{
+ vid: chunk.Fid.VolumeId,
+ fileKey: chunk.Fid.FileKey,
+ }
+ }
+ return nil
+ })
+}
+
+func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
+
+ db := needle_map.NewMemDb()
+ defer db.Close()
+
+ if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
+ return
+ }
+
+ filerFileIdsData, err := ioutil.ReadFile(getFilerFileIdFile(tempFolder, volumeId))
+ if err != nil {
+ return
+ }
+
+ dataLen := len(filerFileIdsData)
+ if dataLen%8 != 0 {
+ return 0, nil, 0, fmt.Errorf("filer data is corrupted")
+ }
+
+ for i := 0; i < len(filerFileIdsData); i += 8 {
+ fileKey := util.BytesToUint64(filerFileIdsData[i : i+8])
+ db.Delete(types.NeedleId(fileKey))
+ inUseCount++
+ }
+
+ var orphanFileCount uint64
+ db.AscendingVisit(func(n needle_map.NeedleValue) error {
+ // fmt.Printf("%d,%x\n", volumeId, n.Key)
+ orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String()))
+ orphanFileCount++
+ orphanDataSize += uint64(n.Size)
+ return nil
+ })
+
+ if orphanFileCount > 0 {
+ pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
+ fmt.Fprintf(writer, "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
+ volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
+ }
+
+ return
+
+}
+
+type VInfo struct {
+ server string
+ collection string
+ isEcVolume bool
+}
+
+func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
+
+ if verbose {
+ fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
+ }
+
+ volumeIdToServer = make(map[uint32]VInfo)
+ var resp *master_pb.VolumeListResponse
+ err = c.env.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return
+ }
+
+ eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
+ for _, vi := range t.VolumeInfos {
+ volumeIdToServer[vi.Id] = VInfo{
+ server: t.Id,
+ collection: vi.Collection,
+ isEcVolume: false,
+ }
+ }
+ for _, ecShardInfo := range t.EcShardInfos {
+ volumeIdToServer[ecShardInfo.Id] = VInfo{
+ server: t.Id,
+ collection: ecShardInfo.Collection,
+ isEcVolume: true,
+ }
+ }
+ })
+
+ if verbose {
+ fmt.Fprintf(writer, "collected %d volumes and locations.\n", len(volumeIdToServer))
+ }
+ return
+}
+
+func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string, writer io.Writer) (err error) {
+ fmt.Fprintf(writer, "purging orphan data for volume %d...\n", volumeId)
+ locations, found := c.env.MasterClient.GetLocations(volumeId)
+ if !found {
+ return fmt.Errorf("failed to find volume %d locations", volumeId)
+ }
+
+ resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations))
+ var wg sync.WaitGroup
+ for _, location := range locations {
+ wg.Add(1)
+ go func(server string, fidList []string) {
+ defer wg.Done()
+
+ if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
+ err = deleteErr
+ } else if deleteResults != nil {
+ resultChan <- deleteResults
+ }
+
+ }(location.Url, fileIds)
+ }
+ wg.Wait()
+ close(resultChan)
+
+ for results := range resultChan {
+ for _, result := range results {
+ if result.Error != "" {
+ fmt.Fprintf(writer, "purge error: %s\n", result.Error)
+ }
+ }
+ }
+
+ return
+}
+
+func getVolumeFileIdFile(tempFolder string, vid uint32) string {
+ return filepath.Join(tempFolder, fmt.Sprintf("%d.idx", vid))
+}
+
+func getFilerFileIdFile(tempFolder string, vid uint32) string {
+ return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid))
+}
+
+func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
+ flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
+ dst, err := os.OpenFile(fileName, flags, 0644)
+ if err != nil {
+ return fmt.Errorf("failed to open %s: %v", fileName, err)
+ }
+ defer dst.Close()
+
+ for {
+ resp, receiveErr := client.Recv()
+ if receiveErr == io.EOF {
+ break
+ }
+ if receiveErr != nil {
+ return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
+ }
+ dst.Write(resp.FileContent)
+ }
+ return nil
+}
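At its core, the fsck subtraction above is an in-memory needle map loaded from the volume's .idx file with every filer-referenced key deleted; whatever remains is orphaned. A standalone sketch of that step, using only the needle_map and types calls that appear in this file; the orphanKeys helper itself is illustrative.

package shell

import (
	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

// orphanKeys loads a volume's .idx, removes the keys the filer still
// references, and returns the leftover (orphaned) needle keys.
func orphanKeys(idxFile string, filerFileKeys []uint64) (orphans []string, err error) {
	db := needle_map.NewMemDb()
	defer db.Close()

	if err = db.LoadFromIdx(idxFile); err != nil {
		return
	}
	for _, key := range filerFileKeys {
		db.Delete(types.NeedleId(key))
	}
	db.AscendingVisit(func(n needle_map.NeedleValue) error {
		orphans = append(orphans, n.Key.String())
		return nil
	})
	return
}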
diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go
index c6c79d150..c5a9388fa 100644
--- a/weed/shell/command_volume_list.go
+++ b/weed/shell/command_volume_list.go
@@ -32,9 +32,8 @@ func (c *commandVolumeList) Help() string {
func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go
index 21bc342b4..ded7b7e66 100644
--- a/weed/shell/command_volume_mount.go
+++ b/weed/shell/command_volume_mount.go
@@ -34,6 +34,10 @@ func (c *commandVolumeMount) Help() string {
func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
if len(args) != 2 {
fmt.Fprintf(writer, "received args: %+v\n", args)
return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
@@ -45,14 +49,13 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
}
- ctx := context.Background()
- return mountVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+ return mountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
}
-func mountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
+func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+ return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),
})
return mountErr
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 2e39c0600..392b947e7 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -44,6 +44,10 @@ func (c *commandVolumeMove) Help() string {
func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
if len(args) != 3 {
fmt.Fprintf(writer, "received args: %+v\n", args)
return fmt.Errorf("need 3 args of <source volume server host:port> <target volume server host:port> <volume id>")
@@ -59,26 +63,25 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
- ctx := context.Background()
- return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second)
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second)
}
// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
-func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) {
+func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
- lastAppendAtNs, err := copyVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
+ lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
if err != nil {
return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
- if err = tailVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
+ if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
- if err = deleteVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer); err != nil {
+ if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer); err != nil {
return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
}
@@ -86,10 +89,10 @@ func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeI
return nil
}
-func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
+func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
- err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
+ err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: sourceVolumeServer,
})
@@ -102,10 +105,10 @@ func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne
return
}
-func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
+func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
- return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{
+ return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
VolumeId: uint32(volumeId),
SinceNs: lastAppendAtNs,
IdleTimeoutSeconds: uint32(idleTimeout.Seconds()),
@@ -116,9 +119,9 @@ func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne
}
-func deleteVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
+func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+ return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(volumeId),
})
return deleteErr
diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go
index 0f1a1bb6e..d31c8c031 100644
--- a/weed/shell/command_volume_tier_download.go
+++ b/weed/shell/command_volume_tier_download.go
@@ -42,6 +42,10 @@ func (c *commandVolumeTierDownload) Help() string {
func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := tierCommand.Int("volumeId", 0, "the volume id")
collection := tierCommand.String("collection", "", "the collection name")
@@ -49,18 +53,17 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr
return nil
}
- ctx := context.Background()
vid := needle.VolumeId(*volumeId)
// collect topology information
- topologyInfo, err := collectTopologyInfo(ctx, commandEnv)
+ topologyInfo, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
// volumeId is provided
if vid != 0 {
- return doVolumeTierDownload(ctx, commandEnv, writer, *collection, vid)
+ return doVolumeTierDownload(commandEnv, writer, *collection, vid)
}
// apply to all volumes in the collection
@@ -71,7 +74,7 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr
}
fmt.Printf("tier download volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
- if err = doVolumeTierDownload(ctx, commandEnv, writer, *collection, vid); err != nil {
+ if err = doVolumeTierDownload(commandEnv, writer, *collection, vid); err != nil {
return err
}
}
@@ -97,7 +100,7 @@ func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection s
return
}
-func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId) (err error) {
+func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
@@ -107,7 +110,7 @@ func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io
// TODO parallelize this
for _, loc := range locations {
// copy the .dat file from remote tier to local
- err = downloadDatFromRemoteTier(ctx, commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url)
+ err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url)
if err != nil {
return fmt.Errorf("download dat file for volume %d to %s: %v", vid, loc.Url, err)
}
@@ -116,10 +119,10 @@ func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io
return nil
}
-func downloadDatFromRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error {
+func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error {
- err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(ctx, &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
+ err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
})
@@ -145,14 +148,14 @@ func downloadDatFromRemoteTier(ctx context.Context, grpcDialOption grpc.DialOpti
return downloadErr
}
- _, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{
+ _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),
})
if unmountErr != nil {
return unmountErr
}
- _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),
})
if mountErr != nil {
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
index 20da1187c..f92cdc3e4 100644
--- a/weed/shell/command_volume_tier_upload.go
+++ b/weed/shell/command_volume_tier_upload.go
@@ -56,6 +56,10 @@ func (c *commandVolumeTierUpload) Help() string {
func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
volumeId := tierCommand.Int("volumeId", 0, "the volume id")
collection := tierCommand.String("collection", "", "the collection name")
@@ -67,23 +71,22 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ
return nil
}
- ctx := context.Background()
vid := needle.VolumeId(*volumeId)
// volumeId is provided
if vid != 0 {
- return doVolumeTierUpload(ctx, commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile)
+ return doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile)
}
// apply to all volumes in the collection
// reusing collectVolumeIdsForEcEncode for now
- volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod)
+ volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
fmt.Printf("tier upload volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
- if err = doVolumeTierUpload(ctx, commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil {
+ if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil {
return err
}
}
@@ -91,20 +94,20 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ
return nil
}
-func doVolumeTierUpload(ctx context.Context, commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {
+func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
return fmt.Errorf("volume %d not found", vid)
}
- err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+ err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
// copy the .dat file to remote tier
- err = uploadDatToRemoteTier(ctx, commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile)
+ err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile)
if err != nil {
return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, locations[0].Url, dest, err)
}
@@ -112,10 +115,10 @@ func doVolumeTierUpload(ctx context.Context, commandEnv *CommandEnv, writer io.W
return nil
}
-func uploadDatToRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error {
+func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error {
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(ctx, &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
DestinationBackendName: dest,
diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go
index 826258dfb..7596bb4c8 100644
--- a/weed/shell/command_volume_unmount.go
+++ b/weed/shell/command_volume_unmount.go
@@ -34,6 +34,10 @@ func (c *commandVolumeUnmount) Help() string {
func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
if len(args) != 2 {
fmt.Fprintf(writer, "received args: %+v\n", args)
return fmt.Errorf("need 2 args of <volume server host:port> <volume id>")
@@ -45,14 +49,13 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
}
- ctx := context.Background()
- return unmountVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+ return unmountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
}
-func unmountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{
+func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+ return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),
})
return unmountErr
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index f1fcb62d4..f61ed9f82 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -1,19 +1,19 @@
package shell
import (
- "context"
"fmt"
"io"
"net/url"
- "path/filepath"
"strconv"
"strings"
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/exclusive_locks"
)
type ShellOptions struct {
@@ -29,6 +29,7 @@ type CommandEnv struct {
env map[string]string
MasterClient *wdclient.MasterClient
option ShellOptions
+ locker *exclusive_locks.ExclusiveLocker
}
type command interface {
@@ -42,55 +43,67 @@ var (
)
func NewCommandEnv(options ShellOptions) *CommandEnv {
- return &CommandEnv{
- env: make(map[string]string),
- MasterClient: wdclient.NewMasterClient(context.Background(),
- options.GrpcDialOption, "shell", strings.Split(*options.Masters, ",")),
- option: options,
+ ce := &CommandEnv{
+ env: make(map[string]string),
+ MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", 0, strings.Split(*options.Masters, ",")),
+ option: options,
}
+ ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient)
+ return ce
}
-func (ce *CommandEnv) parseUrl(input string) (filerServer string, filerPort int64, path string, err error) {
+func (ce *CommandEnv) parseUrl(input string) (path string, err error) {
if strings.HasPrefix(input, "http") {
- return parseFilerUrl(input)
+ err = fmt.Errorf("http://<filer>:<port> prefix is not supported any more")
+ return
}
if !strings.HasPrefix(input, "/") {
- input = filepath.ToSlash(filepath.Join(ce.option.Directory, input))
+ input = util.Join(ce.option.Directory, input)
}
- return ce.option.FilerHost, ce.option.FilerPort, input, err
+ return input, err
}
-func (ce *CommandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool {
+func (ce *CommandEnv) isDirectory(path string) bool {
- return ce.checkDirectory(ctx, filerServer, filerPort, path) == nil
+ return ce.checkDirectory(path) == nil
}
-func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error {
+func (ce *CommandEnv) confirmIsLocked() error {
- dir, name := filer2.FullPath(path).DirAndName()
+ if ce.locker.IsLocking() {
+ return nil
+ }
- return ce.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ return fmt.Errorf("need to lock to continue")
- resp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
- Directory: dir,
- Name: name,
- })
- if lookupErr != nil {
- return lookupErr
- }
+}
- if resp.Entry == nil {
- return fmt.Errorf("entry not found")
- }
+func (ce *CommandEnv) checkDirectory(path string) error {
- if !resp.Entry.IsDirectory {
- return fmt.Errorf("not a directory")
- }
+ dir, name := util.FullPath(path).DirAndName()
- return nil
- })
+ exists, err := filer_pb.Exists(ce, dir, name, true)
+
+ if !exists {
+ return fmt.Errorf("%s is not a directory", path)
+ }
+
+ return err
+
+}
+
+var _ = filer_pb.FilerClient(&CommandEnv{})
+
+func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
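+ // the filer gRPC port is the filer HTTP port + 10000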
+ filerGrpcAddress := fmt.Sprintf("%s:%d", ce.option.FilerHost, ce.option.FilerPort+10000)
+ return pb.WithGrpcFilerClient(filerGrpcAddress, ce.option.GrpcDialOption, fn)
+
+}
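+
+// AdjustedUrl returns the location URL unchanged; the shell does not
+// rewrite volume server addresses.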
+func (ce *CommandEnv) AdjustedUrl(hostAndPort string) string {
+ return hostAndPort
}
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
@@ -107,7 +120,7 @@ func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path
}
path = u.Path
} else {
- err = fmt.Errorf("path should have full url http://<filer_server>:<port>/path/to/dirOrFile : %s", entryPath)
+ err = fmt.Errorf("path should have full url /path/to/dirOrFile : %s", entryPath)
}
return
}
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index a4f17e0fa..4632a1fb0 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -6,9 +6,8 @@ import (
"os"
"path"
"regexp"
- "strings"
-
"sort"
+ "strings"
"github.com/peterh/liner"
)
@@ -46,40 +45,49 @@ func RunShell(options ShellOptions) {
return
}
- cmds := reg.FindAllString(cmd, -1)
- if len(cmds) == 0 {
- continue
- } else {
- line.AppendHistory(cmd)
+ for _, c := range strings.Split(cmd, ";") {
+ if processEachCmd(reg, c, commandEnv) {
+ return
+ }
+ }
+ }
+}
- args := make([]string, len(cmds[1:]))
+func processEachCmd(reg *regexp.Regexp, cmd string, commandEnv *CommandEnv) bool {
+ cmds := reg.FindAllString(cmd, -1)
+ if len(cmds) == 0 {
+ return false
+ } else {
+ line.AppendHistory(cmd)
- for i := range args {
- args[i] = strings.Trim(string(cmds[1+i]), "\"'")
- }
+ args := make([]string, len(cmds[1:]))
- cmd := strings.ToLower(cmds[0])
- if cmd == "help" || cmd == "?" {
- printHelp(cmds)
- } else if cmd == "exit" || cmd == "quit" {
- return
- } else {
- foundCommand := false
- for _, c := range Commands {
- if c.Name() == cmd {
- if err := c.Do(args, commandEnv, os.Stdout); err != nil {
- fmt.Fprintf(os.Stderr, "error: %v\n", err)
- }
- foundCommand = true
+ for i := range args {
+ args[i] = strings.Trim(string(cmds[1+i]), "\"'")
+ }
+
+ cmd := strings.ToLower(cmds[0])
+ if cmd == "help" || cmd == "?" {
+ printHelp(cmds)
+ } else if cmd == "exit" || cmd == "quit" {
+ return true
+ } else {
+ foundCommand := false
+ for _, c := range Commands {
+ if c.Name() == cmd || c.Name() == "fs."+cmd {
+ if err := c.Do(args, commandEnv, os.Stdout); err != nil {
+ fmt.Fprintf(os.Stderr, "error: %v\n", err)
}
- }
- if !foundCommand {
- fmt.Fprintf(os.Stderr, "unknown command: %v\n", cmd)
+ foundCommand = true
}
}
-
+ if !foundCommand {
+ fmt.Fprintf(os.Stderr, "unknown command: %v\n", cmd)
+ }
}
+
}
+ return false
}
func printGenericHelp() {