diff options
Diffstat (limited to 'weed/command/filer_copy.go')
| -rw-r--r-- | weed/command/filer_copy.go | 189 |
1 files changed, 99 insertions, 90 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 818ae5f23..f20ae99bf 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -3,11 +3,8 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" "io" - "io/ioutil" "net/http" - "net/url" "os" "path/filepath" "strconv" @@ -17,14 +14,14 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/util/grace" - + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/chrislusf/seaweedfs/weed/wdclient" ) @@ -74,7 +71,7 @@ var cmdFilerCopy = &Command{ It can copy one or a list of files or folders. If copying a whole folder recursively: - All files under the folder and subfolders will be copyed. + All files under the folder and sub folders will be copied. Optional parameter "-include" allows you to specify the file name patterns. If "maxMB" is set to a positive number, files larger than it would be split into chunks. @@ -92,35 +89,21 @@ func runCopy(cmd *Command, args []string) bool { filerDestination := args[len(args)-1] fileOrDirs := args[0 : len(args)-1] - filerUrl, err := url.Parse(filerDestination) + filerAddress, urlPath, err := pb.ParseUrl(filerDestination) if err != nil { fmt.Printf("The last argument should be a URL on filer: %v\n", err) return false } - urlPath := filerUrl.Path if !strings.HasSuffix(urlPath, "/") { fmt.Printf("The last argument should be a folder and end with \"/\"\n") return false } - if filerUrl.Port() == "" { - fmt.Printf("The filer port should be specified.\n") - return false - } - - filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64) - if parseErr != nil { - fmt.Printf("The filer port parse error: %v\n", parseErr) - return false - } - - filerGrpcPort := filerPort + 10000 - filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") - masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress) + masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerAddress) if err != nil { - fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err) + fmt.Printf("read from filer %s: %v\n", filerAddress, err) return false } if strings.HasPrefix(urlPath, dirBuckets+"/") { @@ -174,9 +157,8 @@ func runCopy(cmd *Command, args []string) bool { go func() { defer waitGroup.Done() worker := FileCopyWorker{ - options: ©, - filerHost: filerUrl.Host, - filerGrpcAddress: filerGrpcAddress, + options: ©, + filerAddress: filerAddress, } if err := worker.copyFiles(fileCopyTaskChan); err != nil { fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) @@ -189,8 +171,8 @@ func runCopy(cmd *Command, args []string) bool { return true } -func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { + err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) @@ -228,9 +210,9 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi } if mode.IsDir() { - files, _ := ioutil.ReadDir(fileOrDir) + files, _ := os.ReadDir(fileOrDir) for _, subFileOrDir := range files { - cleanedDestDirectory := filepath.Clean(destPath + fi.Name()) + cleanedDestDirectory := destPath + fi.Name() if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), cleanedDestDirectory+"/", fileCopyTaskChan); err != nil { return err } @@ -241,9 +223,8 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi } type FileCopyWorker struct { - options *CopyOptions - filerHost string - filerGrpcAddress string + options *CopyOptions + filerAddress pb.ServerAddress } func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error { @@ -321,7 +302,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi return } - err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: task.destinationUrlPath, @@ -356,14 +337,14 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err if task.fileMode&os.ModeDir == 0 && task.fileSize > 0 { mimeType = detectMimeType(f) - data, err := ioutil.ReadAll(f) + data, err := io.ReadAll(f) if err != nil { return err } - // assign a volume - err = util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = util.Retry("upload", func() error { + // assign a volume + assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -381,50 +362,62 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err if assignResult.Error != "" { return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) } - if assignResult.Url == "" { + if assignResult.Location.Url == "" { return fmt.Errorf("assign volume failure %v: %v", request, assignResult) } return nil }) - }) - if err != nil { - return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) - } + if assignErr != nil { + return assignErr + } + + // upload data + targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId + uploadOption := &operation.UploadOption{ + UploadUrl: targetUrl, + Filename: fileName, + Cipher: worker.options.cipher, + IsInputCompressed: false, + MimeType: mimeType, + PairMap: nil, + Jwt: security.EncodedJwt(assignResult.Auth), + } + uploadResult, err := operation.UploadData(data, uploadOption) + if err != nil { + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) + } + if uploadResult.Error != "" { + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + } + if *worker.options.verbose { + fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) + } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) + chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) - uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth)) + return nil + }) if err != nil { - return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) - } - if uploadResult.Error != "" { - return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - } - if *worker.options.verbose { - fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) + return fmt.Errorf("upload %v: %v\n", fileName, err) } - chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) - - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName) } - if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ - Crtime: time.Now().Unix(), - Mtime: time.Now().Unix(), - Gid: task.gid, - Uid: task.uid, - FileSize: uint64(task.fileSize), - FileMode: uint32(task.fileMode), - Mime: mimeType, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - TtlSec: worker.options.ttlSec, + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), + Mime: mimeType, + TtlSec: worker.options.ttlSec, }, Chunks: chunks, }, @@ -435,7 +428,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } return nil }); err != nil { - return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err) } return nil @@ -466,7 +459,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, var assignResult *filer_pb.AssignVolumeResponse var assignError error err := util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: *worker.options.replication, @@ -487,10 +480,11 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, }) }) if err != nil { - fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) + uploadError = fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) + return } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId + targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId if collection == "" { collection = assignResult.Collection } @@ -498,7 +492,16 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, replication = assignResult.Replication } - uploadResult, err, _ := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth)) + uploadOption := &operation.UploadOption{ + UploadUrl: targetUrl, + Filename: fileName + "-" + strconv.FormatInt(i+1, 10), + Cipher: worker.options.cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: security.EncodedJwt(assignResult.Auth), + } + uploadResult, err, _ := operation.Upload(io.NewSectionReader(f, i*chunkSize, chunkSize), uploadOption) if err != nil { uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) return @@ -525,8 +528,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, for _, chunk := range chunks { fileIds = append(fileIds, chunk.FileId) } - operation.DeleteFiles(func() string { - return copy.masters[0] + operation.DeleteFiles(func() pb.ServerAddress { + return pb.ServerAddress(copy.masters[0]) }, false, worker.options.grpcDialOption, fileIds) return uploadError } @@ -536,22 +539,20 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return fmt.Errorf("create manifest: %v", manifestErr) } - if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ - Crtime: time.Now().Unix(), - Mtime: time.Now().Unix(), - Gid: task.gid, - Uid: task.uid, - FileSize: uint64(task.fileSize), - FileMode: uint32(task.fileMode), - Mime: mimeType, - Replication: replication, - Collection: collection, - TtlSec: worker.options.ttlSec, + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), + Mime: mimeType, + TtlSec: worker.options.ttlSec, }, Chunks: manifestedChunks, }, @@ -562,10 +563,10 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, } return nil }); err != nil { - return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err) } - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) return nil } @@ -594,7 +595,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off var fileId, host string var auth security.EncodedJwt - if flushErr := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() @@ -616,7 +617,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) } - fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth) collection, replication = resp.Collection, resp.Replication return nil @@ -630,8 +631,16 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off return nil, collection, replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr) } - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - uploadResult, flushErr, _ := operation.Upload(fileUrl, name, worker.options.cipher, reader, false, "", nil, auth) + uploadOption := &operation.UploadOption{ + UploadUrl: fmt.Sprintf("http://%s/%s", host, fileId), + Filename: name, + Cipher: worker.options.cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: auth, + } + uploadResult, flushErr, _ := operation.Upload(reader, uploadOption) if flushErr != nil { return nil, collection, replication, fmt.Errorf("upload data: %v", flushErr) } |
