aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_copy.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-05-12 21:45:39 -0700
committerChris Lu <chris.lu@gmail.com>2021-05-12 21:45:39 -0700
commitb430d1b6ee7ae245a1c1ad4116f5de7f52062104 (patch)
treebf52e03e37e9aa984c07fe8243bd28250b620303 /weed/command/filer_copy.go
parent534c4202a56874f405c51f1cff2deae810830702 (diff)
downloadseaweedfs-b430d1b6ee7ae245a1c1ad4116f5de7f52062104.tar.xz
seaweedfs-b430d1b6ee7ae245a1c1ad4116f5de7f52062104.zip
filer.copy: "check.size" before copying files
fix https://github.com/chrislusf/seaweedfs/issues/2067
Diffstat (limited to 'weed/command/filer_copy.go')
-rw-r--r--weed/command/filer_copy.go62
1 files changed, 57 insertions, 5 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index dc729ed33..a5d29c451 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -3,6 +3,7 @@ package command
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"io"
"io/ioutil"
"net/http"
@@ -46,6 +47,8 @@ type CopyOptions struct {
masters []string
cipher bool
ttlSec int32
+ checkSize *bool
+ verbose *bool
}
func init() {
@@ -59,6 +62,8 @@ func init() {
copy.maxMB = cmdCopy.Flag.Int("maxMB", 4, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
+ copy.checkSize = cmdCopy.Flag.Bool("check.size", false, "copy when the target file size is different from the source file")
+ copy.verbose = cmdCopy.Flag.Bool("verbose", false, "print out details during copying")
}
var cmdCopy = &Command{
@@ -220,9 +225,9 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
if mode.IsDir() {
files, _ := ioutil.ReadDir(fileOrDir)
- println("checking directory", fileOrDir)
for _, subFileOrDir := range files {
- if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
+ cleanedDestDirectory := filepath.Clean(destPath + fi.Name())
+ if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), cleanedDestDirectory+"/", fileCopyTaskChan); err != nil {
return err
}
}
@@ -275,6 +280,15 @@ func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
}
}
+ if shouldCopy, err := worker.checkExistingFileFirst(task, f); err != nil {
+ return fmt.Errorf("check existing file: %v", err)
+ } else if !shouldCopy {
+ if *worker.options.verbose {
+ fmt.Printf("skipping copied file: %v\n", f.Name())
+ }
+ return nil
+ }
+
// find the chunk count
chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
chunkCount := 1
@@ -289,6 +303,42 @@ func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
return worker.uploadFileInChunks(task, f, chunkCount, chunkSize)
}
+func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.File) (shouldCopy bool, err error) {
+
+ shouldCopy = true
+
+ if !*worker.options.checkSize {
+ return
+ }
+
+ fileStat, err := f.Stat()
+ if err != nil {
+ shouldCopy = false
+ return
+ }
+
+ err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Directory: task.destinationUrlPath,
+ Name: filepath.Base(f.Name()),
+ }
+
+ resp, lookupErr := client.LookupDirectoryEntry(context.Background(), request)
+ if lookupErr != nil {
+ // mostly not found error
+ return nil
+ }
+
+ if fileStat.Size() == int64(filer.FileSize(resp.Entry)) {
+ shouldCopy = false
+ }
+
+ return nil
+ })
+ return
+}
+
func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error {
// upload the file content
@@ -343,11 +393,13 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if uploadResult.Error != "" {
return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
}
- fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
+ if *worker.options.verbose {
+ fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
+ }
chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
- fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName)
}
if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
@@ -501,7 +553,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
}
- fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName)
return nil
}