aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/filer_copy.go115
1 files changed, 71 insertions, 44 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 19aceb211..1dd831cb6 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -29,16 +29,17 @@ var (
)
type CopyOptions struct {
- include *string
- replication *string
- collection *string
- ttl *string
- maxMB *int
- masterClient *wdclient.MasterClient
- concurrency *int
- compressionLevel *int
- grpcDialOption grpc.DialOption
- masters []string
+ include *string
+ replication *string
+ collection *string
+ ttl *string
+ maxMB *int
+ masterClient *wdclient.MasterClient
+ concurrenctFiles *int
+ concurrenctChunks *int
+ compressionLevel *int
+ grpcDialOption grpc.DialOption
+ masters []string
}
func init() {
@@ -49,7 +50,8 @@ func init() {
copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
- copy.concurrency = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
+ copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
+ copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
}
@@ -131,7 +133,7 @@ func runCopy(cmd *Command, args []string) bool {
util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
}
- fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrency)
+ fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles)
go func() {
defer close(fileCopyTaskChan)
@@ -142,7 +144,7 @@ func runCopy(cmd *Command, args []string) bool {
}
}
}()
- for i := 0; i < *copy.concurrency; i++ {
+ for i := 0; i < *copy.concurrenctFiles; i++ {
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
@@ -345,41 +347,66 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
- var chunks []*filer_pb.FileChunk
+ chunksChan := make(chan *filer_pb.FileChunk, chunkCount)
+
+ concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
+ var wg sync.WaitGroup
+ var uploadError error
+
+ fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
+ for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
+ wg.Add(1)
+ concurrentChunks <- struct{}{}
+ go func(i int64) {
+ defer func() {
+ wg.Done()
+ <-concurrentChunks
+ }()
+ // assign a volume
+ assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ Ttl: *worker.options.ttl,
+ })
+ if err != nil {
+ fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ }
- for i := int64(0); i < int64(chunkCount); i++ {
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
- // assign a volume
- assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- Ttl: *worker.options.ttl,
- })
- if err != nil {
- fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
- }
+ uploadResult, err := operation.Upload(targetUrl,
+ fileName+"-"+strconv.FormatInt(i+1, 10),
+ io.NewSectionReader(f, i*chunkSize, chunkSize),
+ false, "application/octet-stream", nil, assignResult.Auth)
+ if err != nil {
+ uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
+ return
+ }
+ if uploadResult.Error != "" {
+ uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ return
+ }
+ chunksChan <- &filer_pb.FileChunk{
+ FileId: assignResult.Fid,
+ Offset: i * chunkSize,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ }
+ fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
+ }(i)
+ }
+ wg.Wait()
+ close(chunksChan)
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ if uploadError != nil {
+ return uploadError
+ }
- uploadResult, err := operation.Upload(targetUrl,
- fileName+"-"+strconv.FormatInt(i+1, 10),
- io.LimitReader(f, chunkSize),
- false, "application/octet-stream", nil, assignResult.Auth)
- if err != nil {
- return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- }
- if uploadResult.Error != "" {
- return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
- }
- chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: i * chunkSize,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
- })
- fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
+ var chunks []*filer_pb.FileChunk
+ for chunk := range chunksChan {
+ chunks = append(chunks, chunk)
}
if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {