diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/common.go | 7 | ||||
| -rw-r--r-- | weed/server/volume_grpc_tier.go | 75 |
2 files changed, 67 insertions, 15 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index e3e16b927..ba5562dfe 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -11,15 +11,12 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - "google.golang.org/grpc" - - _ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" _ "github.com/chrislusf/seaweedfs/weed/statik" "github.com/gorilla/mux" diff --git a/weed/server/volume_grpc_tier.go b/weed/server/volume_grpc_tier.go index cbd344ed0..d3cdd62f2 100644 --- a/weed/server/volume_grpc_tier.go +++ b/weed/server/volume_grpc_tier.go @@ -1,9 +1,9 @@ package weed_server import ( - "context" "fmt" "os" + "time" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/backend" @@ -11,28 +11,83 @@ import ( ) // VolumeTierCopyDatToRemote copy dat file to a remote tier -func (vs *VolumeServer) VolumeTierCopyDatToRemote(ctx context.Context, req *volume_server_pb.VolumeTierCopyDatToRemoteRequest) (*volume_server_pb.VolumeTierCopyDatToRemoteResponse, error) { +func (vs *VolumeServer) VolumeTierCopyDatToRemote(req *volume_server_pb.VolumeTierCopyDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierCopyDatToRemoteServer) error { + // find existing volume v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { - return nil, fmt.Errorf("volume %d not found", req.VolumeId) + return fmt.Errorf("volume %d not found", req.VolumeId) } + // verify the collection if v.Collection != req.Collection { - return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) + return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) } + // locate the disk file diskFile, ok := v.DataBackend.(*backend.DiskFile) if !ok { - return nil, fmt.Errorf("volume %d is not on local disk", req.VolumeId) + return fmt.Errorf("volume %d is not on local disk", req.VolumeId) } - err := uploadFileToRemote(ctx, req, diskFile.File) - return &volume_server_pb.VolumeTierCopyDatToRemoteResponse{}, err -} + // check valid storage backend type + backendStorage, found := backend.BackendStorages[req.DestinationBackendName] + if !found { + var keys []string + for key := range backend.BackendStorages { + keys = append(keys, key) + } + return fmt.Errorf("destination %s not found, suppported: %v", req.DestinationBackendName, keys) + } + + // check whether the existing backend storage is the same as requested + // if same, skip + backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName) + for _, remoteFile := range v.GetVolumeTierInfo().GetFiles() { + if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId { + return fmt.Errorf("destination %s already exists", req.DestinationBackendName) + } + } + + startTime := time.Now() + fn := func(progressed int64, percentage float32) error { + now := time.Now() + if now.Sub(startTime) < time.Second { + return nil + } + startTime = now + return stream.Send(&volume_server_pb.VolumeTierCopyDatToRemoteResponse{ + Processed: progressed, + ProcessedPercentage: percentage, + }) + } + // copy the data file + key, size, err := backendStorage.CopyFile(diskFile.File, fn) + if err != nil { + return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.String(), err) + } -func uploadFileToRemote(ctx context.Context, req *volume_server_pb.VolumeTierCopyDatToRemoteRequest, f *os.File) error { - println("copying dat file of", f.Name(), "to remote") + // save the remote file to volume tier info + v.GetVolumeTierInfo().Files = append(v.GetVolumeTierInfo().GetFiles(), &volume_server_pb.RemoteFile{ + BackendType: backendType, + BackendId: backendId, + Key: key, + Offset: 0, + FileSize: uint64(size), + ModifiedTime: uint64(time.Now().Unix()), + }) + + if err := v.SaveVolumeTierInfo(); err != nil { + return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err) + } + + if err := v.LoadRemoteFile(); err != nil { + return fmt.Errorf("volume %d fail to load remote file: %v", v.Id, err) + } + + if !req.KeepLocalDatFile { + os.Remove(v.FileName() + ".dat") + } return nil } |
