From 09ca936c78c1044412f47dc06caff1bf08273e60 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Dec 2019 12:48:20 -0800 Subject: shell: add ec.decode command --- weed/shell/command_ec_decode.go | 263 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 263 insertions(+) create mode 100644 weed/shell/command_ec_decode.go (limited to 'weed/shell/command_ec_decode.go') diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go new file mode 100644 index 000000000..5ea359b2b --- /dev/null +++ b/weed/shell/command_ec_decode.go @@ -0,0 +1,263 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandEcDecode{}) +} + +type commandEcDecode struct { +} + +func (c *commandEcDecode) Name() string { + return "ec.decode" +} + +func (c *commandEcDecode) Help() string { + return `decode a erasure coded volume into a normal volume + + ec.decode [-collection=""] [-volumeId=] + +` +} + +func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeId := encodeCommand.Int("volumeId", 0, "the volume id") + collection := encodeCommand.String("collection", "", "the collection name") + if err = encodeCommand.Parse(args); err != nil { + return nil + } + + ctx := context.Background() + vid := needle.VolumeId(*volumeId) + + // collect topology information + topologyInfo, err := collectTopologyInfoForEcDecode(ctx, commandEnv) + if err != nil { + return err + } + + // volumeId is provided + if vid != 0 { + return doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid) + } + + // apply to all volumes in the collection + volumeIds := collectEcShardIds(topologyInfo, *collection) + fmt.Printf("ec encode volumes: %v\n", volumeIds) + for _, vid := range volumeIds { + if err = doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid); err != nil { + return err + } + } + + return nil +} + +func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) { + // find volume location + nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid) + + fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits) + + // collect ec shards to the server with most space + targetNodeLocation, err := collectEcShards(ctx, commandEnv, nodeToEcIndexBits, collection, vid) + if err != nil { + return fmt.Errorf("collectEcShards for volume %d: %v", vid, err) + } + + // generate a normal volume + err = generateNormalVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation) + if err != nil { + return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) + } + + // delete the previous ec shards + err = mountVolumeAndDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid) + if err != nil { + return fmt.Errorf("delete ec shards for volume %d: %v", vid, err) + } + + return nil +} + +func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { + + // mount volume + if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ + VolumeId: uint32(vid), + }) + return mountErr + }); err != nil { + return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err) + } + + // unmount ec shards + for location, ecIndexBits := range nodeToEcIndexBits { + fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) + err := unmountEcShards(ctx, grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) + if err != nil { + return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err) + } + } + // delete ec shards + for location, ecIndexBits := range nodeToEcIndexBits { + fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) + err := sourceServerDeleteEcShards(ctx, grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) + if err != nil { + return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err) + } + } + + return nil +} + +func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error { + + fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) + + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ + VolumeId: uint32(vid), + Collection: collection, + }) + return genErr + }) + + return err + +} + +func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) { + + maxShardCount := 0 + var exisitngEcIndexBits erasure_coding.ShardBits + for loc, ecIndexBits := range nodeToEcIndexBits { + if ecIndexBits.ShardIdCount() > maxShardCount { + maxShardCount = ecIndexBits.ShardIdCount() + targetNodeLocation = loc + exisitngEcIndexBits = ecIndexBits + } + } + + fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits) + + var copiedEcIndexBits erasure_coding.ShardBits + for loc, ecIndexBits := range nodeToEcIndexBits { + if loc == targetNodeLocation { + continue + } + + needToCopyEcIndexBits := ecIndexBits.Minus(exisitngEcIndexBits) + if needToCopyEcIndexBits.ShardIdCount() == 0 { + continue + } + + err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + + fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) + + _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + VolumeId: uint32(vid), + Collection: collection, + ShardIds: needToCopyEcIndexBits.ToUint32Slice(), + CopyEcxFile: false, + CopyEcjFile: true, + SourceDataNode: loc, + }) + if copyErr != nil { + return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr) + } + + return nil + }) + + if err != nil { + break + } + + copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits) + + } + + nodeToEcIndexBits[targetNodeLocation] = exisitngEcIndexBits.Plus(copiedEcIndexBits) + + return targetNodeLocation, err + +} + +func collectTopologyInfoForEcDecode(ctx context.Context, commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { + + var resp *master_pb.VolumeListResponse + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return + } + + return resp.TopologyInfo, nil + +} + +func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) { + + eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, v := range dn.EcShardInfos { + if v.Collection == selectedCollection && v.Id == uint32(vid) { + ecShardInfos = append(ecShardInfos, v) + } + } + }) + + return +} + +func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) { + + vidMap := make(map[uint32]bool) + eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, v := range dn.EcShardInfos { + if v.Collection == selectedCollection { + vidMap[v.Id] = true + } + } + }) + + for vid := range vidMap { + vids = append(vids, needle.VolumeId(vid)) + } + + return +} + +func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits { + + nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits) + eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, v := range dn.EcShardInfos { + if v.Id == uint32(vid) { + nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits) + } + } + }) + + return nodeToEcIndexBits +} -- cgit v1.2.3 From a18f62bbe7e5e477fd891849b44ec5a25d8988d2 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 23 Dec 2019 18:06:13 -0800 Subject: only copy required shards --- weed/shell/command_ec_decode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_ec_decode.go') diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 5ea359b2b..4ec1a7e8f 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -164,7 +164,7 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB continue } - needToCopyEcIndexBits := ecIndexBits.Minus(exisitngEcIndexBits) + needToCopyEcIndexBits := ecIndexBits.Minus(exisitngEcIndexBits).MinusParityShards() if needToCopyEcIndexBits.ShardIdCount() == 0 { continue } -- cgit v1.2.3 From 1ad34a2487a3e8625f5fea82e38431d0e54794b6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 24 Dec 2019 00:00:45 -0800 Subject: ed.decode prefers servers with most data shards --- weed/shell/command_ec_decode.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'weed/shell/command_ec_decode.go') diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 4ec1a7e8f..b49ab2c58 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -149,8 +149,9 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB maxShardCount := 0 var exisitngEcIndexBits erasure_coding.ShardBits for loc, ecIndexBits := range nodeToEcIndexBits { - if ecIndexBits.ShardIdCount() > maxShardCount { - maxShardCount = ecIndexBits.ShardIdCount() + toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount() + if toBeCopiedShardCount > maxShardCount { + maxShardCount = toBeCopiedShardCount targetNodeLocation = loc exisitngEcIndexBits = ecIndexBits } -- cgit v1.2.3 From d960b3474af6956ffcf59e782789001169531db8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 25 Dec 2019 09:53:13 -0800 Subject: tier storage: support downloading the remote dat files --- weed/shell/command_ec_decode.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'weed/shell/command_ec_decode.go') diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index b49ab2c58..8ca035a8c 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -47,7 +47,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr vid := needle.VolumeId(*volumeId) // collect topology information - topologyInfo, err := collectTopologyInfoForEcDecode(ctx, commandEnv) + topologyInfo, err := collectTopologyInfo(ctx, commandEnv) if err != nil { return err } @@ -203,7 +203,7 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB } -func collectTopologyInfoForEcDecode(ctx context.Context, commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { +func collectTopologyInfo(ctx context.Context, commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { var resp *master_pb.VolumeListResponse err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { -- cgit v1.2.3 From 37b64a50b4da90637f3f17a2b4bd79f55cbd3098 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Dec 2019 12:44:59 -0800 Subject: ec: generate and copy .vif file --- weed/shell/command_ec_decode.go | 1 + 1 file changed, 1 insertion(+) (limited to 'weed/shell/command_ec_decode.go') diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 8ca035a8c..1f9ad2ff9 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -180,6 +180,7 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB ShardIds: needToCopyEcIndexBits.ToUint32Slice(), CopyEcxFile: false, CopyEcjFile: true, + CopyVifFile: true, SourceDataNode: loc, }) if copyErr != nil { -- cgit v1.2.3 From 72a64a5cf8c2a5adfe59665a746e013ca948e681 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 26 Jan 2020 14:42:11 -0800 Subject: use the same context object in order to retry --- weed/shell/command_ec_decode.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'weed/shell/command_ec_decode.go') diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 1f9ad2ff9..8a705a5ae 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -99,7 +99,7 @@ func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume - if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) @@ -132,7 +132,7 @@ func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, v fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ VolumeId: uint32(vid), Collection: collection, @@ -170,7 +170,7 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB continue } - err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) -- cgit v1.2.3 From 892e726eb9c2427634c46f8ae9b7bcf0b6d1b082 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 25 Feb 2020 21:50:12 -0800 Subject: avoid reusing context object fix https://github.com/chrislusf/seaweedfs/issues/1182 --- weed/shell/command_ec_decode.go | 43 ++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 22 deletions(-) (limited to 'weed/shell/command_ec_decode.go') diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 8a705a5ae..b69e403cb 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -43,25 +43,24 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } - ctx := context.Background() vid := needle.VolumeId(*volumeId) // collect topology information - topologyInfo, err := collectTopologyInfo(ctx, commandEnv) + topologyInfo, err := collectTopologyInfo(commandEnv) if err != nil { return err } // volumeId is provided if vid != 0 { - return doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid) + return doEcDecode(commandEnv, topologyInfo, *collection, vid) } // apply to all volumes in the collection volumeIds := collectEcShardIds(topologyInfo, *collection) fmt.Printf("ec encode volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid); err != nil { + if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil { return err } } @@ -69,26 +68,26 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } -func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) { +func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) { // find volume location nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid) fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits) // collect ec shards to the server with most space - targetNodeLocation, err := collectEcShards(ctx, commandEnv, nodeToEcIndexBits, collection, vid) + targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid) if err != nil { return fmt.Errorf("collectEcShards for volume %d: %v", vid, err) } // generate a normal volume - err = generateNormalVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation) + err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation) if err != nil { return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) } // delete the previous ec shards - err = mountVolumeAndDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid) + err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid) if err != nil { return fmt.Errorf("delete ec shards for volume %d: %v", vid, err) } @@ -96,11 +95,11 @@ func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb return nil } -func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { +func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume - if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ + if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) return mountErr @@ -111,7 +110,7 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO // unmount ec shards for location, ecIndexBits := range nodeToEcIndexBits { fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := unmountEcShards(ctx, grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) + err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) if err != nil { return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err) } @@ -119,7 +118,7 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO // delete ec shards for location, ecIndexBits := range nodeToEcIndexBits { fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := sourceServerDeleteEcShards(ctx, grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) + err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) if err != nil { return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err) } @@ -128,12 +127,12 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO return nil } -func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error { +func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error { fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{ VolumeId: uint32(vid), Collection: collection, }) @@ -144,7 +143,7 @@ func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, v } -func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) { +func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) { maxShardCount := 0 var exisitngEcIndexBits erasure_coding.ShardBits @@ -170,11 +169,11 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB continue } - err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) - _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(vid), Collection: collection, ShardIds: needToCopyEcIndexBits.ToUint32Slice(), @@ -204,11 +203,11 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB } -func collectTopologyInfo(ctx context.Context, commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { +func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) { var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { -- cgit v1.2.3