Diffstat (limited to 'weed/command/filer_copy.go')
| -rw-r--r-- | weed/command/filer_copy.go | 75 |
1 file changed, 52 insertions, 23 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index e5979d786..a359bf32b 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -125,10 +125,6 @@ func runCopy(cmd *Command, args []string) bool {
     }
     copy.masters = masters
 
-    copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters)
-    go copy.masterClient.KeepConnectedToMaster()
-    copy.masterClient.WaitUntilConnected()
-
     if *cmdCopy.IsDebug {
         util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
     }
@@ -274,23 +270,35 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
     mimeType := detectMimeType(f)
 
     var chunks []*filer_pb.FileChunk
+    var assignResult *filer_pb.AssignVolumeResponse
+    var assignError error
 
     if task.fileSize > 0 {
 
         // assign a volume
-        assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
-            Count:       1,
-            Replication: *worker.options.replication,
-            Collection:  *worker.options.collection,
-            Ttl:         *worker.options.ttl,
+        err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+            request := &filer_pb.AssignVolumeRequest{
+                Count:       1,
+                Replication: *worker.options.replication,
+                Collection:  *worker.options.collection,
+                TtlSec:      int32(util.ParseInt(*worker.options.ttl, 0)),
+                ParentPath:  task.destinationUrlPath,
+            }
+
+            assignResult, assignError = client.AssignVolume(ctx, request)
+            if assignError != nil {
+                return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+            }
+            return nil
         })
         if err != nil {
             fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
         }
 
-        targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+        targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
 
-        uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel)
+        uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, security.EncodedJwt(assignResult.Auth), *worker.options.compressionLevel)
         if err != nil {
             return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
         }
@@ -300,7 +308,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
         fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
 
         chunks = append(chunks, &filer_pb.FileChunk{
-            FileId: assignResult.Fid,
+            FileId: assignResult.FileId,
             Offset: 0,
             Size:   uint64(uploadResult.Size),
             Mtime:  time.Now().UnixNano(),
@@ -352,6 +360,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
     concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
     var wg sync.WaitGroup
     var uploadError error
+    var collection, replication string
 
     fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
     for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
@@ -363,22 +372,42 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
                 <-concurrentChunks
             }()
             // assign a volume
-            assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
-                Count:       1,
-                Replication: *worker.options.replication,
-                Collection:  *worker.options.collection,
-                Ttl:         *worker.options.ttl,
+            var assignResult *filer_pb.AssignVolumeResponse
+            var assignError error
+            err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+                request := &filer_pb.AssignVolumeRequest{
+                    Count:       1,
+                    Replication: *worker.options.replication,
+                    Collection:  *worker.options.collection,
+                    TtlSec:      int32(util.ParseInt(*worker.options.ttl, 0)),
+                    ParentPath:  task.destinationUrlPath,
+                }
+
+                assignResult, assignError = client.AssignVolume(ctx, request)
+                if assignError != nil {
+                    return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+                }
+                return nil
             })
             if err != nil {
                 fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
             }
+            if err != nil {
+                fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+            }
 
-            targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+            targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+            if collection == "" {
+                collection = assignResult.Collection
+            }
+            if replication == "" {
+                replication = assignResult.Replication
+            }
 
             uploadResult, err := operation.Upload(targetUrl,
                 fileName+"-"+strconv.FormatInt(i+1, 10),
                 io.NewSectionReader(f, i*chunkSize, chunkSize),
-                false, "", nil, assignResult.Auth)
+                false, "", nil, security.EncodedJwt(assignResult.Auth))
             if err != nil {
                 uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
                 return
@@ -388,7 +417,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
                 return
             }
             chunksChan <- &filer_pb.FileChunk{
-                FileId: assignResult.Fid,
+                FileId: assignResult.FileId,
                 Offset: i * chunkSize,
                 Size:   uint64(uploadResult.Size),
                 Mtime:  time.Now().UnixNano(),
@@ -410,7 +439,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
         for _, chunk := range chunks {
             fileIds = append(fileIds, chunk.FileId)
         }
-        operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds)
+        operation.DeleteFiles(copy.masters[0], worker.options.grpcDialOption, fileIds)
         return uploadError
     }
 
@@ -427,8 +456,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
             FileSize:    uint64(task.fileSize),
             FileMode:    uint32(task.fileMode),
             Mime:        mimeType,
-            Replication: *worker.options.replication,
-            Collection:  *worker.options.collection,
+            Replication: replication,
+            Collection:  collection,
             TtlSec:      int32(util.ParseInt(*worker.options.ttl, 0)),
         },
         Chunks: chunks,
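The net effect of this commit in `uploadFileAsOne` is that `filer.copy` no longer keeps its own master client: volume assignment becomes a gRPC call to the filer (`AssignVolume`), and the subsequent upload to the volume server is signed with the JWT returned by that assignment. The sketch below condenses that flow into a single helper as an illustration only: it assumes it lives in `weed/command` next to the code above (so `withFilerClient`, `FileCopyWorker`, `operation`, `security`, `util`, and the `filer_pb` types are in scope), and the helper name `assignAndUploadWhole` and the `destPath` parameter (standing in for `task.destinationUrlPath`) are invented for the example, not part of the commit.

```go
// assignAndUploadWhole is a hypothetical condensation of the new single-file
// upload path: ask the filer for a volume assignment, then upload the file to
// the returned volume server URL using the filer-issued JWT.
func (worker *FileCopyWorker) assignAndUploadWhole(ctx context.Context, f *os.File, fileName, mimeType, destPath string) (*filer_pb.FileChunk, error) {
	var assignResult *filer_pb.AssignVolumeResponse
	var assignError error

	// Volume assignment is now a filer RPC, so the copy tool no longer needs
	// its own master client connection.
	err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.AssignVolumeRequest{
			Count:       1,
			Replication: *worker.options.replication,
			Collection:  *worker.options.collection,
			TtlSec:      int32(util.ParseInt(*worker.options.ttl, 0)),
			ParentPath:  destPath, // the diff passes task.destinationUrlPath here
		}
		assignResult, assignError = client.AssignVolume(ctx, request)
		if assignError != nil {
			return fmt.Errorf("assign volume failure %v: %v", request, assignError)
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("assign volume via filer: %v", err)
	}

	// Upload directly to the assigned volume server; the Auth token returned
	// with the assignment authorizes the write.
	targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
	uploadResult, err := operation.UploadWithLocalCompressionLevel(
		targetUrl, fileName, f, false, mimeType, nil,
		security.EncodedJwt(assignResult.Auth), *worker.options.compressionLevel)
	if err != nil {
		return nil, fmt.Errorf("upload %v to %s: %v", fileName, targetUrl, err)
	}

	// The resulting chunk is what uploadFileAsOne records in the filer entry.
	return &filer_pb.FileChunk{
		FileId: assignResult.FileId,
		Offset: 0,
		Size:   uint64(uploadResult.Size),
		Mtime:  time.Now().UnixNano(),
	}, nil
}
```

Seen this way, removing the `wdclient.NewMasterClient` setup in `runCopy` is a direct consequence: everything the copy worker needs is now reachable through the filer connection it already has.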

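The chunked path in `uploadFileInChunks` follows the same per-chunk assignment pattern, with one extra wrinkle: the collection and replication actually chosen by the filer are captured from the first successful assignment and reused when the file entry is created (last hunk above), instead of echoing the raw command-line flags. The sketch below is a simplified, serial rendering of that loop under the same assumptions as the previous sketch; the real code uploads chunks concurrently with a `sync.WaitGroup` and a bounded channel, and `assignOneChunk` is a hypothetical stand-in for the `withFilerClient`/`AssignVolume` call shown in the diff.

```go
// uploadChunksSketch uploads a file in fixed-size chunks, one assignment and
// one upload per chunk, and reports which collection/replication the filer
// actually used so the caller can record them in the final entry.
func (worker *FileCopyWorker) uploadChunksSketch(ctx context.Context, f *os.File, fileName, destPath string, chunkCount int, chunkSize int64) ([]*filer_pb.FileChunk, string, string, error) {
	var collection, replication string
	var chunks []*filer_pb.FileChunk

	for i := int64(0); i < int64(chunkCount); i++ {
		// Hypothetical helper wrapping the withFilerClient/AssignVolume call.
		assignResult, err := worker.assignOneChunk(ctx, destPath)
		if err != nil {
			return nil, "", "", err
		}
		// Remember what the filer actually assigned; the final CreateEntry
		// records these instead of the raw flag values.
		if collection == "" {
			collection = assignResult.Collection
		}
		if replication == "" {
			replication = assignResult.Replication
		}

		targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
		uploadResult, err := operation.Upload(targetUrl,
			fileName+"-"+strconv.FormatInt(i+1, 10),
			io.NewSectionReader(f, i*chunkSize, chunkSize),
			false, "", nil, security.EncodedJwt(assignResult.Auth))
		if err != nil {
			return nil, "", "", fmt.Errorf("upload chunk %d of %v to %s: %v", i, fileName, targetUrl, err)
		}
		chunks = append(chunks, &filer_pb.FileChunk{
			FileId: assignResult.FileId,
			Offset: i * chunkSize,
			Size:   uint64(uploadResult.Size),
			Mtime:  time.Now().UnixNano(),
		})
	}
	return chunks, collection, replication, nil
}
```

The returned `collection` and `replication` then feed the `Collection` and `Replication` fields of the `CreateEntry` request, matching the last hunk of the diff.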