From 3a1d017de238af668cf346ac69de34fe97cb9e7d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 21 Feb 2020 21:23:25 -0800 Subject: shell: ensure dc and rack does not change for replicated volumes fix https://github.com/chrislusf/seaweedfs/issues/1203 --- weed/shell/command_volume_balance.go | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index bed4f4306..488beb998 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -109,14 +109,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return nil } -func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.DataNodeInfo, volumeSizeLimit uint64, collection string, applyBalancing bool) error { - - var nodes []*Node - for _, dn := range dataNodeInfos { - nodes = append(nodes, &Node{ - info: dn, - }) - } +func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { // balance writable volumes for _, n := range nodes { @@ -151,15 +144,19 @@ func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.Dat return nil } -func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*master_pb.DataNodeInfo) { - typeToNodes = make(map[uint64][]*master_pb.DataNodeInfo) +func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) { + typeToNodes = make(map[uint64][]*Node) for _, dc := range t.DataCenterInfos { if selectedDataCenter != "" && dc.Id != selectedDataCenter { continue } for _, r := range dc.RackInfos { for _, dn := range r.DataNodeInfos { - typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], dn) + typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{ + info: dn, + dc: dc.Id, + rack: r.Id, + }) } } } @@ -169,6 +166,8 @@ func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter st type Node struct { info *master_pb.DataNodeInfo selectedVolumes map[uint32]*master_pb.VolumeInformationMessage + dc string + rack string } func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) { @@ -210,6 +209,13 @@ func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidates sortCandidatesFn(candidateVolumes) for _, v := range candidateVolumes { + if v.ReplicaPlacement > 0 { + if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack { + // TODO this logic is too simple, but should work most of the time + // Need a correct algorithm to handle all different cases + continue + } + } if _, found := emptyNode.selectedVolumes[v.Id]; !found { if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil { delete(fullNode.selectedVolumes, v.Id) -- cgit v1.2.3 From 0644d637484fb811ab2b0491becde49fa51894aa Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 24 Feb 2020 23:30:01 -0800 Subject: shell: add commands for bucket --- weed/shell/command_bucket_create.go | 90 +++++++++++++++++++++++++++++++++++++ weed/shell/command_bucket_delete.go | 73 ++++++++++++++++++++++++++++++ weed/shell/command_bucket_list.go | 83 ++++++++++++++++++++++++++++++++++ 3 files changed, 246 insertions(+) create mode 100644 weed/shell/command_bucket_create.go create mode 100644 weed/shell/command_bucket_delete.go create mode 100644 weed/shell/command_bucket_list.go (limited to 'weed/shell') diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_bucket_create.go new file mode 100644 index 000000000..603e9c564 --- /dev/null +++ b/weed/shell/command_bucket_create.go @@ -0,0 +1,90 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + "os" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func init() { + Commands = append(Commands, &commandBucketCreate{}) +} + +type commandBucketCreate struct { +} + +func (c *commandBucketCreate) Name() string { + return "bucket.create" +} + +func (c *commandBucketCreate) Help() string { + return `create a bucket with a given name + + Example: + bucket.create -name -replication 001 +` +} + +func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + bucketName := bucketCommand.String("name", "", "bucket name") + replication := bucketCommand.String("replication", "", "replication setting for the bucket") + if err = bucketCommand.Parse(args); err != nil { + return nil + } + + if *bucketName == "" { + return fmt.Errorf("empty bucket name") + } + + filerServer, filerPort, _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args())) + if parseErr != nil { + return parseErr + } + + ctx := context.Background() + + err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + + resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err) + } + filerBucketsPath := resp.DirBuckets + + println("create bucket under", filerBucketsPath) + + entry := &filer_pb.Entry{ + Name: *bucketName, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(0777 | os.ModeDir), + Collection: *bucketName, + Replication: *replication, + }, + } + + if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{ + Directory: filerBucketsPath, + Entry: entry, + }); err != nil { + return err + } + + println("created bucket", *bucketName) + + return nil + + }) + + return err + +} diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go new file mode 100644 index 000000000..9e814ccf9 --- /dev/null +++ b/weed/shell/command_bucket_delete.go @@ -0,0 +1,73 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func init() { + Commands = append(Commands, &commandBucketDelete{}) +} + +type commandBucketDelete struct { +} + +func (c *commandBucketDelete) Name() string { + return "bucket.delete" +} + +func (c *commandBucketDelete) Help() string { + return `delete a bucket by a given name + + bucket.delete -name +` +} + +func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + bucketName := bucketCommand.String("name", "", "bucket name") + if err = bucketCommand.Parse(args); err != nil { + return nil + } + + if *bucketName == "" { + return fmt.Errorf("empty bucket name") + } + + filerServer, filerPort, _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args())) + if parseErr != nil { + return parseErr + } + + ctx := context.Background() + + err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + + resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err) + } + filerBucketsPath := resp.DirBuckets + + if _, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{ + Directory: filerBucketsPath, + Name: *bucketName, + IsDeleteData: false, + IsRecursive: true, + IgnoreRecursiveError: true, + }); err != nil { + return err + } + + return nil + + }) + + return err + +} diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go new file mode 100644 index 000000000..051eeda2d --- /dev/null +++ b/weed/shell/command_bucket_list.go @@ -0,0 +1,83 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + "math" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func init() { + Commands = append(Commands, &commandBucketList{}) +} + +type commandBucketList struct { +} + +func (c *commandBucketList) Name() string { + return "bucket.list" +} + +func (c *commandBucketList) Help() string { + return `list all buckets + +` +} + +func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + if err = bucketCommand.Parse(args); err != nil { + return nil + } + + filerServer, filerPort, _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args())) + if parseErr != nil { + return parseErr + } + + ctx := context.Background() + + err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + + resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err) + } + filerBucketsPath := resp.DirBuckets + + stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ + Directory: filerBucketsPath, + Limit: math.MaxUint32, + }) + if err != nil { + return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err) + } + + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + if resp.Entry.Attributes.Replication == "" { + fmt.Fprintf(writer, " %s\n", resp.Entry.Name) + } else { + fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", resp.Entry.Name, resp.Entry.Attributes.Replication) + } + } + + return nil + + }) + + return err + +} -- cgit v1.2.3 From e86da5a4918e0cf663a8c592387c72c850647e4e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 00:42:48 -0800 Subject: minor --- weed/shell/command_bucket_list.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell') diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go index 051eeda2d..32198c29d 100644 --- a/weed/shell/command_bucket_list.go +++ b/weed/shell/command_bucket_list.go @@ -67,7 +67,7 @@ func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io. } } - if resp.Entry.Attributes.Replication == "" { + if resp.Entry.Attributes.Replication == "" || resp.Entry.Attributes.Replication == "000" { fmt.Fprintf(writer, " %s\n", resp.Entry.Name) } else { fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", resp.Entry.Name, resp.Entry.Attributes.Replication) -- cgit v1.2.3 From 7d10fdf73720fb3234cd5cacfaf10fb79590d754 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 11:13:06 -0800 Subject: fix directory lookup nil --- weed/shell/command_fs_cat.go | 3 +++ weed/shell/command_fs_meta_cat.go | 3 +++ weed/shell/command_fs_mv.go | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index 238dee7f9..06c8232c9 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -56,6 +56,9 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write if err != nil { return err } + if respLookupEntry.Entry == nil { + return fmt.Errorf("file not found: %s", path) + } return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt32) diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index 9980f67a2..ec9a495f2 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -55,6 +55,9 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W if err != nil { return err } + if respLookupEntry.Entry == nil { + return fmt.Errorf("file not found: %s", path) + } m := jsonpb.Marshaler{ EmitDefaults: true, diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index e77755921..b9301ad3c 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -65,7 +65,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer var targetDir, targetName string // moving a file or folder - if err == nil && respDestinationLookupEntry.Entry.IsDirectory { + if err == nil && respDestinationLookupEntry.Entry!= nil && respDestinationLookupEntry.Entry.IsDirectory { // to a directory targetDir = filepath.ToSlash(filepath.Join(destinationDir, destinationName)) targetName = sourceName -- cgit v1.2.3 From 892e726eb9c2427634c46f8ae9b7bcf0b6d1b082 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 21:50:12 -0800 Subject: avoid reusing context object fix https://github.com/chrislusf/seaweedfs/issues/1182 --- weed/shell/command_bucket_create.go | 8 ++-- weed/shell/command_bucket_delete.go | 8 ++-- weed/shell/command_bucket_list.go | 8 ++-- weed/shell/command_collection_delete.go | 5 +-- weed/shell/command_collection_list.go | 5 +-- weed/shell/command_ec_balance.go | 46 ++++++++++---------- weed/shell/command_ec_common.go | 43 +++++++++---------- weed/shell/command_ec_decode.go | 43 ++++++++++--------- weed/shell/command_ec_encode.go | 49 ++++++++++------------ weed/shell/command_ec_rebuild.go | 29 ++++++------- weed/shell/command_ec_test.go | 2 +- weed/shell/command_fs_cat.go | 8 ++-- weed/shell/command_fs_cd.go | 5 +-- weed/shell/command_fs_du.go | 23 +++++----- weed/shell/command_fs_ls.go | 7 +--- weed/shell/command_fs_meta_cat.go | 6 +-- weed/shell/command_fs_meta_load.go | 10 ++--- weed/shell/command_fs_meta_save.go | 2 +- weed/shell/command_fs_mv.go | 8 ++-- weed/shell/command_fs_tree.go | 2 +- weed/shell/command_volume_balance.go | 8 ++-- weed/shell/command_volume_configure_replication.go | 9 ++-- weed/shell/command_volume_copy.go | 4 +- weed/shell/command_volume_delete.go | 4 +- weed/shell/command_volume_fix_replication.go | 9 ++-- weed/shell/command_volume_list.go | 5 +-- weed/shell/command_volume_mount.go | 9 ++-- weed/shell/command_volume_move.go | 29 +++++++------ weed/shell/command_volume_tier_download.go | 21 +++++----- weed/shell/command_volume_tier_upload.go | 19 ++++----- weed/shell/command_volume_unmount.go | 9 ++-- weed/shell/commands.go | 12 +++--- 32 files changed, 202 insertions(+), 253 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_bucket_create.go index 603e9c564..3546528aa 100644 --- a/weed/shell/command_bucket_create.go +++ b/weed/shell/command_bucket_create.go @@ -48,11 +48,9 @@ func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer i return parseErr } - ctx := context.Background() + err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - - resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{}) + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err) } @@ -72,7 +70,7 @@ func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer i }, } - if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{ + if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ Directory: filerBucketsPath, Entry: entry, }); err != nil { diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go index 9e814ccf9..c57ce7221 100644 --- a/weed/shell/command_bucket_delete.go +++ b/weed/shell/command_bucket_delete.go @@ -44,17 +44,15 @@ func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer i return parseErr } - ctx := context.Background() + err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - - resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{}) + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err) } filerBucketsPath := resp.DirBuckets - if _, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{ + if _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{ Directory: filerBucketsPath, Name: *bucketName, IsDeleteData: false, diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go index 32198c29d..5eb5972ce 100644 --- a/weed/shell/command_bucket_list.go +++ b/weed/shell/command_bucket_list.go @@ -39,17 +39,15 @@ func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io. return parseErr } - ctx := context.Background() + err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - - resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{}) + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err) } filerBucketsPath := resp.DirBuckets - stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ + stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ Directory: filerBucketsPath, Limit: math.MaxUint32, }) diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go index fbaddcd51..4b3d7f0be 100644 --- a/weed/shell/command_collection_delete.go +++ b/weed/shell/command_collection_delete.go @@ -34,9 +34,8 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ collectionName := args[0] - ctx := context.Background() - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - _, err = client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{ + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: collectionName, }) return err diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go index c4325c66f..2a114e61b 100644 --- a/weed/shell/command_collection_list.go +++ b/weed/shell/command_collection_list.go @@ -41,9 +41,8 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) { var resp *master_pb.CollectionListResponse - ctx := context.Background() - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.CollectionList(ctx, &master_pb.CollectionListRequest{ + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{ IncludeNormalVolumes: includeNormalVolumes, IncludeEcVolumes: includeEcVolumes, }) diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 96599372e..7230a869f 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -107,10 +107,8 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W return nil } - ctx := context.Background() - // collect all ec nodes - allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, *dc) + allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, *dc) if err != nil { return err } @@ -138,7 +136,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W } } - if err := balanceEcRacks(ctx, commandEnv, racks, *applyBalancing); err != nil { + if err := balanceEcRacks(commandEnv, racks, *applyBalancing); err != nil { return fmt.Errorf("balance ec racks: %v", err) } @@ -170,11 +168,11 @@ func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*E return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err) } - if err := balanceEcShardsAcrossRacks(ctx, commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil { + if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil { return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err) } - if err := balanceEcShardsWithinRacks(ctx, commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil { + if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil { return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err) } @@ -186,14 +184,14 @@ func deleteDuplicatedEcShards(ctx context.Context, commandEnv *CommandEnv, allEc vidLocations := collectVolumeIdToEcNodes(allEcNodes) // deduplicate ec shards for vid, locations := range vidLocations { - if err := doDeduplicateEcShards(ctx, commandEnv, collection, vid, locations, applyBalancing); err != nil { + if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil { return err } } return nil } -func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error { +func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error { // check whether this volume has ecNodes that are over average shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount) @@ -215,10 +213,10 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collecti duplicatedShardIds := []uint32{uint32(shardId)} for _, ecNode := range ecNodes[1:] { - if err := unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil { + if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil { return err } - if err := sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil { + if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil { return err } ecNode.deleteEcVolumeShards(vid, duplicatedShardIds) @@ -227,19 +225,19 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collecti return nil } -func balanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error { +func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error { // collect vid => []ecNode, since previous steps can change the locations vidLocations := collectVolumeIdToEcNodes(allEcNodes) // spread the ec shards evenly for vid, locations := range vidLocations { - if err := doBalanceEcShardsAcrossRacks(ctx, commandEnv, collection, vid, locations, racks, applyBalancing); err != nil { + if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil { return err } } return nil } -func doBalanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error { +func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error { // calculate average number of shards an ec rack should have for one volume averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks)) @@ -274,7 +272,7 @@ func doBalanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, c for _, n := range racks[rackId].ecNodes { possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) } - err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing) + err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing) if err != nil { return err } @@ -306,7 +304,7 @@ func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]i return "" } -func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error { +func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error { // collect vid => []ecNode, since previous steps can change the locations vidLocations := collectVolumeIdToEcNodes(allEcNodes) @@ -330,7 +328,7 @@ func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, all } sourceEcNodes := rackEcNodesWithVid[rackId] averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes)) - if err := doBalanceEcShardsWithinOneRack(ctx, commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil { + if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil { return err } } @@ -338,7 +336,7 @@ func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, all return nil } -func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { +func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { for _, ecNode := range existingLocations { @@ -353,7 +351,7 @@ func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv, fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId) - err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing) + err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing) if err != nil { return err } @@ -365,18 +363,18 @@ func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv, return nil } -func balanceEcRacks(ctx context.Context, commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error { +func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error { // balance one rack for all ec shards for _, ecRack := range racks { - if err := doBalanceEcRack(ctx, commandEnv, ecRack, applyBalancing); err != nil { + if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil { return err } } return nil } -func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error { +func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error { if len(ecRack.ecNodes) <= 1 { return nil @@ -421,7 +419,7 @@ func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) - err := moveMountedShardToEcNode(ctx, commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing) + err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing) if err != nil { return err } @@ -440,7 +438,7 @@ func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack return nil } -func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { +func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { sortEcNodesByFreeslotsDecending(possibleDestinationEcNodes) @@ -458,7 +456,7 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *CommandEnv, a fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id) - err := moveMountedShardToEcNode(ctx, commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing) + err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing) if err != nil { return err } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index e187d5a3b..0db119d3c 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -15,26 +15,26 @@ import ( "google.golang.org/grpc" ) -func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { +func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) { copiedShardIds := []uint32{uint32(shardId)} if applyBalancing { // ask destination node to copy shard and the ecx file from source node, and mount it - copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id) + copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id) if err != nil { return err } // unmount the to be deleted shards - err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds) + err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds) if err != nil { return err } // ask source node to delete the shard, and maybe the ecx file - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds) if err != nil { return err } @@ -50,18 +50,18 @@ func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, exist } -func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, +func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer *EcNode, shardIdsToCopy []uint32, volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) { fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) - err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { if targetServer.info.Id != existingLocation { fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) - _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: shardIdsToCopy, @@ -76,7 +76,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption } fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id) - _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: shardIdsToCopy, @@ -178,12 +178,12 @@ type EcRack struct { freeEcSlot int } -func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { +func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) { // list all possible locations var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { @@ -211,13 +211,12 @@ func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCen return } -func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, - collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { +func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: toBeDeletedShardIds, @@ -227,13 +226,12 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt } -func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, - volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error { +func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error { fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{ + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{ VolumeId: uint32(volumeId), ShardIds: toBeUnmountedhardIds, }) @@ -241,13 +239,12 @@ func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, }) } -func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, - collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error { +func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error { fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: toBeMountedhardIds, diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 8a705a5ae..b69e403cb 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -43,25 +43,24 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } - ctx := context.Background() vid := needle.VolumeId(*volumeId) // collect topology information - topologyInfo, err := collectTopologyInfo(ctx, commandEnv) + topologyInfo, err := collectTopologyInfo(commandEnv) if err != nil { return err } // volumeId is provided if vid != 0 { - return doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid) + return doEcDecode(commandEnv, topologyInfo, *collection, vid) } // apply to all volumes in the collection volumeIds := collectEcShardIds(topologyInfo, *collection) fmt.Printf("ec encode volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid); err != nil { + if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil { return err } } @@ -69,26 +68,26 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } -func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) { +func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) { // find volume location nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid) fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits) // collect ec shards to the server with most space - targetNodeLocation, err := collectEcShards(ctx, commandEnv, nodeToEcIndexBits, collection, vid) + targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid) if err != nil { return fmt.Errorf("collectEcShards for volume %d: %v", vid, err) } // generate a normal volume - err = generateNormalVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation) + err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation) if err != nil { return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) } // delete the previous ec shards - err = mountVolumeAndDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid) + err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid) if err != nil { return fmt.Errorf("delete ec shards for volume %d: %v", vid, err) } @@ -96,11 +95,11 @@ func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb return nil } -func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { +func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume - if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ + if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) return mountErr @@ -111,7 +110,7 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO // unmount ec shards for location, ecIndexBits := range nodeToEcIndexBits { fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := unmountEcShards(ctx, grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) + err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) if err != nil { return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err) } @@ -119,7 +118,7 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO // delete ec shards for location, ecIndexBits := range nodeToEcIndexBits { fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := sourceServerDeleteEcShards(ctx, grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) + err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) if err != nil { return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err) } @@ -128,12 +127,12 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO return nil } -func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error { +func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error { fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{ VolumeId: uint32(vid), Collection: collection, }) @@ -144,7 +143,7 @@ func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, v } -func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) { +func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) { maxShardCount := 0 var exisitngEcIndexBits erasure_coding.ShardBits @@ -170,11 +169,11 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB continue } - err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) - _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(vid), Collection: collection, ShardIds: needToCopyEcIndexBits.ToUint32Slice(), @@ -204,11 +203,11 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB } -func collectTopologyInfo(ctx context.Context, commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { +func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 587b59388..e22691c00 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -63,22 +63,21 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } - ctx := context.Background() vid := needle.VolumeId(*volumeId) // volumeId is provided if vid != 0 { - return doEcEncode(ctx, commandEnv, *collection, vid) + return doEcEncode(commandEnv, *collection, vid) } // apply to all volumes in the collection - volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod) + volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod) if err != nil { return err } fmt.Printf("ec encode volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil { + if err = doEcEncode(commandEnv, *collection, vid); err != nil { return err } } @@ -86,7 +85,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } -func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { +func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) if !found { @@ -96,19 +95,19 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, // fmt.Printf("found ec %d shards on %v\n", vid, locations) // mark the volume as readonly - err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) + err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) if err != nil { return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) } // generate ec shards - err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url) + err = generateEcShards(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url) if err != nil { return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) } // balance the ec shards to current cluster - err = spreadEcShards(ctx, commandEnv, vid, collection, locations) + err = spreadEcShards(context.Background(), commandEnv, vid, collection, locations) if err != nil { return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err) } @@ -116,12 +115,12 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, return nil } -func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error { +func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error { for _, location := range locations { - err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ + err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), }) return markErr @@ -136,10 +135,10 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol return nil } -func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { +func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), Collection: collection, }) @@ -152,7 +151,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { - allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, "") + allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "") if err != nil { return err } @@ -169,26 +168,26 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle allocatedEcIds := balancedEcDistribution(allocatedDataNodes) // ask the data nodes to copy from the source volume server - copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0]) + copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0]) if err != nil { return err } // unmount the to be deleted shards - err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) + err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) if err != nil { return err } // ask the source volume server to clean up copied ec shards - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds) if err != nil { return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err) } // ask the source volume server to delete the original volume for _, location := range existingLocations { - err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url) + err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url) if err != nil { return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err) } @@ -198,9 +197,7 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle } -func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServers []*EcNode, allocatedEcIds [][]uint32, - volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { +func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { // parallelize shardIdChan := make(chan []uint32, len(targetServers)) @@ -213,7 +210,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia wg.Add(1) go func(server *EcNode, allocatedEcShardIds []uint32) { defer wg.Done() - copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server, + copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server, allocatedEcShardIds, volumeId, collection, existingLocation.Url) if copyErr != nil { err = copyErr @@ -255,11 +252,11 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) { return allocated } -func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { +func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 600a8cb45..d9d943e6d 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -64,7 +64,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W } // collect all ec nodes - allEcNodes, _, err := collectEcNodes(context.Background(), commandEnv, "") + allEcNodes, _, err := collectEcNodes(commandEnv, "") if err != nil { return err } @@ -92,8 +92,6 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error { - ctx := context.Background() - fmt.Printf("rebuildEcVolumes %s\n", collection) // collect vid => each shard locations, similar to ecShardMap in topology.go @@ -117,7 +115,7 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s return fmt.Errorf("disk space is not enough") } - if err := rebuildOneEcVolume(ctx, commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil { + if err := rebuildOneEcVolume(commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil { return err } } @@ -125,13 +123,13 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s return nil } -func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error { +func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error { fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId) // collect shard files to rebuilder local disk var generatedShardIds []uint32 - copiedShardIds, _, err := prepareDataToRecover(ctx, commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges) + copiedShardIds, _, err := prepareDataToRecover(commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges) if err != nil { return err } @@ -139,7 +137,7 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder * // clean up working files // ask the rebuilder to delete the copied shards - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds) if err != nil { fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds) } @@ -151,13 +149,13 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder * } // generate ec shards, and maybe ecx file - generatedShardIds, err = generateMissingShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id) + generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id) if err != nil { return err } // mount the generated shards - err = mountEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) + err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) if err != nil { return err } @@ -167,11 +165,10 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder * return nil } -func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption, - collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) { +func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) { - err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{ + err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{ VolumeId: uint32(volumeId), Collection: collection, }) @@ -183,7 +180,7 @@ func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption, return } -func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) { +func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) { needEcxFile := true var localShardBits erasure_coding.ShardBits @@ -209,8 +206,8 @@ func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder var copyErr error if applyBalancing { - copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: []uint32{uint32(shardId)}, diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index c233d25d0..ddd52303c 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -121,7 +121,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) { racks := collectRacks(allEcNodes) balanceEcVolumes(nil, "c1", allEcNodes, racks, false) - balanceEcRacks(context.Background(), nil, racks, false) + balanceEcRacks(nil, racks, false) } func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNode { diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index 06c8232c9..8364e0de1 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -38,21 +38,19 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write return err } - ctx := context.Background() - - if commandEnv.isDirectory(ctx, filerServer, filerPort, path) { + if commandEnv.isDirectory(filerServer, filerPort, path) { return fmt.Errorf("%s is a directory", path) } dir, name := filer2.FullPath(path).DirAndName() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, Directory: dir, } - respLookupEntry, err := client.LookupDirectoryEntry(ctx, request) + respLookupEntry, err := client.LookupDirectoryEntry(context.Background(), request) if err != nil { return err } diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go index 408ec86c8..df42cd516 100644 --- a/weed/shell/command_fs_cd.go +++ b/weed/shell/command_fs_cd.go @@ -1,7 +1,6 @@ package shell import ( - "context" "io" ) @@ -45,9 +44,7 @@ func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer return nil } - ctx := context.Background() - - err = commandEnv.checkDirectory(ctx, filerServer, filerPort, path) + err = commandEnv.checkDirectory(filerServer, filerPort, path) if err == nil { commandEnv.option.FilerHost = filerServer diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index d6ea51d0c..a1e21bfa6 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -1,7 +1,6 @@ package shell import ( - "context" "fmt" "io" @@ -39,15 +38,13 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer return err } - ctx := context.Background() - - if commandEnv.isDirectory(ctx, filerServer, filerPort, path) { + if commandEnv.isDirectory(filerServer, filerPort, path) { path = path + "/" } var blockCount, byteCount uint64 dir, name := filer2.FullPath(path).DirAndName() - blockCount, byteCount, err = duTraverseDirectory(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name) + blockCount, byteCount, err = duTraverseDirectory(writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name) if name == "" && err == nil { fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir) @@ -57,15 +54,15 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer } -func duTraverseDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, dir, name string) (blockCount uint64, byteCount uint64, err error) { +func duTraverseDirectory(writer io.Writer, filerClient filer2.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) { - err = filer2.ReadDirAllEntries(ctx, filerClient, filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { + err = filer2.ReadDirAllEntries(filerClient, filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { if entry.IsDirectory { subDir := fmt.Sprintf("%s/%s", dir, entry.Name) if dir == "/" { subDir = "/" + entry.Name } - numBlock, numByte, err := duTraverseDirectory(ctx, writer, filerClient, subDir, "") + numBlock, numByte, err := duTraverseDirectory(writer, filerClient, subDir, "") if err == nil { blockCount += numBlock byteCount += numByte @@ -82,12 +79,12 @@ func duTraverseDirectory(ctx context.Context, writer io.Writer, filerClient file return } -func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { +func (env *CommandEnv) withFilerClient(filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error { filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000) - return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(ctx2, client) + return fn(client) }, filerGrpcAddress, env.option.GrpcDialOption) } @@ -105,6 +102,6 @@ func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *comm filerPort: filerPort, } } -func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { - return c.env.withFilerClient(ctx, c.filerServer, c.filerPort, fn) +func (c *commandFilerClient) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + return c.env.withFilerClient(c.filerServer, c.filerPort, fn) } diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go index 0c63f71fa..69ebe1b30 100644 --- a/weed/shell/command_fs_ls.go +++ b/weed/shell/command_fs_ls.go @@ -1,7 +1,6 @@ package shell import ( - "context" "fmt" "io" "os" @@ -60,16 +59,14 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer return err } - ctx := context.Background() - - if commandEnv.isDirectory(ctx, filerServer, filerPort, path) { + if commandEnv.isDirectory(filerServer, filerPort, path) { path = path + "/" } dir, name := filer2.FullPath(path).DirAndName() entryCount := 0 - err = filer2.ReadDirAllEntries(ctx, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { + err = filer2.ReadDirAllEntries(commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { if !showHidden && strings.HasPrefix(entry.Name, ".") { return diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index ec9a495f2..ec5a093df 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -41,17 +41,15 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W return err } - ctx := context.Background() - dir, name := filer2.FullPath(path).DirAndName() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, Directory: dir, } - respLookupEntry, err := client.LookupDirectoryEntry(ctx, request) + respLookupEntry, err := client.LookupDirectoryEntry(context.Background(), request) if err != nil { return err } diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index 8f2ef95e3..ed92d8011 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -1,15 +1,15 @@ package shell import ( - "context" "fmt" "io" "os" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" ) func init() { @@ -53,9 +53,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. var dirCount, fileCount uint64 - ctx := context.Background() - - err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { sizeBuf := make([]byte, 4) @@ -80,7 +78,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. return err } - if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{ + if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ Directory: fullEntry.Dir, Entry: fullEntry.Entry, }); err != nil { diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index 178c826d5..7112c7526 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -168,7 +168,7 @@ func processOneDirectory(ctx context.Context, writer io.Writer, filerClient file parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { - return filer2.ReadDirAllEntries(ctx, filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) { + return filer2.ReadDirAllEntries(filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) { fn(parentPath, entry) diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index b9301ad3c..78f797f6c 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -47,20 +47,18 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer return err } - ctx := context.Background() - sourceDir, sourceName := filer2.FullPath(sourcePath).DirAndName() destinationDir, destinationName := filer2.FullPath(destinationPath).DirAndName() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { // collect destination entry info destinationRequest := &filer_pb.LookupDirectoryEntryRequest{ Name: destinationDir, Directory: destinationName, } - respDestinationLookupEntry, err := client.LookupDirectoryEntry(ctx, destinationRequest) + respDestinationLookupEntry, err := client.LookupDirectoryEntry(context.Background(), destinationRequest) var targetDir, targetName string @@ -82,7 +80,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer NewName: targetName, } - _, err = client.AtomicRenameEntry(ctx, request) + _, err = client.AtomicRenameEntry(context.Background(), request) fmt.Fprintf(writer, "move: %s => %s\n", sourcePath, filer2.NewFullPath(targetDir, targetName)) diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go index 8660030e3..fb2583240 100644 --- a/weed/shell/command_fs_tree.go +++ b/weed/shell/command_fs_tree.go @@ -53,7 +53,7 @@ func treeTraverseDirectory(ctx context.Context, writer io.Writer, filerClient fi prefix.addMarker(level) - err = filer2.ReadDirAllEntries(ctx, filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) { + err = filer2.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) { if level < 0 && name != "" { if entry.Name != name { return diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 488beb998..349f52f1c 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -69,9 +69,8 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer } var resp *master_pb.VolumeListResponse - ctx := context.Background() - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { @@ -239,8 +238,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f } fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) if applyBalancing { - ctx := context.Background() - return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second) + return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second) } return nil } diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index 6000d0de0..133ec62c6 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -53,9 +53,8 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman replicaPlacementInt32 := uint32(replicaPlacement.Byte()) var resp *master_pb.VolumeListResponse - ctx := context.Background() - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { @@ -81,8 +80,8 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman } for _, dst := range allLocations { - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, configureErr := volumeServerClient.VolumeConfigure(ctx, &volume_server_pb.VolumeConfigureRequest{ + err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{ VolumeId: uint32(vid), Replication: replicaPlacement.String(), }) diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go index 1c83ba655..aecc071ad 100644 --- a/weed/shell/command_volume_copy.go +++ b/weed/shell/command_volume_copy.go @@ -1,7 +1,6 @@ package shell import ( - "context" "fmt" "io" @@ -47,7 +46,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("source and target volume servers are the same!") } - ctx := context.Background() - _, err = copyVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer) + _, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer) return } diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go index 17d27ea3a..5869b1621 100644 --- a/weed/shell/command_volume_delete.go +++ b/weed/shell/command_volume_delete.go @@ -1,7 +1,6 @@ package shell import ( - "context" "fmt" "io" @@ -42,7 +41,6 @@ func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer i return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) } - ctx := context.Background() - return deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) + return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) } diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 7a1a77cbe..210f4819d 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -50,9 +50,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } var resp *master_pb.VolumeListResponse - ctx := context.Background() - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { @@ -113,8 +112,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, break } - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ + err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: volumeInfo.Id, SourceDataNode: sourceNode.dataNode.Id, }) diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go index c6c79d150..c5a9388fa 100644 --- a/weed/shell/command_volume_list.go +++ b/weed/shell/command_volume_list.go @@ -32,9 +32,8 @@ func (c *commandVolumeList) Help() string { func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { var resp *master_pb.VolumeListResponse - ctx := context.Background() - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go index 21bc342b4..cffc7136b 100644 --- a/weed/shell/command_volume_mount.go +++ b/weed/shell/command_volume_mount.go @@ -45,14 +45,13 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) } - ctx := context.Background() - return mountVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) + return mountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) } -func mountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ +func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(volumeId), }) return mountErr diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 2e39c0600..c25b953a5 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -59,26 +59,25 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("source and target volume servers are the same!") } - ctx := context.Background() - return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second) + return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second) } // LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests. -func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) { +func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) { log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer) - lastAppendAtNs, err := copyVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer) + lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer) if err != nil { return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err) } log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer) - if err = tailVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil { + if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil { return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err) } log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer) - if err = deleteVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer); err != nil { + if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer); err != nil { return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err) } @@ -86,10 +85,10 @@ func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeI return nil } -func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) { +func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) { - err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ + err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), SourceDataNode: sourceVolumeServer, }) @@ -102,10 +101,10 @@ func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne return } -func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { +func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { - return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, replicateErr := volumeServerClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{ + return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{ VolumeId: uint32(volumeId), SinceNs: lastAppendAtNs, IdleTimeoutSeconds: uint32(idleTimeout.Seconds()), @@ -116,9 +115,9 @@ func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne } -func deleteVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, deleteErr := volumeServerClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ +func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{ VolumeId: uint32(volumeId), }) return deleteErr diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index 0f1a1bb6e..756dc4686 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -49,18 +49,17 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr return nil } - ctx := context.Background() vid := needle.VolumeId(*volumeId) // collect topology information - topologyInfo, err := collectTopologyInfo(ctx, commandEnv) + topologyInfo, err := collectTopologyInfo(commandEnv) if err != nil { return err } // volumeId is provided if vid != 0 { - return doVolumeTierDownload(ctx, commandEnv, writer, *collection, vid) + return doVolumeTierDownload(commandEnv, writer, *collection, vid) } // apply to all volumes in the collection @@ -71,7 +70,7 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr } fmt.Printf("tier download volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doVolumeTierDownload(ctx, commandEnv, writer, *collection, vid); err != nil { + if err = doVolumeTierDownload(commandEnv, writer, *collection, vid); err != nil { return err } } @@ -97,7 +96,7 @@ func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection s return } -func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId) (err error) { +func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) if !found { @@ -107,7 +106,7 @@ func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io // TODO parallelize this for _, loc := range locations { // copy the .dat file from remote tier to local - err = downloadDatFromRemoteTier(ctx, commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url) + err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url) if err != nil { return fmt.Errorf("download dat file for volume %d to %s: %v", vid, loc.Url, err) } @@ -116,10 +115,10 @@ func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io return nil } -func downloadDatFromRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error { +func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error { - err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(ctx, &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{ + err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, }) @@ -145,14 +144,14 @@ func downloadDatFromRemoteTier(ctx context.Context, grpcDialOption grpc.DialOpti return downloadErr } - _, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{ + _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ VolumeId: uint32(volumeId), }) if unmountErr != nil { return unmountErr } - _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ + _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(volumeId), }) if mountErr != nil { diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index 20da1187c..5131e8f85 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -67,23 +67,22 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ return nil } - ctx := context.Background() vid := needle.VolumeId(*volumeId) // volumeId is provided if vid != 0 { - return doVolumeTierUpload(ctx, commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile) + return doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile) } // apply to all volumes in the collection // reusing collectVolumeIdsForEcEncode for now - volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod) + volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod) if err != nil { return err } fmt.Printf("tier upload volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doVolumeTierUpload(ctx, commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil { + if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil { return err } } @@ -91,20 +90,20 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ return nil } -func doVolumeTierUpload(ctx context.Context, commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) { +func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) if !found { return fmt.Errorf("volume %d not found", vid) } - err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) + err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) if err != nil { return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) } // copy the .dat file to remote tier - err = uploadDatToRemoteTier(ctx, commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile) + err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile) if err != nil { return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, locations[0].Url, dest, err) } @@ -112,10 +111,10 @@ func doVolumeTierUpload(ctx context.Context, commandEnv *CommandEnv, writer io.W return nil } -func uploadDatToRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error { +func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(ctx, &volume_server_pb.VolumeTierMoveDatToRemoteRequest{ + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, DestinationBackendName: dest, diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go index 826258dfb..6e5bef485 100644 --- a/weed/shell/command_volume_unmount.go +++ b/weed/shell/command_volume_unmount.go @@ -45,14 +45,13 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer return fmt.Errorf("wrong volume id format %s: %v", volumeId, err) } - ctx := context.Background() - return unmountVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) + return unmountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer) } -func unmountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{ +func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ VolumeId: uint32(volumeId), }) return unmountErr diff --git a/weed/shell/commands.go b/weed/shell/commands.go index f1fcb62d4..31ca31bc3 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -44,7 +44,7 @@ var ( func NewCommandEnv(options ShellOptions) *CommandEnv { return &CommandEnv{ env: make(map[string]string), - MasterClient: wdclient.NewMasterClient(context.Background(), + MasterClient: wdclient.NewMasterClient( options.GrpcDialOption, "shell", strings.Split(*options.Masters, ",")), option: options, } @@ -60,19 +60,19 @@ func (ce *CommandEnv) parseUrl(input string) (filerServer string, filerPort int6 return ce.option.FilerHost, ce.option.FilerPort, input, err } -func (ce *CommandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool { +func (ce *CommandEnv) isDirectory(filerServer string, filerPort int64, path string) bool { - return ce.checkDirectory(ctx, filerServer, filerPort, path) == nil + return ce.checkDirectory(filerServer, filerPort, path) == nil } -func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error { +func (ce *CommandEnv) checkDirectory(filerServer string, filerPort int64, path string) error { dir, name := filer2.FullPath(path).DirAndName() - return ce.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + return ce.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - resp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ + resp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, Name: name, }) -- cgit v1.2.3 From 97ab8a1976f3ba056af8d5b630dcb43006425b51 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 22:23:59 -0800 Subject: remove ctx if possible --- weed/shell/command_ec_balance.go | 7 ++----- weed/shell/command_ec_encode.go | 4 ++-- weed/shell/command_fs_meta_notify.go | 5 +---- weed/shell/command_fs_meta_save.go | 14 ++++---------- weed/shell/command_fs_tree.go | 9 +++------ 5 files changed, 12 insertions(+), 27 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 7230a869f..299d44fed 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -1,7 +1,6 @@ package shell import ( - "context" "flag" "fmt" "io" @@ -160,11 +159,9 @@ func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack { func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error { - ctx := context.Background() - fmt.Printf("balanceEcVolumes %s\n", collection) - if err := deleteDuplicatedEcShards(ctx, commandEnv, allEcNodes, collection, applyBalancing); err != nil { + if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil { return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err) } @@ -179,7 +176,7 @@ func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*E return nil } -func deleteDuplicatedEcShards(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error { +func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error { // vid => []ecNode vidLocations := collectVolumeIdToEcNodes(allEcNodes) // deduplicate ec shards diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index e22691c00..6efb05488 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -107,7 +107,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) } // balance the ec shards to current cluster - err = spreadEcShards(context.Background(), commandEnv, vid, collection, locations) + err = spreadEcShards(commandEnv, vid, collection, locations) if err != nil { return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err) } @@ -149,7 +149,7 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, } -func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { +func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "") if err != nil { diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index e2b2d22cc..099e04506 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -1,7 +1,6 @@ package shell import ( - "context" "fmt" "io" @@ -43,11 +42,9 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i v := util.GetViper() notification.LoadConfiguration(v, "notification.") - ctx := context.Background() - var dirCount, fileCount uint64 - err = doTraverseBFS(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { + err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { if entry.IsDirectory { dirCount++ diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index 7112c7526..b51fdd0f6 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -1,7 +1,6 @@ package shell import ( - "context" "flag" "fmt" "io" @@ -59,8 +58,6 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return parseErr } - ctx := context.Background() - t := time.Now() fileName := *outputFileName if fileName == "" { @@ -89,7 +86,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. var dirCount, fileCount uint64 - err = doTraverseBFS(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { + err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { protoMessage := &filer_pb.FullEntry{ Dir: string(parentPath), @@ -128,8 +125,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return err } -func doTraverseBFS(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, - parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { +func doTraverseBFS(writer io.Writer, filerClient filer2.FilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { K := 5 @@ -151,7 +147,7 @@ func doTraverseBFS(ctx context.Context, writer io.Writer, filerClient filer2.Fil continue } dir := t.(filer2.FullPath) - processErr := processOneDirectory(ctx, writer, filerClient, dir, queue, &jobQueueWg, fn) + processErr := processOneDirectory(writer, filerClient, dir, queue, &jobQueueWg, fn) if processErr != nil { err = processErr } @@ -164,9 +160,7 @@ func doTraverseBFS(ctx context.Context, writer io.Writer, filerClient filer2.Fil return } -func processOneDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, - parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, - fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { +func processOneDirectory(writer io.Writer, filerClient filer2.FilerClient, parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { return filer2.ReadDirAllEntries(filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) { diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go index fb2583240..04530571c 100644 --- a/weed/shell/command_fs_tree.go +++ b/weed/shell/command_fs_tree.go @@ -1,7 +1,6 @@ package shell import ( - "context" "fmt" "io" "strings" @@ -37,9 +36,7 @@ func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writ dir, name := filer2.FullPath(path).DirAndName() - ctx := context.Background() - - dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, newPrefix(), -1) + dirCount, fCount, terr := treeTraverseDirectory(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, newPrefix(), -1) if terr == nil { fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount) @@ -49,7 +46,7 @@ func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writ } -func treeTraverseDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, dir filer2.FullPath, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) { +func treeTraverseDirectory(writer io.Writer, filerClient filer2.FilerClient, dir filer2.FullPath, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) { prefix.addMarker(level) @@ -65,7 +62,7 @@ func treeTraverseDirectory(ctx context.Context, writer io.Writer, filerClient fi if entry.IsDirectory { directoryCount++ subDir := dir.Child(entry.Name) - dirCount, fCount, terr := treeTraverseDirectory(ctx, writer, filerClient, subDir, "", prefix, level+1) + dirCount, fCount, terr := treeTraverseDirectory(writer, filerClient, subDir, "", prefix, level+1) directoryCount += dirCount fileCount += fCount err = terr -- cgit v1.2.3 From 86cce3eb58f959066d2d11b8dde82a694e8f0999 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 22:29:01 -0800 Subject: fix test --- weed/shell/command_ec_test.go | 1 - 1 file changed, 1 deletion(-) (limited to 'weed/shell') diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index ddd52303c..4fddcbea5 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -1,7 +1,6 @@ package shell import ( - "context" "fmt" "testing" -- cgit v1.2.3 From 0156e2975aa4aabe142301deb72cc2657eb79ee9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 26 Feb 2020 16:46:01 -0800 Subject: mount: add mode to run external to SeaweedFS container cluster --- weed/shell/command_fs_du.go | 3 +++ 1 file changed, 3 insertions(+) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index a1e21bfa6..6c31ebdff 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -105,3 +105,6 @@ func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *comm func (c *commandFilerClient) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { return c.env.withFilerClient(c.filerServer, c.filerPort, fn) } +func (c *commandFilerClient) AdjustedUrl(hostAndPort string) string { + return hostAndPort +} -- cgit v1.2.3 From ed0acd17227ce4561ff6a2c77564de0bd8062c56 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 26 Feb 2020 16:52:57 -0800 Subject: go fmt --- weed/shell/command_fs_mv.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index 78f797f6c..9b74e85e9 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -63,7 +63,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer var targetDir, targetName string // moving a file or folder - if err == nil && respDestinationLookupEntry.Entry!= nil && respDestinationLookupEntry.Entry.IsDirectory { + if err == nil && respDestinationLookupEntry.Entry != nil && respDestinationLookupEntry.Entry.IsDirectory { // to a directory targetDir = filepath.ToSlash(filepath.Join(destinationDir, destinationName)) targetName = sourceName -- cgit v1.2.3 From 6a8484b4ae2615c2cc88e3a66d03aead3966ba7c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 1 Mar 2020 22:13:47 -0800 Subject: master able to list all master clients by type --- weed/shell/commands.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 31ca31bc3..a67a4e45e 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -44,8 +44,7 @@ var ( func NewCommandEnv(options ShellOptions) *CommandEnv { return &CommandEnv{ env: make(map[string]string), - MasterClient: wdclient.NewMasterClient( - options.GrpcDialOption, "shell", strings.Split(*options.Masters, ",")), + MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, "shell", 0, strings.Split(*options.Masters, ",")), option: options, } } -- cgit v1.2.3 From 410bce3925fb47d6ba21f838fafed4ff7c892a90 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 1 Mar 2020 22:39:08 -0800 Subject: go fmt --- weed/shell/commands.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/commands.go b/weed/shell/commands.go index a67a4e45e..93a4c94bb 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -43,9 +43,9 @@ var ( func NewCommandEnv(options ShellOptions) *CommandEnv { return &CommandEnv{ - env: make(map[string]string), + env: make(map[string]string), MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, "shell", 0, strings.Split(*options.Masters, ",")), - option: options, + option: options, } } -- cgit v1.2.3 From f90c43635d96cace1ab1ca965a56a082f880aa4b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 4 Mar 2020 00:39:47 -0800 Subject: refactoring --- weed/shell/command_fs_du.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index 6c31ebdff..ca2f22b57 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -4,11 +4,9 @@ import ( "fmt" "io" - "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -82,10 +80,7 @@ func duTraverseDirectory(writer io.Writer, filerClient filer2.FilerClient, dir, func (env *CommandEnv) withFilerClient(filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error { filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000) - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, filerGrpcAddress, env.option.GrpcDialOption) + return pb.WithGrpcFilerClient(filerGrpcAddress, env.option.GrpcDialOption, fn) } -- cgit v1.2.3 From 8645283a7b8a50485390267be9f83b83707f6161 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Mar 2020 16:51:46 -0800 Subject: fuse mount: avoid lookup nil entry fix https://github.com/chrislusf/seaweedfs/issues/1221 --- weed/shell/command_fs_cat.go | 6 +----- weed/shell/command_fs_meta_cat.go | 5 +---- weed/shell/command_fs_mv.go | 4 ++-- weed/shell/commands.go | 6 +----- 4 files changed, 5 insertions(+), 16 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index 8364e0de1..3db487979 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -1,7 +1,6 @@ package shell import ( - "context" "fmt" "io" "math" @@ -50,13 +49,10 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write Name: name, Directory: dir, } - respLookupEntry, err := client.LookupDirectoryEntry(context.Background(), request) + respLookupEntry, err := filer_pb.LookupEntry(client, request) if err != nil { return err } - if respLookupEntry.Entry == nil { - return fmt.Errorf("file not found: %s", path) - } return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt32) diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index ec5a093df..52e2ee6c0 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -49,13 +49,10 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W Name: name, Directory: dir, } - respLookupEntry, err := client.LookupDirectoryEntry(context.Background(), request) + respLookupEntry, err := filer_pb.LookupEntry(client, request) if err != nil { return err } - if respLookupEntry.Entry == nil { - return fmt.Errorf("file not found: %s", path) - } m := jsonpb.Marshaler{ EmitDefaults: true, diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index 9b74e85e9..85275058e 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -58,12 +58,12 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer Name: destinationDir, Directory: destinationName, } - respDestinationLookupEntry, err := client.LookupDirectoryEntry(context.Background(), destinationRequest) + respDestinationLookupEntry, err := filer_pb.LookupEntry(client, destinationRequest) var targetDir, targetName string // moving a file or folder - if err == nil && respDestinationLookupEntry.Entry != nil && respDestinationLookupEntry.Entry.IsDirectory { + if err == nil && respDestinationLookupEntry.Entry.IsDirectory { // to a directory targetDir = filepath.ToSlash(filepath.Join(destinationDir, destinationName)) targetName = sourceName diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 93a4c94bb..2239fa435 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -71,7 +71,7 @@ func (ce *CommandEnv) checkDirectory(filerServer string, filerPort int64, path s return ce.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - resp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + resp, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, Name: name, }) @@ -79,10 +79,6 @@ func (ce *CommandEnv) checkDirectory(filerServer string, filerPort int64, path s return lookupErr } - if resp.Entry == nil { - return fmt.Errorf("entry not found") - } - if !resp.Entry.IsDirectory { return fmt.Errorf("not a directory") } -- cgit v1.2.3 From afb20de14cd597c4651a409ac3129f854f6bd1c5 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 7 Mar 2020 17:01:39 -0800 Subject: breaks dependency loop --- weed/shell/command_fs_meta_cat.go | 1 - weed/shell/commands.go | 1 - 2 files changed, 2 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index 52e2ee6c0..cd1ffb6fd 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -1,7 +1,6 @@ package shell import ( - "context" "fmt" "io" diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 2239fa435..b8832ad93 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -1,7 +1,6 @@ package shell import ( - "context" "fmt" "io" "net/url" -- cgit v1.2.3 From ae2ee379c065f97f4661db926e1e14808940b607 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 22 Mar 2020 01:37:46 -0700 Subject: consistent 64bit size --- weed/shell/command_fs_cat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index 3db487979..7d2ac8989 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -54,7 +54,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write return err } - return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt32) + return filer2.StreamContent(commandEnv.MasterClient, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64) }) -- cgit v1.2.3 From c0f0fdb3baeb6e9852c6876b23c1404b2c5e833d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Mar 2020 00:01:34 -0700 Subject: refactoring --- weed/shell/command_fs_cat.go | 3 ++- weed/shell/command_fs_du.go | 7 ++++--- weed/shell/command_fs_ls.go | 5 +++-- weed/shell/command_fs_meta_cat.go | 4 ++-- weed/shell/command_fs_meta_load.go | 3 +-- weed/shell/command_fs_meta_notify.go | 3 +-- weed/shell/command_fs_meta_save.go | 13 ++++++------- weed/shell/command_fs_mv.go | 8 ++++---- weed/shell/command_fs_tree.go | 10 +++++----- weed/shell/commands.go | 4 ++-- 10 files changed, 30 insertions(+), 30 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index 7d2ac8989..1479aed95 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -41,7 +42,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write return fmt.Errorf("%s is a directory", path) } - dir, name := filer2.FullPath(path).DirAndName() + dir, name := util.FullPath(path).DirAndName() return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index ca2f22b57..b7313bebe 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -7,6 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -41,7 +42,7 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer } var blockCount, byteCount uint64 - dir, name := filer2.FullPath(path).DirAndName() + dir, name := util.FullPath(path).DirAndName() blockCount, byteCount, err = duTraverseDirectory(writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name) if name == "" && err == nil { @@ -52,9 +53,9 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer } -func duTraverseDirectory(writer io.Writer, filerClient filer2.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) { +func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) { - err = filer2.ReadDirAllEntries(filerClient, filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { if entry.IsDirectory { subDir := fmt.Sprintf("%s/%s", dir, entry.Name) if dir == "/" { diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go index 69ebe1b30..4185d67a8 100644 --- a/weed/shell/command_fs_ls.go +++ b/weed/shell/command_fs_ls.go @@ -10,6 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -63,10 +64,10 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer path = path + "/" } - dir, name := filer2.FullPath(path).DirAndName() + dir, name := util.FullPath(path).DirAndName() entryCount := 0 - err = filer2.ReadDirAllEntries(commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(commandEnv.getFilerClient(filerServer, filerPort), util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { if !showHidden && strings.HasPrefix(entry.Name, ".") { return diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index cd1ffb6fd..9cbe852c0 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -6,8 +6,8 @@ import ( "github.com/golang/protobuf/jsonpb" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -40,7 +40,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W return err } - dir, name := filer2.FullPath(path).DirAndName() + dir, name := util.FullPath(path).DirAndName() return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index ed92d8011..a19e9d3ce 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -7,7 +7,6 @@ import ( "github.com/golang/protobuf/proto" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -85,7 +84,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. return err } - fmt.Fprintf(writer, "load %s\n", filer2.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name)) + fmt.Fprintf(writer, "load %s\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name)) if fullEntry.Entry.IsDirectory { dirCount++ diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index 099e04506..995ea16a2 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -4,7 +4,6 @@ import ( "fmt" "io" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/notification" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -44,7 +43,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i var dirCount, fileCount uint64 - err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { + err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { if entry.IsDirectory { dirCount++ diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index b51fdd0f6..4314542bd 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -11,7 +11,6 @@ import ( "github.com/golang/protobuf/proto" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -86,7 +85,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. var dirCount, fileCount uint64 - err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { + err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { protoMessage := &filer_pb.FullEntry{ Dir: string(parentPath), @@ -125,7 +124,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return err } -func doTraverseBFS(writer io.Writer, filerClient filer2.FilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { +func doTraverseBFS(writer io.Writer, filerClient filer_pb.FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *filer_pb.Entry)) (err error) { K := 5 @@ -146,7 +145,7 @@ func doTraverseBFS(writer io.Writer, filerClient filer2.FilerClient, parentPath time.Sleep(329 * time.Millisecond) continue } - dir := t.(filer2.FullPath) + dir := t.(util.FullPath) processErr := processOneDirectory(writer, filerClient, dir, queue, &jobQueueWg, fn) if processErr != nil { err = processErr @@ -160,9 +159,9 @@ func doTraverseBFS(writer io.Writer, filerClient filer2.FilerClient, parentPath return } -func processOneDirectory(writer io.Writer, filerClient filer2.FilerClient, parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { +func processOneDirectory(writer io.Writer, filerClient filer_pb.FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *filer_pb.Entry)) (err error) { - return filer2.ReadDirAllEntries(filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) { + return filer_pb.ReadDirAllEntries(filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) { fn(parentPath, entry) @@ -172,7 +171,7 @@ func processOneDirectory(writer io.Writer, filerClient filer2.FilerClient, paren subDir = "/" + entry.Name } jobQueueWg.Add(1) - queue.Enqueue(filer2.FullPath(subDir)) + queue.Enqueue(util.FullPath(subDir)) } }) diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index 85275058e..148ac6e2f 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -6,8 +6,8 @@ import ( "io" "path/filepath" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -47,9 +47,9 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer return err } - sourceDir, sourceName := filer2.FullPath(sourcePath).DirAndName() + sourceDir, sourceName := util.FullPath(sourcePath).DirAndName() - destinationDir, destinationName := filer2.FullPath(destinationPath).DirAndName() + destinationDir, destinationName := util.FullPath(destinationPath).DirAndName() return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { @@ -82,7 +82,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer _, err = client.AtomicRenameEntry(context.Background(), request) - fmt.Fprintf(writer, "move: %s => %s\n", sourcePath, filer2.NewFullPath(targetDir, targetName)) + fmt.Fprintf(writer, "move: %s => %s\n", sourcePath, util.NewFullPath(targetDir, targetName)) return err diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go index 04530571c..d1f639cff 100644 --- a/weed/shell/command_fs_tree.go +++ b/weed/shell/command_fs_tree.go @@ -5,8 +5,8 @@ import ( "io" "strings" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) func init() { @@ -34,9 +34,9 @@ func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writ return err } - dir, name := filer2.FullPath(path).DirAndName() + dir, name := util.FullPath(path).DirAndName() - dirCount, fCount, terr := treeTraverseDirectory(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, newPrefix(), -1) + dirCount, fCount, terr := treeTraverseDirectory(writer, commandEnv.getFilerClient(filerServer, filerPort), util.FullPath(dir), name, newPrefix(), -1) if terr == nil { fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount) @@ -46,11 +46,11 @@ func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writ } -func treeTraverseDirectory(writer io.Writer, filerClient filer2.FilerClient, dir filer2.FullPath, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) { +func treeTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir util.FullPath, name string, prefix *Prefix, level int) (directoryCount, fileCount int64, err error) { prefix.addMarker(level) - err = filer2.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) { if level < 0 && name != "" { if entry.Name != name { return diff --git a/weed/shell/commands.go b/weed/shell/commands.go index b8832ad93..7ca631ab3 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -10,8 +10,8 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -66,7 +66,7 @@ func (ce *CommandEnv) isDirectory(filerServer string, filerPort int64, path stri func (ce *CommandEnv) checkDirectory(filerServer string, filerPort int64, path string) error { - dir, name := filer2.FullPath(path).DirAndName() + dir, name := util.FullPath(path).DirAndName() return ce.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { -- cgit v1.2.3 From 7f1e3c843ddc8d6880985f0a32f37cfb19b27c3c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Mar 2020 01:14:21 -0700 Subject: refactoring --- weed/shell/command_bucket_delete.go | 29 +++++-------------- weed/shell/command_bucket_list.go | 56 ++++++++++++++++++------------------- 2 files changed, 34 insertions(+), 51 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go index c57ce7221..509b3c1de 100644 --- a/weed/shell/command_bucket_delete.go +++ b/weed/shell/command_bucket_delete.go @@ -1,7 +1,6 @@ package shell import ( - "context" "flag" "fmt" "io" @@ -44,28 +43,14 @@ func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer i return parseErr } - err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + filerClient := commandEnv.getFilerClient(filerServer, filerPort) - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err) - } - filerBucketsPath := resp.DirBuckets - - if _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{ - Directory: filerBucketsPath, - Name: *bucketName, - IsDeleteData: false, - IsRecursive: true, - IgnoreRecursiveError: true, - }); err != nil { - return err - } - - return nil - - }) + var filerBucketsPath string + filerBucketsPath, err = readFilerBucketsPath(filerClient) + if err != nil { + return fmt.Errorf("read buckets: %v", err) + } - return err + return filer_pb.Remove(filerClient, filerBucketsPath, *bucketName, false, true, true) } diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go index 5eb5972ce..486d40fba 100644 --- a/weed/shell/command_bucket_list.go +++ b/weed/shell/command_bucket_list.go @@ -39,43 +39,41 @@ func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io. return parseErr } - err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + filerClient := commandEnv.getFilerClient(filerServer, filerPort) - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err) - } - filerBucketsPath := resp.DirBuckets + var filerBucketsPath string + filerBucketsPath, err = readFilerBucketsPath(filerClient) + if err != nil { + return fmt.Errorf("read buckets: %v", err) + } - stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ - Directory: filerBucketsPath, - Limit: math.MaxUint32, - }) - if err != nil { - return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err) + err = filer_pb.List(filerClient, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) { + if entry.Attributes.Replication == "" || entry.Attributes.Replication == "000" { + fmt.Fprintf(writer, " %s\n", entry.Name) + } else { + fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", entry.Name, entry.Attributes.Replication) } + }, "", false, math.MaxUint32) + if err != nil { + return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err) + } + + return err - for { - resp, recvErr := stream.Recv() - if recvErr != nil { - if recvErr == io.EOF { - break - } else { - return recvErr - } - } - - if resp.Entry.Attributes.Replication == "" || resp.Entry.Attributes.Replication == "000" { - fmt.Fprintf(writer, " %s\n", resp.Entry.Name) - } else { - fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", resp.Entry.Name, resp.Entry.Attributes.Replication) - } +} + +func readFilerBucketsPath(filerClient filer_pb.FilerClient) (filerBucketsPath string, err error) { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer configuration: %v", err) } + filerBucketsPath = resp.DirBuckets return nil }) - return err - + return filerBucketsPath, err } -- cgit v1.2.3 From d151185b7e5c7912afc38a36c163ef87eedcde3d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Mar 2020 20:46:17 -0700 Subject: shell: desupport filer url in the arguments --- weed/shell/command_fs_cat.go | 1 - weed/shell/command_fs_cd.go | 6 +++--- weed/shell/command_fs_du.go | 6 +++--- weed/shell/command_fs_ls.go | 3 --- weed/shell/command_fs_meta_cat.go | 2 -- weed/shell/command_fs_tree.go | 3 ++- weed/shell/commands.go | 5 +++-- 7 files changed, 11 insertions(+), 15 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index 1479aed95..b1d4eea14 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -25,7 +25,6 @@ func (c *commandFsCat) Help() string { return `stream the file content on to the screen fs.cat /dir/file_name - fs.cat http://:/dir/file_name ` } diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go index df42cd516..377fd40f7 100644 --- a/weed/shell/command_fs_cd.go +++ b/weed/shell/command_fs_cd.go @@ -16,14 +16,14 @@ func (c *commandFsCd) Name() string { } func (c *commandFsCd) Help() string { - return `change directory to http://:/dir/ + return `change directory to a directory /path/to/dir The full path can be too long to type. For example, - fs.ls http://:/some/path/to/file_name + fs.ls /some/path/to/file_name can be simplified as - fs.cd http://:/some/path + fs.cd /some/path fs.ls to/file_name ` } diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index b7313bebe..0372ba95f 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -24,9 +24,9 @@ func (c *commandFsDu) Name() string { func (c *commandFsDu) Help() string { return `show disk usage - fs.du http://:/dir - fs.du http://:/dir/file_name - fs.du http://:/dir/file_prefix + fs.du /dir + fs.du /dir/file_name + fs.du /dir/file_prefix ` } diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go index 4185d67a8..66adf057e 100644 --- a/weed/shell/command_fs_ls.go +++ b/weed/shell/command_fs_ls.go @@ -30,9 +30,6 @@ func (c *commandFsLs) Help() string { fs.ls [-l] [-a] /dir/ fs.ls [-l] [-a] /dir/file_name fs.ls [-l] [-a] /dir/file_prefix - fs.ls [-l] [-a] http://:/dir/ - fs.ls [-l] [-a] http://:/dir/file_name - fs.ls [-l] [-a] http://:/dir/file_prefix ` } diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index 9cbe852c0..cbbca746c 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -26,8 +26,6 @@ func (c *commandFsMetaCat) Help() string { fs.meta.cat /dir/ fs.meta.cat /dir/file_name - fs.meta.cat http://:/dir/ - fs.meta.cat http://:/dir/file_name ` } diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go index d1f639cff..0982082db 100644 --- a/weed/shell/command_fs_tree.go +++ b/weed/shell/command_fs_tree.go @@ -23,7 +23,8 @@ func (c *commandFsTree) Name() string { func (c *commandFsTree) Help() string { return `recursively list all files under a directory - fs.tree http://:/dir/ + fs.tree /some/dir + ` } diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 7ca631ab3..660929ec7 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -50,7 +50,8 @@ func NewCommandEnv(options ShellOptions) *CommandEnv { func (ce *CommandEnv) parseUrl(input string) (filerServer string, filerPort int64, path string, err error) { if strings.HasPrefix(input, "http") { - return parseFilerUrl(input) + err = fmt.Errorf("http://: prefix is not supported any more") + return } if !strings.HasPrefix(input, "/") { input = filepath.ToSlash(filepath.Join(ce.option.Directory, input)) @@ -101,7 +102,7 @@ func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path } path = u.Path } else { - err = fmt.Errorf("path should have full url http://:/path/to/dirOrFile : %s", entryPath) + err = fmt.Errorf("path should have full url /path/to/dirOrFile : %s", entryPath) } return } -- cgit v1.2.3 From e666aeece2778812a4a9d3fc4daaaac86fe4a412 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Mar 2020 21:26:15 -0700 Subject: simplify parsing filer host and port --- weed/shell/command_bucket_create.go | 9 ++------- weed/shell/command_bucket_delete.go | 8 +++----- weed/shell/command_bucket_list.go | 8 +++----- weed/shell/command_fs_cat.go | 8 +++----- weed/shell/command_fs_cd.go | 10 ++-------- weed/shell/command_fs_du.go | 28 ++++++---------------------- weed/shell/command_fs_ls.go | 8 +++----- weed/shell/command_fs_meta_cat.go | 6 ++---- weed/shell/command_fs_meta_load.go | 9 ++------- weed/shell/command_fs_meta_notify.go | 4 ++-- weed/shell/command_fs_meta_save.go | 8 ++++---- weed/shell/command_fs_mv.go | 6 +++--- weed/shell/command_fs_tree.go | 4 ++-- weed/shell/commands.go | 27 +++++++-------------------- 14 files changed, 44 insertions(+), 99 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_bucket_create.go index 3546528aa..52d96e4c3 100644 --- a/weed/shell/command_bucket_create.go +++ b/weed/shell/command_bucket_create.go @@ -43,16 +43,11 @@ func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer i return fmt.Errorf("empty bucket name") } - filerServer, filerPort, _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args())) - if parseErr != nil { - return parseErr - } - - err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err) + return fmt.Errorf("get filer configuration: %v", err) } filerBucketsPath := resp.DirBuckets diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go index 509b3c1de..07c2e74ac 100644 --- a/weed/shell/command_bucket_delete.go +++ b/weed/shell/command_bucket_delete.go @@ -38,19 +38,17 @@ func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer i return fmt.Errorf("empty bucket name") } - filerServer, filerPort, _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args())) + _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args())) if parseErr != nil { return parseErr } - filerClient := commandEnv.getFilerClient(filerServer, filerPort) - var filerBucketsPath string - filerBucketsPath, err = readFilerBucketsPath(filerClient) + filerBucketsPath, err = readFilerBucketsPath(commandEnv) if err != nil { return fmt.Errorf("read buckets: %v", err) } - return filer_pb.Remove(filerClient, filerBucketsPath, *bucketName, false, true, true) + return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true) } diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go index 486d40fba..b982ff646 100644 --- a/weed/shell/command_bucket_list.go +++ b/weed/shell/command_bucket_list.go @@ -34,20 +34,18 @@ func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io. return nil } - filerServer, filerPort, _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args())) + _, parseErr := commandEnv.parseUrl(findInputDirectory(bucketCommand.Args())) if parseErr != nil { return parseErr } - filerClient := commandEnv.getFilerClient(filerServer, filerPort) - var filerBucketsPath string - filerBucketsPath, err = readFilerBucketsPath(filerClient) + filerBucketsPath, err = readFilerBucketsPath(commandEnv) if err != nil { return fmt.Errorf("read buckets: %v", err) } - err = filer_pb.List(filerClient, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) { if entry.Attributes.Replication == "" || entry.Attributes.Replication == "000" { fmt.Fprintf(writer, " %s\n", entry.Name) } else { diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index b1d4eea14..7177d8ac3 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -30,20 +30,18 @@ func (c *commandFsCat) Help() string { func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - input := findInputDirectory(args) - - filerServer, filerPort, path, err := commandEnv.parseUrl(input) + path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { return err } - if commandEnv.isDirectory(filerServer, filerPort, path) { + if commandEnv.isDirectory(path) { return fmt.Errorf("%s is a directory", path) } dir, name := util.FullPath(path).DirAndName() - return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go index 377fd40f7..2cc28f7a2 100644 --- a/weed/shell/command_fs_cd.go +++ b/weed/shell/command_fs_cd.go @@ -30,25 +30,19 @@ func (c *commandFsCd) Help() string { func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - input := findInputDirectory(args) - - filerServer, filerPort, path, err := commandEnv.parseUrl(input) + path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { return err } if path == "/" { - commandEnv.option.FilerHost = filerServer - commandEnv.option.FilerPort = filerPort commandEnv.option.Directory = "/" return nil } - err = commandEnv.checkDirectory(filerServer, filerPort, path) + err = commandEnv.checkDirectory(path) if err == nil { - commandEnv.option.FilerHost = filerServer - commandEnv.option.FilerPort = filerPort commandEnv.option.Directory = path } diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index 0372ba95f..f3d479614 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -32,18 +32,18 @@ func (c *commandFsDu) Help() string { func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args)) + path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { return err } - if commandEnv.isDirectory(filerServer, filerPort, path) { + if commandEnv.isDirectory(path) { path = path + "/" } var blockCount, byteCount uint64 dir, name := util.FullPath(path).DirAndName() - blockCount, byteCount, err = duTraverseDirectory(writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name) + blockCount, byteCount, err = duTraverseDirectory(writer, commandEnv, dir, name) if name == "" && err == nil { fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir) @@ -78,29 +78,13 @@ func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir return } -func (env *CommandEnv) withFilerClient(filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error { +func (env *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000) + filerGrpcAddress := fmt.Sprintf("%s:%d", env.option.FilerHost, env.option.FilerPort+10000) return pb.WithGrpcFilerClient(filerGrpcAddress, env.option.GrpcDialOption, fn) } -type commandFilerClient struct { - env *CommandEnv - filerServer string - filerPort int64 -} - -func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *commandFilerClient { - return &commandFilerClient{ - env: env, - filerServer: filerServer, - filerPort: filerPort, - } -} -func (c *commandFilerClient) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return c.env.withFilerClient(c.filerServer, c.filerPort, fn) -} -func (c *commandFilerClient) AdjustedUrl(hostAndPort string) string { +func (env *CommandEnv) AdjustedUrl(hostAndPort string) string { return hostAndPort } diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go index 66adf057e..be531e980 100644 --- a/weed/shell/command_fs_ls.go +++ b/weed/shell/command_fs_ls.go @@ -50,21 +50,19 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer } } - input := findInputDirectory(args) - - filerServer, filerPort, path, err := commandEnv.parseUrl(input) + path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { return err } - if commandEnv.isDirectory(filerServer, filerPort, path) { + if commandEnv.isDirectory(path) { path = path + "/" } dir, name := util.FullPath(path).DirAndName() entryCount := 0 - err = filer_pb.ReadDirAllEntries(commandEnv.getFilerClient(filerServer, filerPort), util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { if !showHidden && strings.HasPrefix(entry.Name, ".") { return diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index cbbca746c..0679ec075 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -31,16 +31,14 @@ func (c *commandFsMetaCat) Help() string { func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - input := findInputDirectory(args) - - filerServer, filerPort, path, err := commandEnv.parseUrl(input) + path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { return err } dir, name := util.FullPath(path).DirAndName() - return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index a19e9d3ce..69ae9454c 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -37,11 +37,6 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. return nil } - filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(nil)) - if err != nil { - return err - } - fileName := args[len(args)-1] dst, err := os.OpenFile(fileName, os.O_RDONLY, 0644) @@ -52,7 +47,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. var dirCount, fileCount uint64 - err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { sizeBuf := make([]byte, 4) @@ -98,7 +93,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. if err == nil { fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount) - fmt.Fprintf(writer, "\n%s is loaded to http://%s:%d%s\n", fileName, filerServer, filerPort, path) + fmt.Fprintf(writer, "\n%s is loaded.\n", fileName) } return err diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index 995ea16a2..b361b61a6 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -32,7 +32,7 @@ func (c *commandFsMetaNotify) Help() string { func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args)) + path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { return err } @@ -43,7 +43,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i var dirCount, fileCount uint64 - err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { + err = doTraverseBFS(writer, commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { if entry.IsDirectory { dirCount++ diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index 4314542bd..a07a94ccb 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -52,7 +52,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return nil } - filerServer, filerPort, path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) + path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) if parseErr != nil { return parseErr } @@ -61,7 +61,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. fileName := *outputFileName if fileName == "" { fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", - filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) } dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) @@ -85,7 +85,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. var dirCount, fileCount uint64 - err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { + err = doTraverseBFS(writer, commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { protoMessage := &filer_pb.FullEntry{ Dir: string(parentPath), @@ -118,7 +118,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. if err == nil { fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount) - fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName) + fmt.Fprintf(writer, "meta data for %s is saved to %s\n", path, fileName) } return err diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index 148ac6e2f..c7c0984fc 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -37,12 +37,12 @@ func (c *commandFsMv) Help() string { func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - filerServer, filerPort, sourcePath, err := commandEnv.parseUrl(args[0]) + sourcePath, err := commandEnv.parseUrl(args[0]) if err != nil { return err } - _, _, destinationPath, err := commandEnv.parseUrl(args[1]) + destinationPath, err := commandEnv.parseUrl(args[1]) if err != nil { return err } @@ -51,7 +51,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer destinationDir, destinationName := util.FullPath(destinationPath).DirAndName() - return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { // collect destination entry info destinationRequest := &filer_pb.LookupDirectoryEntryRequest{ diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go index 0982082db..b0752ea03 100644 --- a/weed/shell/command_fs_tree.go +++ b/weed/shell/command_fs_tree.go @@ -30,14 +30,14 @@ func (c *commandFsTree) Help() string { func (c *commandFsTree) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args)) + path, err := commandEnv.parseUrl(findInputDirectory(args)) if err != nil { return err } dir, name := util.FullPath(path).DirAndName() - dirCount, fCount, terr := treeTraverseDirectory(writer, commandEnv.getFilerClient(filerServer, filerPort), util.FullPath(dir), name, newPrefix(), -1) + dirCount, fCount, terr := treeTraverseDirectory(writer, commandEnv, util.FullPath(dir), name, newPrefix(), -1) if terr == nil { fmt.Fprintf(writer, "%d directories, %d files\n", dirCount, fCount) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 660929ec7..7e240de5f 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -48,7 +48,7 @@ func NewCommandEnv(options ShellOptions) *CommandEnv { } } -func (ce *CommandEnv) parseUrl(input string) (filerServer string, filerPort int64, path string, err error) { +func (ce *CommandEnv) parseUrl(input string) (path string, err error) { if strings.HasPrefix(input, "http") { err = fmt.Errorf("http://: prefix is not supported any more") return @@ -56,35 +56,22 @@ func (ce *CommandEnv) parseUrl(input string) (filerServer string, filerPort int6 if !strings.HasPrefix(input, "/") { input = filepath.ToSlash(filepath.Join(ce.option.Directory, input)) } - return ce.option.FilerHost, ce.option.FilerPort, input, err + return input, err } -func (ce *CommandEnv) isDirectory(filerServer string, filerPort int64, path string) bool { +func (ce *CommandEnv) isDirectory(path string) bool { - return ce.checkDirectory(filerServer, filerPort, path) == nil + return ce.checkDirectory(path) == nil } -func (ce *CommandEnv) checkDirectory(filerServer string, filerPort int64, path string) error { +func (ce *CommandEnv) checkDirectory(path string) error { dir, name := util.FullPath(path).DirAndName() - return ce.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + _, err := filer_pb.Exists(ce, dir, name, true) - resp, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{ - Directory: dir, - Name: name, - }) - if lookupErr != nil { - return lookupErr - } - - if !resp.Entry.IsDirectory { - return fmt.Errorf("not a directory") - } - - return nil - }) + return err } -- cgit v1.2.3 From b51fa81f0e7cd7ab21f3683f356b0380f9dce75b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Mar 2020 21:36:39 -0700 Subject: fix directory checking --- weed/shell/commands.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'weed/shell') diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 7e240de5f..6e40380e0 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -69,7 +69,11 @@ func (ce *CommandEnv) checkDirectory(path string) error { dir, name := util.FullPath(path).DirAndName() - _, err := filer_pb.Exists(ce, dir, name, true) + exists, err := filer_pb.Exists(ce, dir, name, true) + + if !exists { + return fmt.Errorf("%s is not a directory", path) + } return err -- cgit v1.2.3 From 38e73463f1c8c7d7dc226ea41679407cb66101d4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Mar 2020 21:37:04 -0700 Subject: fix du block and byte couting --- weed/shell/command_fs_du.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index f3d479614..dc5c3dec0 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -56,6 +56,9 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) { err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) { + + var fileBlockCount, fileByteCount uint64 + if entry.IsDirectory { subDir := fmt.Sprintf("%s/%s", dir, entry.Name) if dir == "/" { @@ -67,12 +70,14 @@ func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir byteCount += numByte } } else { + fileBlockCount = uint64(len(entry.Chunks)) + fileByteCount = filer2.TotalSize(entry.Chunks) blockCount += uint64(len(entry.Chunks)) byteCount += filer2.TotalSize(entry.Chunks) } if name != "" && !entry.IsDirectory { - fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", blockCount, byteCount, dir, name) + fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", fileBlockCount, fileByteCount, dir, entry.Name) } }) return -- cgit v1.2.3 From 782d776d2a63c8c403f7fa52e97154f146467610 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Mar 2020 22:54:02 -0700 Subject: refactoring --- weed/shell/command_fs_du.go | 12 ------------ weed/shell/commands.go | 13 +++++++++++++ 2 files changed, 13 insertions(+), 12 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index dc5c3dec0..08c553e7c 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -5,7 +5,6 @@ import ( "io" "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -82,14 +81,3 @@ func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir }) return } - -func (env *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - - filerGrpcAddress := fmt.Sprintf("%s:%d", env.option.FilerHost, env.option.FilerPort+10000) - return pb.WithGrpcFilerClient(filerGrpcAddress, env.option.GrpcDialOption, fn) - -} - -func (env *CommandEnv) AdjustedUrl(hostAndPort string) string { - return hostAndPort -} diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 6e40380e0..31136951e 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" @@ -79,6 +80,18 @@ func (ce *CommandEnv) checkDirectory(path string) error { } +func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + filerGrpcAddress := fmt.Sprintf("%s:%d", ce.option.FilerHost, ce.option.FilerPort+10000) + return pb.WithGrpcFilerClient(filerGrpcAddress, ce.option.GrpcDialOption, fn) + +} + +func (ce *CommandEnv) AdjustedUrl(hostAndPort string) string { + return hostAndPort +} + + func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) { if strings.HasPrefix(entryPath, "http") { var u *url.URL -- cgit v1.2.3 From 8047ec2f5103696982acd6c6407ce77c1b02b89e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Mar 2020 22:54:46 -0700 Subject: shell: fs.meta.save add option to export all fileIds for all files --- weed/shell/command_fs_meta_notify.go | 2 +- weed/shell/command_fs_meta_save.go | 78 ++++++++++++++++++++++++++---------- 2 files changed, 58 insertions(+), 22 deletions(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index b361b61a6..56e63e98f 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -43,7 +43,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i var dirCount, fileCount uint64 - err = doTraverseBFS(writer, commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { + err = doTraverseBfs(writer, commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { if entry.IsDirectory { dirCount++ diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index a07a94ccb..5ea69026f 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -48,6 +48,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. fsMetaSaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) verbose := fsMetaSaveCommand.Bool("v", false, "print out each processed files") outputFileName := fsMetaSaveCommand.String("o", "", "output the meta data to this file") + chunksFileName := fsMetaSaveCommand.String("chunks", "", "output all the chunks to this file") if err = fsMetaSaveCommand.Parse(args); err != nil { return nil } @@ -57,13 +58,58 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return parseErr } - t := time.Now() - fileName := *outputFileName - if fileName == "" { - fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", - commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + if *outputFileName != "" { + fileName := *outputFileName + if fileName == "" { + t := time.Now() + fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", + commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + } + return doTraverseBfsAndSaving(fileName, commandEnv, writer, path, *verbose, func(dst io.Writer, outputChan chan []byte) { + sizeBuf := make([]byte, 4) + for b := range outputChan { + util.Uint32toBytes(sizeBuf, uint32(len(b))) + dst.Write(sizeBuf) + dst.Write(b) + } + }, func(entry *filer_pb.FullEntry, outputChan chan []byte) (err error) { + bytes, err := proto.Marshal(entry) + if err != nil { + fmt.Fprintf(writer, "marshall error: %v\n", err) + return + } + + outputChan <- bytes + return nil + }) + } + + if *chunksFileName != "" { + return doTraverseBfsAndSaving(*chunksFileName, commandEnv, writer, path, *verbose, func(dst io.Writer, outputChan chan []byte) { + for b := range outputChan { + dst.Write(b) + } + }, func(entry *filer_pb.FullEntry, outputChan chan []byte) (err error) { + for _, chunk := range entry.Entry.Chunks { + dir := entry.Dir + if dir == "/" { + dir = "" + } + outputLine := fmt.Sprintf("%d\t%s\t%s/%s\n", chunk.Fid.FileKey, chunk.FileId, dir, entry.Entry.Name) + outputChan <- []byte(outputLine) + } + return nil + }) } + return err + +} + +func doTraverseBfsAndSaving(fileName string, commandEnv *CommandEnv, writer io.Writer, path string, verbose bool, + saveFn func(dst io.Writer, outputChan chan []byte), + genFn func(entry *filer_pb.FullEntry, outputChan chan []byte) error) error { + dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if openErr != nil { return fmt.Errorf("failed to create file %s: %v", fileName, openErr) @@ -74,39 +120,31 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. wg.Add(1) outputChan := make(chan []byte, 1024) go func() { - sizeBuf := make([]byte, 4) - for b := range outputChan { - util.Uint32toBytes(sizeBuf, uint32(len(b))) - dst.Write(sizeBuf) - dst.Write(b) - } + saveFn(dst, outputChan) wg.Done() }() var dirCount, fileCount uint64 - err = doTraverseBFS(writer, commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { + err := doTraverseBfs(writer, commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { protoMessage := &filer_pb.FullEntry{ Dir: string(parentPath), Entry: entry, } - bytes, err := proto.Marshal(protoMessage) - if err != nil { + if err := genFn(protoMessage, outputChan); err != nil { fmt.Fprintf(writer, "marshall error: %v\n", err) return } - outputChan <- bytes - if entry.IsDirectory { atomic.AddUint64(&dirCount, 1) } else { atomic.AddUint64(&fileCount, 1) } - if *verbose { + if verbose { println(parentPath.Child(entry.Name)) } @@ -118,13 +156,11 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. if err == nil { fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount) - fmt.Fprintf(writer, "meta data for %s is saved to %s\n", path, fileName) } - return err - } -func doTraverseBFS(writer io.Writer, filerClient filer_pb.FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *filer_pb.Entry)) (err error) { + +func doTraverseBfs(writer io.Writer, filerClient filer_pb.FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *filer_pb.Entry)) (err error) { K := 5 -- cgit v1.2.3 From bb9b97e2b4ad7b826b21b03a648527e32bc744f2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Mar 2020 23:07:11 -0700 Subject: add comments --- weed/shell/command_fs_meta_save.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'weed/shell') diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index 5ea69026f..f1628973a 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -38,7 +38,14 @@ func (c *commandFsMetaSave) Help() string { The meta data will be saved into a local --