aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/filer_copy.go59
1 files changed, 58 insertions, 1 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 722f64679..818ae5f23 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -531,6 +531,11 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return uploadError
}
+ manifestedChunks, manifestErr := filer.MaybeManifestize(worker.saveDataAsChunk, chunks)
+ if manifestErr != nil {
+ return fmt.Errorf("create manifest: %v", manifestErr)
+ }
+
if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
@@ -548,7 +553,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
Collection: collection,
TtlSec: worker.options.ttlSec,
},
- Chunks: chunks,
+ Chunks: manifestedChunks,
},
}
@@ -583,3 +588,55 @@ func detectMimeType(f *os.File) string {
}
return mimeType
}
+
+func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ if flushErr := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx := context.Background()
+
+ assignErr := util.Retry("assignVolume", func() error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ DiskType: *worker.options.diskType,
+ Path: name,
+ }
+
+ resp, err := client.AssignVolume(ctx, request)
+ if err != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, err)
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ })
+ if assignErr != nil {
+ return assignErr
+ }
+
+ return nil
+ }); flushErr != nil {
+ return nil, collection, replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ uploadResult, flushErr, _ := operation.Upload(fileUrl, name, worker.options.cipher, reader, false, "", nil, auth)
+ if flushErr != nil {
+ return nil, collection, replication, fmt.Errorf("upload data: %v", flushErr)
+ }
+ if uploadResult.Error != "" {
+ return nil, collection, replication, fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+ return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
+}