diff options
Diffstat (limited to 'weed/command/filer_copy.go')
| -rw-r--r-- | weed/command/filer_copy.go | 315 |
1 files changed, 207 insertions, 108 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 3638bcb27..19aceb211 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -1,52 +1,56 @@ package command import ( + "context" "fmt" + "io" "io/ioutil" + "net/http" "net/url" "os" "path/filepath" + "strconv" "strings" + "sync" + "time" - "context" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" - "io" - "net/http" - "strconv" - "time" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/spf13/viper" + "google.golang.org/grpc" ) var ( - copy CopyOptions + copy CopyOptions + waitGroup sync.WaitGroup ) type CopyOptions struct { - filerGrpcPort *int - master *string - include *string - replication *string - collection *string - ttl *string - maxMB *int - secretKey *string - - secret security.Secret + include *string + replication *string + collection *string + ttl *string + maxMB *int + masterClient *wdclient.MasterClient + concurrency *int + compressionLevel *int + grpcDialOption grpc.DialOption + masters []string } func init() { cmdCopy.Run = runCopy // break init cycle cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information") - copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location") copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir") copy.replication = cmdCopy.Flag.String("replication", "", "replication type") copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") - copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit") - copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000") - copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit") + copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines") + copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9") } var cmdCopy = &Command{ @@ -66,7 +70,9 @@ var cmdCopy = &Command{ } func runCopy(cmd *Command, args []string) bool { - copy.secret = security.Secret(*copy.secretKey) + + util.LoadConfiguration("security", false) + if len(args) <= 1 { return false } @@ -96,67 +102,170 @@ func runCopy(cmd *Command, args []string) bool { } filerGrpcPort := filerPort + 10000 - if *copy.filerGrpcPort != 0 { - filerGrpcPort = uint64(*copy.filerGrpcPort) + filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) + copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") + + ctx := context.Background() + + masters, collection, replication, maxMB, err := readFilerConfiguration(ctx, copy.grpcDialOption, filerGrpcAddress) + if err != nil { + fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err) + return false } + if *copy.collection == "" { + *copy.collection = collection + } + if *copy.replication == "" { + *copy.replication = replication + } + if *copy.maxMB == 0 { + *copy.maxMB = int(maxMB) + } + copy.masters = masters - filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) + copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters) + go copy.masterClient.KeepConnectedToMaster() + copy.masterClient.WaitUntilConnected() - for _, fileOrDir := range fileOrDirs { - if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) { - return false + if *cmdCopy.IsDebug { + util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof") + } + + fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency) + + go func() { + defer close(fileCopyTaskChan) + for _, fileOrDir := range fileOrDirs { + if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil { + fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err) + break + } } + }() + for i := 0; i < *copy.concurrency; i++ { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + worker := FileCopyWorker{ + options: ©, + filerHost: filerUrl.Host, + filerGrpcAddress: filerGrpcAddress, + } + if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil { + fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) + return + } + }() } + waitGroup.Wait() + return true } -func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool { - f, err := os.Open(fileOrDir) - if err != nil { - fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err) - return false - } - defer f.Close() +func readFilerConfiguration(ctx context.Context, grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) { + err = withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + } + masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb + return nil + }) + return +} - fi, err := f.Stat() +func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error { + + fi, err := os.Stat(fileOrDir) if err != nil { - fmt.Printf("Failed to get stat for file %s: %v\n", fileOrDir, err) - return false + fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err) + return nil } mode := fi.Mode() if mode.IsDir() { files, _ := ioutil.ReadDir(fileOrDir) for _, subFileOrDir := range files { - if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") { - return false + if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil { + return err } } - return true + return nil } + uid, gid := util.GetFileUidGid(fi) + + fileCopyTaskChan <- FileCopyTask{ + sourceLocation: fileOrDir, + destinationUrlPath: destPath, + fileSize: fi.Size(), + fileMode: fi.Mode(), + uid: uid, + gid: gid, + } + + return nil +} + +type FileCopyWorker struct { + options *CopyOptions + filerHost string + filerGrpcAddress string +} + +func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error { + for task := range fileCopyTaskChan { + if err := worker.doEachCopy(ctx, task); err != nil { + return err + } + } + return nil +} + +type FileCopyTask struct { + sourceLocation string + destinationUrlPath string + fileSize int64 + fileMode os.FileMode + uid uint32 + gid uint32 +} + +func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error { + + f, err := os.Open(task.sourceLocation) + if err != nil { + fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err) + if _, ok := err.(*os.PathError); ok { + fmt.Printf("skipping %s\n", task.sourceLocation) + return nil + } + return err + } + defer f.Close() + // this is a regular file - if *copy.include != "" { - if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok { - return true + if *worker.options.include != "" { + if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok { + return nil } } // find the chunk count - chunkSize := int64(*copy.maxMB * 1024 * 1024) + chunkSize := int64(*worker.options.maxMB * 1024 * 1024) chunkCount := 1 - if chunkSize > 0 && fi.Size() > chunkSize { - chunkCount = int(fi.Size()/chunkSize) + 1 + if chunkSize > 0 && task.fileSize > chunkSize { + chunkCount = int(task.fileSize/chunkSize) + 1 } if chunkCount == 1 { - return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi) + return worker.uploadFileAsOne(ctx, task, f) } - return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize) + return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize) } -func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool { +func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error { // upload the file content fileName := filepath.Base(f.Name()) @@ -164,29 +273,27 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f var chunks []*filer_pb.FileChunk - if fi.Size() > 0 { + if task.fileSize > 0 { // assign a volume - assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{ + assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ Count: 1, - Replication: *copy.replication, - Collection: *copy.collection, - Ttl: *copy.ttl, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + Ttl: *worker.options.ttl, }) if err != nil { - fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) + fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) } targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid - uploadResult, err := operation.Upload(targetUrl, fileName, f, false, mimeType, nil, "") + uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel) if err != nil { - fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err) - return false + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) } if uploadResult.Error != "" { - fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - return false + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) } fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) @@ -198,43 +305,42 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f ETag: uploadResult.ETag, }) - fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) } - if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error { + if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: urlFolder, + Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ Crtime: time.Now().Unix(), Mtime: time.Now().Unix(), - Gid: uint32(os.Getgid()), - Uid: uint32(os.Getuid()), - FileSize: uint64(fi.Size()), - FileMode: uint32(fi.Mode()), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), Mime: mimeType, - Replication: *copy.replication, - Collection: *copy.collection, - TtlSec: int32(util.ParseInt(*copy.ttl, 0)), + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), }, Chunks: chunks, }, } - if _, err := client.CreateEntry(context.Background(), request); err != nil { + if _, err := client.CreateEntry(ctx, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil }); err != nil { - fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err) - return false + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) } - return true + return nil } -func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { +func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error { fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) @@ -244,14 +350,14 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, for i := int64(0); i < int64(chunkCount); i++ { // assign a volume - assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{ + assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{ Count: 1, - Replication: *copy.replication, - Collection: *copy.collection, - Ttl: *copy.ttl, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + Ttl: *worker.options.ttl, }) if err != nil { - fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err) + fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) } targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid @@ -259,14 +365,12 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, uploadResult, err := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(f, chunkSize), - false, "application/octet-stream", nil, "") + false, "application/octet-stream", nil, assignResult.Auth) if err != nil { - fmt.Printf("upload data %v to %s: %v\n", fileName, targetUrl, err) - return false + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) } if uploadResult.Error != "" { - fmt.Printf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) - return false + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) } chunks = append(chunks, &filer_pb.FileChunk{ FileId: assignResult.Fid, @@ -278,39 +382,38 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) } - if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error { + if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: urlFolder, + Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ Name: fileName, Attributes: &filer_pb.FuseAttributes{ Crtime: time.Now().Unix(), Mtime: time.Now().Unix(), - Gid: uint32(os.Getgid()), - Uid: uint32(os.Getuid()), - FileSize: uint64(fi.Size()), - FileMode: uint32(fi.Mode()), + Gid: task.gid, + Uid: task.uid, + FileSize: uint64(task.fileSize), + FileMode: uint32(task.fileMode), Mime: mimeType, - Replication: *copy.replication, - Collection: *copy.collection, - TtlSec: int32(util.ParseInt(*copy.ttl, 0)), + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)), }, Chunks: chunks, }, } - if _, err := client.CreateEntry(context.Background(), request); err != nil { + if _, err := client.CreateEntry(ctx, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil }); err != nil { - fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err) - return false + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) } - fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName) - return true + return nil } func detectMimeType(f *os.File) string { @@ -329,15 +432,11 @@ func detectMimeType(f *os.File) string { return mimeType } -func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error { - - grpcConnection, err := util.GrpcDial(filerAddress) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", filerAddress, err) - } - defer grpcConnection.Close() +func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(clientConn) + return fn(client) + }, filerAddress, grpcDialOption) - return fn(client) } |
