diff options
Diffstat (limited to 'weed/server/volume_grpc_tier_download.go')
| -rw-r--r-- | weed/server/volume_grpc_tier_download.go | 85 |
1 files changed, 85 insertions, 0 deletions
diff --git a/weed/server/volume_grpc_tier_download.go b/weed/server/volume_grpc_tier_download.go new file mode 100644 index 000000000..7b3982e40 --- /dev/null +++ b/weed/server/volume_grpc_tier_download.go @@ -0,0 +1,85 @@ +package weed_server + +import ( + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +// VolumeTierMoveDatFromRemote copy dat file from a remote tier to local volume server +func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.VolumeTierMoveDatFromRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatFromRemoteServer) error { + + // find existing volume + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("volume %d not found", req.VolumeId) + } + + // verify the collection + if v.Collection != req.Collection { + return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) + } + + // locate the disk file + storageName, storageKey := v.RemoteStorageNameKey() + if storageName == "" || storageKey == "" { + return fmt.Errorf("volume %d is already on local disk", req.VolumeId) + } + + // check whether the local .dat already exists + _, ok := v.DataBackend.(*backend.DiskFile) + if ok { + return fmt.Errorf("volume %d is already on local disk", req.VolumeId) + } + + // check valid storage backend type + backendStorage, found := backend.BackendStorages[storageName] + if !found { + var keys []string + for key := range backend.BackendStorages { + keys = append(keys, key) + } + return fmt.Errorf("remote storage %s not found from suppported: %v", storageName, keys) + } + + startTime := time.Now() + fn := func(progressed int64, percentage float32) error { + now := time.Now() + if now.Sub(startTime) < time.Second { + return nil + } + startTime = now + return stream.Send(&volume_server_pb.VolumeTierMoveDatFromRemoteResponse{ + Processed: progressed, + ProcessedPercentage: percentage, + }) + } + // copy the data file + _, err := backendStorage.DownloadFile(v.FileName()+".dat", storageKey, fn) + if err != nil { + return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName()+".dat", err) + } + + if req.KeepRemoteDatFile { + return nil + } + + // remove remote file + if err := backendStorage.DeleteFile(storageKey); err != nil { + return fmt.Errorf("volume %d fail to delete remote file %s: %v", v.Id, storageKey, err) + } + + // forget remote file + v.GetVolumeInfo().Files = v.GetVolumeInfo().Files[1:] + if err := v.SaveVolumeInfo(); err != nil { + return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err) + } + + v.DataBackend.Close() + v.DataBackend = nil + + return nil +} |
