diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer_copy.go | 26 | ||||
| -rw-r--r-- | weed/command/filer_replication.go | 3 |
2 files changed, 15 insertions, 14 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 650757442..a852ca773 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -106,14 +106,14 @@ func runCopy(cmd *Command, args []string) bool { copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client") for _, fileOrDir := range fileOrDirs { - if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, copy.grpcDialOption, urlPath) { + if !doEachCopy(context.Background(), fileOrDir, filerUrl.Host, filerGrpcAddress, copy.grpcDialOption, urlPath) { return false } } return true } -func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, path string) bool { +func doEachCopy(ctx context.Context, fileOrDir string, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, path string) bool { f, err := os.Open(fileOrDir) if err != nil { fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err) @@ -131,7 +131,7 @@ func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, grpcDia if mode.IsDir() { files, _ := ioutil.ReadDir(fileOrDir) for _, subFileOrDir := range files { - if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, grpcDialOption, path+fi.Name()+"/") { + if !doEachCopy(ctx, fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, grpcDialOption, path+fi.Name()+"/") { return false } } @@ -153,13 +153,13 @@ func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, grpcDia } if chunkCount == 1 { - return uploadFileAsOne(filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi) + return uploadFileAsOne(ctx, filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi) } - return uploadFileInChunks(filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi, chunkCount, chunkSize) + return uploadFileInChunks(ctx, filerAddress, filerGrpcAddress, grpcDialOption, path, f, fi, chunkCount, chunkSize) } -func uploadFileAsOne(filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo) bool { +func uploadFileAsOne(ctx context.Context, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo) bool { // upload the file content fileName := filepath.Base(f.Name()) @@ -204,7 +204,7 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, grpcDialOption grpc. fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) } - if err := withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: urlFolder, Entry: &filer_pb.Entry{ @@ -225,7 +225,7 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, grpcDialOption grpc. }, } - if _, err := client.CreateEntry(context.Background(), request); err != nil { + if _, err := client.CreateEntry(ctx, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil @@ -237,7 +237,7 @@ func uploadFileAsOne(filerAddress, filerGrpcAddress string, grpcDialOption grpc. return true } -func uploadFileInChunks(filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { +func uploadFileInChunks(ctx context.Context, filerAddress, filerGrpcAddress string, grpcDialOption grpc.DialOption, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) @@ -281,7 +281,7 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, grpcDialOption gr fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) } - if err := withFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: urlFolder, Entry: &filer_pb.Entry{ @@ -302,7 +302,7 @@ func uploadFileInChunks(filerAddress, filerGrpcAddress string, grpcDialOption gr }, } - if _, err := client.CreateEntry(context.Background(), request); err != nil { + if _, err := client.CreateEntry(ctx, request); err != nil { return fmt.Errorf("update fh: %v", err) } return nil @@ -332,9 +332,9 @@ func detectMimeType(f *os.File) string { return mimeType } -func withFilerClient(filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { +func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := util.GrpcDial(filerAddress, grpcDialOption) + grpcConnection, err := util.GrpcDial(ctx, filerAddress, grpcDialOption) if err != nil { return fmt.Errorf("fail to dial %s: %v", filerAddress, err) } diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index c9afbdc8a..82576afe6 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -1,6 +1,7 @@ package command import ( + "context" "strings" "github.com/chrislusf/seaweedfs/weed/glog" @@ -116,7 +117,7 @@ func runFilerReplicate(cmd *Command, args []string) bool { } else { glog.V(1).Infof("modify: %s", key) } - if err = replicator.Replicate(key, m); err != nil { + if err = replicator.Replicate(context.Background(), key, m); err != nil { glog.Errorf("replicate %s: %+v", key, err) } else { glog.V(1).Infof("replicated %s", key) |
