aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go7
-rw-r--r--weed/server/volume_grpc_tier.go75
2 files changed, 67 insertions, 15 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index e3e16b927..ba5562dfe 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -11,15 +11,12 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- "google.golang.org/grpc"
-
- _ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
_ "github.com/chrislusf/seaweedfs/weed/statik"
"github.com/gorilla/mux"
diff --git a/weed/server/volume_grpc_tier.go b/weed/server/volume_grpc_tier.go
index cbd344ed0..d3cdd62f2 100644
--- a/weed/server/volume_grpc_tier.go
+++ b/weed/server/volume_grpc_tier.go
@@ -1,9 +1,9 @@
package weed_server
import (
- "context"
"fmt"
"os"
+ "time"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
@@ -11,28 +11,83 @@ import (
)
// VolumeTierCopyDatToRemote copy dat file to a remote tier
-func (vs *VolumeServer) VolumeTierCopyDatToRemote(ctx context.Context, req *volume_server_pb.VolumeTierCopyDatToRemoteRequest) (*volume_server_pb.VolumeTierCopyDatToRemoteResponse, error) {
+func (vs *VolumeServer) VolumeTierCopyDatToRemote(req *volume_server_pb.VolumeTierCopyDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierCopyDatToRemoteServer) error {
+ // find existing volume
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
- return nil, fmt.Errorf("volume %d not found", req.VolumeId)
+ return fmt.Errorf("volume %d not found", req.VolumeId)
}
+ // verify the collection
if v.Collection != req.Collection {
- return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
}
+ // locate the disk file
diskFile, ok := v.DataBackend.(*backend.DiskFile)
if !ok {
- return nil, fmt.Errorf("volume %d is not on local disk", req.VolumeId)
+ return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
}
- err := uploadFileToRemote(ctx, req, diskFile.File)
- return &volume_server_pb.VolumeTierCopyDatToRemoteResponse{}, err
-}
+ // check valid storage backend type
+ backendStorage, found := backend.BackendStorages[req.DestinationBackendName]
+ if !found {
+ var keys []string
+ for key := range backend.BackendStorages {
+ keys = append(keys, key)
+ }
+ return fmt.Errorf("destination %s not found, suppported: %v", req.DestinationBackendName, keys)
+ }
+
+ // check whether the existing backend storage is the same as requested
+ // if same, skip
+ backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName)
+ for _, remoteFile := range v.GetVolumeTierInfo().GetFiles() {
+ if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId {
+ return fmt.Errorf("destination %s already exists", req.DestinationBackendName)
+ }
+ }
+
+ startTime := time.Now()
+ fn := func(progressed int64, percentage float32) error {
+ now := time.Now()
+ if now.Sub(startTime) < time.Second {
+ return nil
+ }
+ startTime = now
+ return stream.Send(&volume_server_pb.VolumeTierCopyDatToRemoteResponse{
+ Processed: progressed,
+ ProcessedPercentage: percentage,
+ })
+ }
+ // copy the data file
+ key, size, err := backendStorage.CopyFile(diskFile.File, fn)
+ if err != nil {
+ return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.String(), err)
+ }
-func uploadFileToRemote(ctx context.Context, req *volume_server_pb.VolumeTierCopyDatToRemoteRequest, f *os.File) error {
- println("copying dat file of", f.Name(), "to remote")
+ // save the remote file to volume tier info
+ v.GetVolumeTierInfo().Files = append(v.GetVolumeTierInfo().GetFiles(), &volume_server_pb.RemoteFile{
+ BackendType: backendType,
+ BackendId: backendId,
+ Key: key,
+ Offset: 0,
+ FileSize: uint64(size),
+ ModifiedTime: uint64(time.Now().Unix()),
+ })
+
+ if err := v.SaveVolumeTierInfo(); err != nil {
+ return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
+ }
+
+ if err := v.LoadRemoteFile(); err != nil {
+ return fmt.Errorf("volume %d fail to load remote file: %v", v.Id, err)
+ }
+
+ if !req.KeepLocalDatFile {
+ os.Remove(v.FileName() + ".dat")
+ }
return nil
}