aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-08-20 19:18:12 -0700
committerchrislu <chris.lu@gmail.com>2022-08-20 19:18:12 -0700
commit3bf8e772f8a80b6b36c095dd957176928aaf1ab4 (patch)
tree1b351307005c076a036dce39e6937bef8711fbc7
parentf8fa430257f6c5658fe20aca8cc0fac8b127079c (diff)
downloadseaweedfs-3bf8e772f8a80b6b36c095dd957176928aaf1ab4.tar.xz
seaweedfs-3bf8e772f8a80b6b36c095dd957176928aaf1ab4.zip
webdav: retryable data chunk upload
-rw-r--r--weed/server/webdav_server.go73
1 files changed, 23 insertions, 50 deletions
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 0dd025e27..3233f0b39 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -376,61 +376,34 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
- var fileId, host string
- var auth security.EncodedJwt
-
- if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx := context.Background()
-
- assignErr := util.Retry("assignVolume", func() error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: f.fs.option.Replication,
- Collection: f.fs.option.Collection,
- DiskType: f.fs.option.DiskType,
- Path: name,
- }
-
- resp, err := client.AssignVolume(ctx, request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
-
- return nil
- })
- if assignErr != nil {
- return assignErr
- }
-
- return nil
- }); flushErr != nil {
- return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
- }
+ fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
+ f.fs,
+ &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: f.fs.option.Replication,
+ Collection: f.fs.option.Collection,
+ DiskType: f.fs.option.DiskType,
+ Path: name,
+ },
+ &operation.UploadOption{
+ Filename: f.name,
+ Cipher: f.fs.option.Cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ },
+ func(host, fileId string) string {
+ return fmt.Sprintf("http://%s/%s", host, fileId)
+ },
+ reader,
+ )
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadOption := &operation.UploadOption{
- UploadUrl: fileUrl,
- Filename: f.name,
- Cipher: f.fs.option.Cipher,
- IsInputCompressed: false,
- MimeType: "",
- PairMap: nil,
- Jwt: auth,
- }
- uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
if flushErr != nil {
- glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
+ glog.V(0).Infof("upload data %v: %v", f.name, flushErr)
return nil, fmt.Errorf("upload data: %v", flushErr)
}
if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
+ glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
}
return uploadResult.ToPbFileChunk(fileId, offset), nil