aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_copy.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_copy.go')
-rw-r--r--weed/command/filer_copy.go75
1 files changed, 52 insertions, 23 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index e5979d786..a359bf32b 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -125,10 +125,6 @@ func runCopy(cmd *Command, args []string) bool {
}
copy.masters = masters
- copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters)
- go copy.masterClient.KeepConnectedToMaster()
- copy.masterClient.WaitUntilConnected()
-
if *cmdCopy.IsDebug {
util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
}
@@ -274,23 +270,35 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
mimeType := detectMimeType(f)
var chunks []*filer_pb.FileChunk
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
if task.fileSize > 0 {
// assign a volume
- assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- Ttl: *worker.options.ttl,
+ err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(ctx, request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ return nil
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
- uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel)
+ uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, security.EncodedJwt(assignResult.Auth), *worker.options.compressionLevel)
if err != nil {
return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
@@ -300,7 +308,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
+ FileId: assignResult.FileId,
Offset: 0,
Size: uint64(uploadResult.Size),
Mtime: time.Now().UnixNano(),
@@ -352,6 +360,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
var wg sync.WaitGroup
var uploadError error
+ var collection, replication string
fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
@@ -363,22 +372,42 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
<-concurrentChunks
}()
// assign a volume
- assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- Ttl: *worker.options.ttl,
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
+ err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(ctx, request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ return nil
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
+ if err != nil {
+ fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ }
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+ if collection == "" {
+ collection = assignResult.Collection
+ }
+ if replication == "" {
+ replication = assignResult.Replication
+ }
uploadResult, err := operation.Upload(targetUrl,
fileName+"-"+strconv.FormatInt(i+1, 10),
io.NewSectionReader(f, i*chunkSize, chunkSize),
- false, "", nil, assignResult.Auth)
+ false, "", nil, security.EncodedJwt(assignResult.Auth))
if err != nil {
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
return
@@ -388,7 +417,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
return
}
chunksChan <- &filer_pb.FileChunk{
- FileId: assignResult.Fid,
+ FileId: assignResult.FileId,
Offset: i * chunkSize,
Size: uint64(uploadResult.Size),
Mtime: time.Now().UnixNano(),
@@ -410,7 +439,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
- operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds)
+ operation.DeleteFiles(copy.masters[0], worker.options.grpcDialOption, fileIds)
return uploadError
}
@@ -427,8 +456,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
FileSize: uint64(task.fileSize),
FileMode: uint32(task.fileMode),
Mime: mimeType,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
+ Replication: replication,
+ Collection: collection,
TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
},
Chunks: chunks,