diff options
Diffstat (limited to 'weed/command/filer_copy.go')
| -rw-r--r-- | weed/command/filer_copy.go | 449 |
1 files changed, 305 insertions, 144 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 3638bcb27..2d6ba94d6 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -1,52 +1,62 @@ package command import ( + "context" "fmt" + "io" "io/ioutil" + "net/http" "net/url" "os" "path/filepath" + "strconv" "strings" + "sync" + "time" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/util/grace" - "context" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" - "io" - "net/http" - "strconv" - "time" + "github.com/chrislusf/seaweedfs/weed/wdclient" ) var ( - copy CopyOptions + copy CopyOptions + waitGroup sync.WaitGroup ) type CopyOptions struct { - filerGrpcPort *int - master *string - include *string - replication *string - collection *string - ttl *string - maxMB *int - secretKey *string - - secret security.Secret + include *string + replication *string + collection *string + ttl *string + maxMB *int + masterClient *wdclient.MasterClient + concurrenctFiles *int + concurrenctChunks *int + grpcDialOption grpc.DialOption + masters []string + cipher bool + ttlSec int32 } func init() { cmdCopy.Run = runCopy // break init cycle cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information") - copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location") copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") copy.replication = cmdCopy.Flag.String("replication", "", "replication type") copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") - copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit") - copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000") - copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") + copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") + copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file") } var cmdCopy = &Command{ @@ -66,7 +76,9 @@ var cmdCopy = &Command{ } func runCopy(cmd *Command, args []string) bool { - copy.secret = security.Secret(*copy.secretKey) + + util.LoadConfiguration("security", false) + if len(args) <= 1 { return false } @@ -96,221 +108,380 @@ func runCopy(cmd *Command, args []string) bool { } filerGrpcPort := filerPort + 10000 - if *copy.filerGrpcPort != 0 { - filerGrpcPort = uint64(*copy.filerGrpcPort) + filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) + copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + + masters, collection, replication, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress) + if err != nil { + fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err) + return false + } + if *copy.collection == "" { + *copy.collection = collection + } + if *copy.replication == "" { + *copy.replication = replication } + if *copy.maxMB == 0 { + *copy.maxMB = int(maxMB) + } + copy.masters = masters + copy.cipher = cipher - filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) + ttl, err := needle.ReadTTL(*copy.ttl) + if err != nil { + fmt.Printf("parsing ttl %s: %v\n", *copy.ttl, err) + return false + } + copy.ttlSec = int32(ttl.Minutes()) * 60 + + if *cmdCopy.IsDebug { + grace.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") + } + + fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles) - for _, fileOrDir := range fileOrDirs { - if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) { - return false + go func() { + defer close(fileCopyTaskChan) + for _, fileOrDir := range fileOrDirs { + if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil { + fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err) + break + } } + }() + for i := 0; i < *copy.concurrenctFiles; i++ { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + worker := FileCopyWorker{ + options: ©, + filerHost: filerUrl.Host, + filerGrpcAddress: filerGrpcAddress, + } + if err := worker.copyFiles(fileCopyTaskChan); err != nil { + fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) + return + } + }() } + waitGroup.Wait() + return true } -func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool { - f, err := os.Open(fileOrDir) - if err != nil { - fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err) - return false - } - defer f.Close() +func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, cipher bool, err error) { + err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + } + masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb + cipher = resp.Cipher + return nil + }) + return +} + +func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error { - fi, err := f.Stat() + fi, err := os.Stat(fileOrDir) if err != nil { - fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err) - return false + fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err) + return nil } mode := fi.Mode() if mode.IsDir() { files, _ := ioutil.ReadDir(fileOrDir) for _, subFileOrDir := range files { - if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") { - return false + if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil { + return err } } - return true + return nil } + uid, gid := util.GetFileUidGid(fi) + + fileCopyTaskChan <- FileCopyTask{ + sourceLocation: fileOrDir, + destinationUrlPath: destPath, + fileSize: fi.Size(), + fileMode: fi.Mode(), + uid: uid, + gid: gid, + } + + return nil +} + +type FileCopyWorker struct { + options *CopyOptions + filerHost string + filerGrpcAddress string +} + +func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error { + for task := range fileCopyTaskChan { + if err := worker.doEachCopy(task); err != nil { + return err + } + } + return nil +} + +type FileCopyTask struct { + sourceLocation string + destinationUrlPath string + fileSize int64 + fileMode os.FileMode + uid uint32 + gid uint32 +} + +func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error { + + f, err := os.Open(task.sourceLocation) + if err != nil { + fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err) + if _, ok := err.(*os.PathError); ok { + fmt.Printf("skipping %s\n", task.sourceLocation) + return nil + } + return err + } + defer f.Close() + // this is a regular file - if *copy.include != "" { - if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok { - return true + if *worker.options.include != "" { + if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok { + return nil } } // find the chunk count - chunkSize := int64(*copy.maxMB * 1024 * 1024) + chunkSize := int64(*worker.options.maxMB * 1024 * 1024) chunkCount := 1 - if chunkSize > 0 && fi.Size() > chunkSize { - chunkCount = int(fi.Size()/chunkSize) + 1 + if chunkSize > 0 && task.fileSize > chunkSize { + chunkCount = int(task.fileSize/chunkSize) + 1 } if chunkCount == 1 { - return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi) + return worker.uploadFileAsOne(task, f) } - return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize) + return worker.uploadFileInChunks(task, f, chunkCount, chunkSize) } -func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool { +func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error { // upload the file content fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) + data, err := ioutil.ReadAll(f) + if err != nil { + return err + } var chunks []*filer_pb.FileChunk + var assignResult *filer_pb.AssignVolumeResponse + var assignError error - if fi.Size() > 0 { + if task.fileSize > 0 { // assign a volume - assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{ - Count: 1, - Replication: *copy.replication, - Collection: *copy.collection, - Ttl: *copy.ttl, + err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: worker.options.ttlSec, + ParentPath: task.destinationUrlPath, + } + + assignResult, assignError = client.AssignVolume(context.Background(), request) + if assignError != nil { + return fmt.Errorf("assign volume failure %v: %v", request, assignError) + } + if assignResult.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) + } + return nil }) if err != nil { - fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) + return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId - uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, "") + uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth)) if err != nil { - fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err) - return false + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) } if uploadResult.Error != "" { - fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - return false + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) } fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) - chunks = append(chunks, &filer_pb.FileChunk{ - FileId: assignResult.Fid, - Offset: 0, - Size: uint64(uploadResult.Size), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, - }) + chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) - fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) } - if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: urlFolder, + Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ Crtime: time.Now().Unix(), Mtime: time.Now().Unix(), - Gid: uint32(os.Getgid()), - Uid: uint32(os.Getuid()), - FileSize: uint64(fi.Size()), - FileMode: uint32(fi.Mode()), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), Mime: mimeType, - Replication: *copy.replication, - Collection: *copy.collection, - TtlSec: int32(util.ParseInt(*copy.ttl, 0)), + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: worker.options.ttlSec, }, Chunks: chunks, }, } - if _, err := client.CreateEntry(context.Background(), request); err != nil { + if err := filer_pb.CreateEntry(client, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil }); err != nil { - fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err) - return false + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) } - return true + return nil } -func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { +func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error { fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) - var chunks []*filer_pb.FileChunk + chunksChan := make(chan *filer_pb.FileChunk, chunkCount) + + concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks) + var wg sync.WaitGroup + var uploadError error + var collection, replication string + + fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount) + for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ { + wg.Add(1) + concurrentChunks <- struct{}{} + go func(i int64) { + defer func() { + wg.Done() + <-concurrentChunks + }() + // assign a volume + var assignResult *filer_pb.AssignVolumeResponse + var assignError error + err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: worker.options.ttlSec, + ParentPath: task.destinationUrlPath, + } + + assignResult, assignError = client.AssignVolume(context.Background(), request) + if assignError != nil { + return fmt.Errorf("assign volume failure %v: %v", request, assignError) + } + if assignResult.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) + } + return nil + }) + if err != nil { + fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) + } + if err != nil { + fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) + } - for i := int64(0); i < int64(chunkCount); i++ { + targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId + if collection == "" { + collection = assignResult.Collection + } + if replication == "" { + replication = assignResult.Replication + } - // assign a volume - assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{ - Count: 1, - Replication: *copy.replication, - Collection: *copy.collection, - Ttl: *copy.ttl, - }) - if err != nil { - fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) - } + uploadResult, err, _ := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth)) + if err != nil { + uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) + return + } + if uploadResult.Error != "" { + uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + return + } + chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i*chunkSize) - targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) + }(i) + } + wg.Wait() + close(chunksChan) - uploadResult, err := operation.Upload(targetUrl, - fileName+"-"+strconv.FormatInt(i+1, 10), - io.LimitReader(f, chunkSize), - false, "application/octet-stream", nil, "") - if err != nil { - fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err) - return false - } - if uploadResult.Error != "" { - fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - return false + var chunks []*filer_pb.FileChunk + for chunk := range chunksChan { + chunks = append(chunks, chunk) + } + + if uploadError != nil { + var fileIds []string + for _, chunk := range chunks { + fileIds = append(fileIds, chunk.FileId) } - chunks = append(chunks, &filer_pb.FileChunk{ - FileId: assignResult.Fid, - Offset: i * chunkSize, - Size: uint64(uploadResult.Size), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, - }) - fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) + operation.DeleteFiles(copy.masters[0], false, worker.options.grpcDialOption, fileIds) + return uploadError } - if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: urlFolder, + Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ Crtime: time.Now().Unix(), Mtime: time.Now().Unix(), - Gid: uint32(os.Getgid()), - Uid: uint32(os.Getuid()), - FileSize: uint64(fi.Size()), - FileMode: uint32(fi.Mode()), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), Mime: mimeType, - Replication: *copy.replication, - Collection: *copy.collection, - TtlSec: int32(util.ParseInt(*copy.ttl, 0)), + Replication: replication, + Collection: collection, + TtlSec: worker.options.ttlSec, }, Chunks: chunks, }, } - if _, err := client.CreateEntry(context.Background(), request); err != nil { + if err := filer_pb.CreateEntry(client, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil }); err != nil { - fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err) - return false + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) } - fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) - return true + return nil } func detectMimeType(f *os.File) string { @@ -322,22 +493,12 @@ func detectMimeType(f *os.File) string { } if err != nil { fmt.Printf("read head of %v: %v\n", f.Name(), err) - return "application/octet-stream" + return "" } f.Seek(0, io.SeekStart) mimeType := http.DetectContentType(head[:n]) - return mimeType -} - -func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error { - - grpcConnection, err := util.GrpcDial(filerAddress) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", filerAddress, err) + if mimeType == "application/octet-stream" { + return "" } - defer grpcConnection.Close() - - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - - return fn(client) + return mimeType } |
