aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_copy.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-08-20 18:50:57 -0700
committerchrislu <chris.lu@gmail.com>2022-08-20 18:50:57 -0700
commitd49d0a9fc29db0ae3fd667d3f988561ca8e3b635 (patch)
treeb4ee553998e95b2b9600411e7e6ecbacc8313dce /weed/command/filer_copy.go
parent689b4ecdcc9dc6edeaf557a6988f520e1fe608ac (diff)
downloadseaweedfs-d49d0a9fc29db0ae3fd667d3f988561ca8e3b635.tar.xz
seaweedfs-d49d0a9fc29db0ae3fd667d3f988561ca8e3b635.zip
filer.copy: retryable upload
Diffstat (limited to 'weed/command/filer_copy.go')
-rw-r--r--weed/command/filer_copy.go94
1 files changed, 46 insertions, 48 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 7bfd484f0..e20087ce4 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -585,59 +585,57 @@ func detectMimeType(f *os.File) string {
func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
- var fileId, host string
- var auth security.EncodedJwt
+ var finalFileId string
+ uploadResult, flushErr, _ := operation.UploadWithRetry(
+ worker,
+ &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ DiskType: *worker.options.diskType,
+ Path: name,
+ },
+ &operation.UploadOption{
+ Filename: name,
+ Cipher: worker.options.cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ },
+ func(host, fileId string) string {
+ finalFileId = fileId
+ return fmt.Sprintf("http://%s/%s", host, fileId)
+ },
+ reader,
+ )
- if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx := context.Background()
-
- assignErr := util.Retry("assignVolume", func() error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: worker.options.ttlSec,
- DiskType: *worker.options.diskType,
- Path: name,
- }
-
- resp, err := client.AssignVolume(ctx, request)
- if err != nil {
- return fmt.Errorf("assign volume failure %v: %v", request, err)
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
-
- return nil
- })
- if assignErr != nil {
- return assignErr
- }
-
- return nil
- }); flushErr != nil {
- return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
- }
-
- uploadOption := &operation.UploadOption{
- UploadUrl: fmt.Sprintf("http://%s/%s", host, fileId),
- Filename: name,
- Cipher: worker.options.cipher,
- IsInputCompressed: false,
- MimeType: "",
- PairMap: nil,
- Jwt: auth,
- }
- uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
if flushErr != nil {
return nil, fmt.Errorf("upload data: %v", flushErr)
}
if uploadResult.Error != "" {
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
}
- return uploadResult.ToPbFileChunk(fileId, offset), nil
+ return uploadResult.ToPbFileChunk(finalFileId, offset), nil
+}
+
+var _ = filer_pb.FilerClient(&FileCopyWorker{})
+
+func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+
+ filerGrpcAddress := worker.filerAddress.ToGrpcAddress()
+ err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, worker.options.grpcDialOption)
+
+ return
+}
+
+func (worker *FileCopyWorker) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
+
+func (worker *FileCopyWorker) GetDataCenter() string {
+ return ""
}