diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-12-02 15:08:28 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-12-02 15:08:31 -0800 |
| commit | ec8de250e28b3356bb29b273baa557852f3c000b (patch) | |
| tree | 5e0c32e118489ad181ef370cbd6e6ac6fe4bcefb /weed/shell | |
| parent | 228231f3d75c23a1f442d4502d5e7008aa58424d (diff) | |
| download | seaweedfs-ec8de250e28b3356bb29b273baa557852f3c000b.tar.xz seaweedfs-ec8de250e28b3356bb29b273baa557852f3c000b.zip | |
tiered storage: can copy to s3, read from s3
The master is not aware of tiered volumes yet; file assignment is not working yet.
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_volume_tier.go | 68 |
1 files changed, 53 insertions, 15 deletions
diff --git a/weed/shell/command_volume_tier.go b/weed/shell/command_volume_tier.go index 177d3f30d..9047d1199 100644 --- a/weed/shell/command_volume_tier.go +++ b/weed/shell/command_volume_tier.go @@ -25,14 +25,30 @@ func (c *commandVolumeTier) Name() string { } func (c *commandVolumeTier) Help() string { - return `copy the dat file of a volume to a remote tier + return `move the dat file of a volume to a remote tier - ec.encode [-collection=""] [-fullPercent=95] [-quietFor=1h] - ec.encode [-collection=""] [-volumeId=<volume_id>] + volume.tier [-collection=""] [-fullPercent=95] [-quietFor=1h] + volume.tier [-collection=""] -volumeId=<volume_id> -dest=<storage_backend> [-keepLocalDatFile] - This command will: - 1. freeze one volume - 2. copy the dat file of a volume to a remote tier + e.g.: + volume.tier -volumeId=7 -dest=s3 + volume.tier -volumeId=7 -dest=s3.default + + The <storage_backend> is defined in master.toml. + For example, "s3.default" in [storage.backend.s3.default] + + This command will move the dat file of a volume to a remote tier. + + SeaweedFS enables scalable and fast local access to lots of files, + and the cloud storage is slower by cost efficient. How to combine them together? + + Usually the data follows 80/20 rule: only 20% of data is frequently accessed. + We can offload the old volumes to the cloud. + + With this, SeaweedFS can be both fast and scalable, and infinite storage space. + Just add more local SeaweedFS volume servers to increase the throughput. + + The index file is still local, and the same O(1) disk read is applied to the remote file. ` } @@ -44,7 +60,8 @@ func (c *commandVolumeTier) Do(args []string, commandEnv *CommandEnv, writer io. 
collection := tierCommand.String("collection", "", "the collection name") fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size") quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period") - dest := tierCommand.String("destination", "", "the target tier name") + dest := tierCommand.String("dest", "", "the target tier name") + keepLocalDatFile := tierCommand.Bool("keepLocalDatFile", false, "whether keep local dat file") if err = tierCommand.Parse(args); err != nil { return nil } @@ -54,7 +71,7 @@ func (c *commandVolumeTier) Do(args []string, commandEnv *CommandEnv, writer io. // volumeId is provided if vid != 0 { - return doVolumeTier(ctx, commandEnv, *collection, vid, *dest) + return doVolumeTier(ctx, commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile) } // apply to all volumes in the collection @@ -65,7 +82,7 @@ func (c *commandVolumeTier) Do(args []string, commandEnv *CommandEnv, writer io. } fmt.Printf("tiering volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doVolumeTier(ctx, commandEnv, *collection, vid, *dest); err != nil { + if err = doVolumeTier(ctx, commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil { return err } } @@ -73,7 +90,7 @@ func (c *commandVolumeTier) Do(args []string, commandEnv *CommandEnv, writer io. 
return nil } -func doVolumeTier(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, dest string) (err error) { +func doVolumeTier(ctx context.Context, commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) if !found { @@ -89,7 +106,7 @@ func doVolumeTier(ctx context.Context, commandEnv *CommandEnv, collection string */ // copy the .dat file to remote tier - err = copyDatToRemoteTier(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url, dest) + err = copyDatToRemoteTier(ctx, commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile) if err != nil { return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, locations[0].Url, dest, err) } @@ -97,13 +114,34 @@ func doVolumeTier(ctx context.Context, commandEnv *CommandEnv, collection string return nil } -func copyDatToRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string) error { +func copyDatToRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error { err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, copyErr := volumeServerClient.VolumeTierCopyDatToRemote(ctx, &volume_server_pb.VolumeTierCopyDatToRemoteRequest{ - VolumeId: uint32(volumeId), - Collection: collection, + stream, copyErr := volumeServerClient.VolumeTierCopyDatToRemote(ctx, &volume_server_pb.VolumeTierCopyDatToRemoteRequest{ + VolumeId: uint32(volumeId), + Collection: collection, + DestinationBackendName: dest, + 
KeepLocalDatFile: keepLocalDatFile, }) + + var lastProcessed int64 + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + + processingSpeed := float64(resp.Processed - lastProcessed)/1024.0/1024.0 + + fmt.Fprintf(writer, "copied %.2f%%, %d bytes, %.2fMB/s\n", resp.ProcessedPercentage, resp.Processed, processingSpeed) + + lastProcessed = resp.Processed + } + return copyErr }) |
