aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_copy.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_copy.go')
-rw-r--r--weed/command/filer_copy.go189
1 files changed, 99 insertions, 90 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 818ae5f23..f20ae99bf 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -3,11 +3,8 @@ package command
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
"io"
- "io/ioutil"
"net/http"
- "net/url"
"os"
"path/filepath"
"strconv"
@@ -17,14 +14,14 @@ import (
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/util/grace"
-
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -74,7 +71,7 @@ var cmdFilerCopy = &Command{
It can copy one or a list of files or folders.
If copying a whole folder recursively:
- All files under the folder and subfolders will be copyed.
+ All files under the folder and sub folders will be copied.
Optional parameter "-include" allows you to specify the file name patterns.
If "maxMB" is set to a positive number, files larger than it would be split into chunks.
@@ -92,35 +89,21 @@ func runCopy(cmd *Command, args []string) bool {
filerDestination := args[len(args)-1]
fileOrDirs := args[0 : len(args)-1]
- filerUrl, err := url.Parse(filerDestination)
+ filerAddress, urlPath, err := pb.ParseUrl(filerDestination)
if err != nil {
fmt.Printf("The last argument should be a URL on filer: %v\n", err)
return false
}
- urlPath := filerUrl.Path
if !strings.HasSuffix(urlPath, "/") {
fmt.Printf("The last argument should be a folder and end with \"/\"\n")
return false
}
- if filerUrl.Port() == "" {
- fmt.Printf("The filer port should be specified.\n")
- return false
- }
-
- filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
- if parseErr != nil {
- fmt.Printf("The filer port parse error: %v\n", parseErr)
- return false
- }
-
- filerGrpcPort := filerPort + 10000
- filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
+ masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerAddress)
if err != nil {
- fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
+ fmt.Printf("read from filer %s: %v\n", filerAddress, err)
return false
}
if strings.HasPrefix(urlPath, dirBuckets+"/") {
@@ -174,9 +157,8 @@ func runCopy(cmd *Command, args []string) bool {
go func() {
defer waitGroup.Done()
worker := FileCopyWorker{
- options: &copy,
- filerHost: filerUrl.Host,
- filerGrpcAddress: filerGrpcAddress,
+ options: &copy,
+ filerAddress: filerAddress,
}
if err := worker.copyFiles(fileCopyTaskChan); err != nil {
fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
@@ -189,8 +171,8 @@ func runCopy(cmd *Command, args []string) bool {
return true
}
-func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) {
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) {
+ err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
@@ -228,9 +210,9 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
}
if mode.IsDir() {
- files, _ := ioutil.ReadDir(fileOrDir)
+ files, _ := os.ReadDir(fileOrDir)
for _, subFileOrDir := range files {
- cleanedDestDirectory := filepath.Clean(destPath + fi.Name())
+ cleanedDestDirectory := destPath + fi.Name()
if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), cleanedDestDirectory+"/", fileCopyTaskChan); err != nil {
return err
}
@@ -241,9 +223,8 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
}
type FileCopyWorker struct {
- options *CopyOptions
- filerHost string
- filerGrpcAddress string
+ options *CopyOptions
+ filerAddress pb.ServerAddress
}
func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error {
@@ -321,7 +302,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi
return
}
- err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: task.destinationUrlPath,
@@ -356,14 +337,14 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if task.fileMode&os.ModeDir == 0 && task.fileSize > 0 {
mimeType = detectMimeType(f)
- data, err := ioutil.ReadAll(f)
+ data, err := io.ReadAll(f)
if err != nil {
return err
}
- // assign a volume
- err = util.Retry("assignVolume", func() error {
- return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = util.Retry("upload", func() error {
+ // assign a volume
+ assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -381,50 +362,62 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if assignResult.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
}
- if assignResult.Url == "" {
+ if assignResult.Location.Url == "" {
return fmt.Errorf("assign volume failure %v: %v", request, assignResult)
}
return nil
})
- })
- if err != nil {
- return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
- }
+ if assignErr != nil {
+ return assignErr
+ }
+
+ // upload data
+ targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
+ uploadOption := &operation.UploadOption{
+ UploadUrl: targetUrl,
+ Filename: fileName,
+ Cipher: worker.options.cipher,
+ IsInputCompressed: false,
+ MimeType: mimeType,
+ PairMap: nil,
+ Jwt: security.EncodedJwt(assignResult.Auth),
+ }
+ uploadResult, err := operation.UploadData(data, uploadOption)
+ if err != nil {
+ return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
+ }
+ if uploadResult.Error != "" {
+ return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ }
+ if *worker.options.verbose {
+ fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
+ }
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+ fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName)
+ chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
- uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth))
+ return nil
+ })
if err != nil {
- return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- }
- if uploadResult.Error != "" {
- return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
- }
- if *worker.options.verbose {
- fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
+ return fmt.Errorf("upload %v: %v\n", fileName, err)
}
- chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
-
- fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName)
}
- if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
Name: fileName,
Attributes: &filer_pb.FuseAttributes{
- Crtime: time.Now().Unix(),
- Mtime: time.Now().Unix(),
- Gid: task.gid,
- Uid: task.uid,
- FileSize: uint64(task.fileSize),
- FileMode: uint32(task.fileMode),
- Mime: mimeType,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: worker.options.ttlSec,
+ Crtime: time.Now().Unix(),
+ Mtime: time.Now().Unix(),
+ Gid: task.gid,
+ Uid: task.uid,
+ FileSize: uint64(task.fileSize),
+ FileMode: uint32(task.fileMode),
+ Mime: mimeType,
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
@@ -435,7 +428,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
}
return nil
}); err != nil {
- return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
+ return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err)
}
return nil
@@ -466,7 +459,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
err := util.Retry("assignVolume", func() error {
- return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
@@ -487,10 +480,11 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
})
})
if err != nil {
- fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ uploadError = fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ return
}
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+ targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
if collection == "" {
collection = assignResult.Collection
}
@@ -498,7 +492,16 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
replication = assignResult.Replication
}
- uploadResult, err, _ := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth))
+ uploadOption := &operation.UploadOption{
+ UploadUrl: targetUrl,
+ Filename: fileName + "-" + strconv.FormatInt(i+1, 10),
+ Cipher: worker.options.cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: security.EncodedJwt(assignResult.Auth),
+ }
+ uploadResult, err, _ := operation.Upload(io.NewSectionReader(f, i*chunkSize, chunkSize), uploadOption)
if err != nil {
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
return
@@ -525,8 +528,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
- operation.DeleteFiles(func() string {
- return copy.masters[0]
+ operation.DeleteFiles(func() pb.ServerAddress {
+ return pb.ServerAddress(copy.masters[0])
}, false, worker.options.grpcDialOption, fileIds)
return uploadError
}
@@ -536,22 +539,20 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return fmt.Errorf("create manifest: %v", manifestErr)
}
- if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
Name: fileName,
Attributes: &filer_pb.FuseAttributes{
- Crtime: time.Now().Unix(),
- Mtime: time.Now().Unix(),
- Gid: task.gid,
- Uid: task.uid,
- FileSize: uint64(task.fileSize),
- FileMode: uint32(task.fileMode),
- Mime: mimeType,
- Replication: replication,
- Collection: collection,
- TtlSec: worker.options.ttlSec,
+ Crtime: time.Now().Unix(),
+ Mtime: time.Now().Unix(),
+ Gid: task.gid,
+ Uid: task.uid,
+ FileSize: uint64(task.fileSize),
+ FileMode: uint32(task.fileMode),
+ Mime: mimeType,
+ TtlSec: worker.options.ttlSec,
},
Chunks: manifestedChunks,
},
@@ -562,10 +563,10 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
}
return nil
}); err != nil {
- return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
+ return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err)
}
- fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName)
return nil
}
@@ -594,7 +595,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
var fileId, host string
var auth security.EncodedJwt
- if flushErr := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
@@ -616,7 +617,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
collection, replication = resp.Collection, resp.Replication
return nil
@@ -630,8 +631,16 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
return nil, collection, replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
}
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, flushErr, _ := operation.Upload(fileUrl, name, worker.options.cipher, reader, false, "", nil, auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: fmt.Sprintf("http://%s/%s", host, fileId),
+ Filename: name,
+ Cipher: worker.options.cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: auth,
+ }
+ uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
if flushErr != nil {
return nil, collection, replication, fmt.Errorf("upload data: %v", flushErr)
}