aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_remote.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_grpc_server_remote.go')
-rw-r--r--weed/server/filer_grpc_server_remote.go112
1 files changed, 60 insertions, 52 deletions
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index 8144d6a90..54e9445ce 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -78,66 +78,74 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
var chunks []*filer_pb.FileChunk
+ var fetchAndWriteErr error
- // FIXME limit on parallel
+ limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
- size := chunkSize
- if offset+chunkSize > entry.Remote.RemoteSize {
- size = entry.Remote.RemoteSize - offset
- }
+ localOffset := offset
- // assign one volume server
- assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
- if err != nil {
- return resp, err
- }
- if assignResult.Error != "" {
- return resp, fmt.Errorf("assign: %v", assignResult.Error)
- }
- fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
- if assignResult.Error != "" {
- return resp, fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
- }
+ limitedConcurrentExecutor.Execute(func() {
+ size := chunkSize
+ if localOffset+chunkSize > entry.Remote.RemoteSize {
+ size = entry.Remote.RemoteSize - localOffset
+ }
- // tell filer to tell volume server to download into needles
- err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
- VolumeId: uint32(fileId.VolumeId),
- NeedleId: uint64(fileId.Key),
- Cookie: uint32(fileId.Cookie),
- Offset: offset,
- Size: size,
- RemoteType: storageConf.Type,
- RemoteName: storageConf.Name,
- S3AccessKey: storageConf.S3AccessKey,
- S3SecretKey: storageConf.S3SecretKey,
- S3Region: storageConf.S3Region,
- S3Endpoint: storageConf.S3Endpoint,
- RemoteBucket: remoteStorageMountedLocation.Bucket,
- RemotePath: string(dest),
- })
- if fetchAndWriteErr != nil {
- return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
+ // assign one volume server
+ assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
+ if err != nil {
+ fetchAndWriteErr = err
+ return
+ }
+ if assignResult.Error != "" {
+ fetchAndWriteErr = fmt.Errorf("assign: %v", assignResult.Error)
+ return
+ }
+ fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
+ if assignResult.Error != "" {
+ fetchAndWriteErr = fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
+ return
}
- return nil
- })
- if err != nil {
- return nil, err
- }
+ // tell filer to tell volume server to download into needles
+ err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
+ VolumeId: uint32(fileId.VolumeId),
+ NeedleId: uint64(fileId.Key),
+ Cookie: uint32(fileId.Cookie),
+ Offset: localOffset,
+ Size: size,
+ RemoteType: storageConf.Type,
+ RemoteName: storageConf.Name,
+ S3AccessKey: storageConf.S3AccessKey,
+ S3SecretKey: storageConf.S3SecretKey,
+ S3Region: storageConf.S3Region,
+ S3Endpoint: storageConf.S3Endpoint,
+ RemoteBucket: remoteStorageMountedLocation.Bucket,
+ RemotePath: string(dest),
+ })
+ if fetchAndWriteErr != nil {
+ return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
+ }
+ return nil
+ })
- chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: offset,
- Size: uint64(size),
- Mtime: time.Now().Unix(),
- Fid: &filer_pb.FileId{
- VolumeId: uint32(fileId.VolumeId),
- FileKey: uint64(fileId.Key),
- Cookie: uint32(fileId.Cookie),
- },
- })
+ if err != nil {
+ fetchAndWriteErr = err
+ return
+ }
+ chunks = append(chunks, &filer_pb.FileChunk{
+ FileId: assignResult.Fid,
+ Offset: localOffset,
+ Size: uint64(size),
+ Mtime: time.Now().Unix(),
+ Fid: &filer_pb.FileId{
+ VolumeId: uint32(fileId.VolumeId),
+ FileKey: uint64(fileId.Key),
+ Cookie: uint32(fileId.Cookie),
+ },
+ })
+ })
}
garbage := entry.Chunks