diff options
Diffstat (limited to 'weed/shell/command_volume_tier_download.go')
| -rw-r--r-- | weed/shell/command_volume_tier_download.go | 166 |
1 files changed, 166 insertions, 0 deletions
diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go new file mode 100644 index 000000000..756dc4686 --- /dev/null +++ b/weed/shell/command_volume_tier_download.go @@ -0,0 +1,166 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandVolumeTierDownload{}) +} + +type commandVolumeTierDownload struct { +} + +func (c *commandVolumeTierDownload) Name() string { + return "volume.tier.download" +} + +func (c *commandVolumeTierDownload) Help() string { + return `download the dat file of a volume from a remote tier + + volume.tier.download [-collection=""] + volume.tier.download [-collection=""] -volumeId=<volume_id> + + e.g.: + volume.tier.download -volumeId=7 + volume.tier.download -volumeId=7 + + This command will download the dat file of a volume from a remote tier to a volume server in local cluster. + +` +} + +func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeId := tierCommand.Int("volumeId", 0, "the volume id") + collection := tierCommand.String("collection", "", "the collection name") + if err = tierCommand.Parse(args); err != nil { + return nil + } + + vid := needle.VolumeId(*volumeId) + + // collect topology information + topologyInfo, err := collectTopologyInfo(commandEnv) + if err != nil { + return err + } + + // volumeId is provided + if vid != 0 { + return doVolumeTierDownload(commandEnv, writer, *collection, vid) + } + + // apply to all volumes in the collection + // reusing collectVolumeIdsForEcEncode for now + volumeIds := collectRemoteVolumes(topologyInfo, *collection) + if err != nil { + return err + } + fmt.Printf("tier download volumes: %v\n", volumeIds) + for _, vid := range volumeIds { + if err = doVolumeTierDownload(commandEnv, writer, *collection, vid); err != nil { + return err + } + } + + return nil +} + +func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) { + + vidMap := make(map[uint32]bool) + eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, v := range dn.VolumeInfos { + if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" { + vidMap[v.Id] = true + } + } + }) + + for vid := range vidMap { + vids = append(vids, needle.VolumeId(vid)) + } + + return +} + +func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId) (err error) { + // find volume location + locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) + if !found { + return fmt.Errorf("volume %d not found", vid) + } + + // TODO parallelize this + for _, loc := range locations { + // copy the .dat file from remote tier to local + err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url) + if err != nil { + return fmt.Errorf("download dat file for volume %d to %s: %v", vid, loc.Url, err) + } + } + + return nil +} + +func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error { + + err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + }) + + var lastProcessed int64 + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + processingSpeed := float64(resp.Processed-lastProcessed) / 1024.0 / 1024.0 + + fmt.Fprintf(writer, "downloaded %.2f%%, %d bytes, %.2fMB/s\n", resp.ProcessedPercentage, resp.Processed, processingSpeed) + + lastProcessed = resp.Processed + } + if downloadErr != nil { + return downloadErr + } + + _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{ + VolumeId: uint32(volumeId), + }) + if unmountErr != nil { + return unmountErr + } + + _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{ + VolumeId: uint32(volumeId), + }) + if mountErr != nil { + return mountErr + } + + return nil + }) + + return err + +} |
