diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-09-12 22:47:52 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-09-12 22:47:52 -0700 |
| commit | e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6 (patch) | |
| tree | 3ad0436940263a24ac46d38a60dd1e35b2c1cdfe /weed/command/filer_copy.go | |
| parent | 2c9d4c8f43c1e95c75fc332ca83d19e33e5da3ac (diff) | |
| download | seaweedfs-e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6.tar.xz seaweedfs-e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6.zip | |
change server address from string to a type
Diffstat (limited to 'weed/command/filer_copy.go')
| -rw-r--r-- | weed/command/filer_copy.go | 67 |
1 files changed, 25 insertions, 42 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 05aa96292..0feae63b3 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -7,7 +7,6 @@ import ( "io" "io/ioutil" "net/http" - "net/url" "os" "path/filepath" "strconv" @@ -92,35 +91,21 @@ func runCopy(cmd *Command, args []string) bool { filerDestination := args[len(args)-1] fileOrDirs := args[0 : len(args)-1] - filerUrl, err := url.Parse(filerDestination) + filerAddress, urlPath, err := pb.ParseUrl(filerDestination) if err != nil { fmt.Printf("The last argument should be a URL on filer: %v\n", err) return false } - urlPath := filerUrl.Path if !strings.HasSuffix(urlPath, "/") { fmt.Printf("The last argument should be a folder and end with \"/\"\n") return false } - if filerUrl.Port() == "" { - fmt.Printf("The filer port should be specified.\n") - return false - } - - filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64) - if parseErr != nil { - fmt.Printf("The filer port parse error: %v\n", parseErr) - return false - } - - filerGrpcPort := filerPort + 10000 - filerGrpcAddress := util.JoinHostPort(filerUrl.Hostname(), int(filerGrpcPort)) copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") - masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress) + masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerAddress) if err != nil { - fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err) + fmt.Printf("read from filer %s: %v\n", filerAddress, err) return false } if strings.HasPrefix(urlPath, dirBuckets+"/") { @@ -174,9 +159,8 @@ func runCopy(cmd *Command, args []string) bool { go func() { defer waitGroup.Done() worker := FileCopyWorker{ - options: ©, - filerHost: filerUrl.Host, - filerGrpcAddress: filerGrpcAddress, + options: ©, + filerAddress: filerAddress, } if err := worker.copyFiles(fileCopyTaskChan); err != nil { fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) @@ -189,7 +173,7 @@ func runCopy(cmd *Command, args []string) bool { return true } -func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { +func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { @@ -241,9 +225,8 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi } type FileCopyWorker struct { - options *CopyOptions - filerHost string - filerGrpcAddress string + options *CopyOptions + filerAddress pb.ServerAddress } func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error { @@ -321,7 +304,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi return } - err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: task.destinationUrlPath, @@ -363,7 +346,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err // assign a volume err = util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -381,7 +364,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err if assignResult.Error != "" { return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) } - if assignResult.Url == "" { + if assignResult.Location.Url == "" { return fmt.Errorf("assign volume failure %v: %v", request, assignResult) } return nil @@ -391,7 +374,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId + targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId uploadOption := &operation.UploadOption{ UploadUrl: targetUrl, Filename: fileName, @@ -414,10 +397,10 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) } - if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -443,7 +426,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } return nil }); err != nil { - return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err) } return nil @@ -474,7 +457,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, var assignResult *filer_pb.AssignVolumeResponse var assignError error err := util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: *worker.options.replication, @@ -498,7 +481,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId + targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId if collection == "" { collection = assignResult.Collection } @@ -508,7 +491,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, uploadOption := &operation.UploadOption{ UploadUrl: targetUrl, - Filename: fileName+"-"+strconv.FormatInt(i+1, 10), + Filename: fileName + "-" + strconv.FormatInt(i+1, 10), Cipher: worker.options.cipher, IsInputCompressed: false, MimeType: "", @@ -542,8 +525,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, for _, chunk := range chunks { fileIds = append(fileIds, chunk.FileId) } - operation.DeleteFiles(func() string { - return copy.masters[0] + operation.DeleteFiles(func() pb.ServerAddress { + return pb.ServerAddress(copy.masters[0]) }, false, worker.options.grpcDialOption, fileIds) return uploadError } @@ -553,7 +536,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return fmt.Errorf("create manifest: %v", manifestErr) } - if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -579,10 +562,10 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, } return nil }); err != nil { - return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err) } - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) return nil } @@ -611,7 +594,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off var fileId, host string var auth security.EncodedJwt - if flushErr := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if flushErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() @@ -633,7 +616,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) } - fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth) collection, replication = resp.Collection, resp.Replication return nil |
