aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_copy.go
diff options
context:
space:
mode:
authorHongyanShen <763987993@qq.com>2020-03-11 12:55:24 +0800
committerGitHub <noreply@github.com>2020-03-11 12:55:24 +0800
commit03529fc0c29072f6f26e11ffbd7229cf92dc71ce (patch)
treeed8833386a712c850dcef0815509774681a6ab56 /weed/command/filer_copy.go
parent0fca1ae776783b37481549df40f477b7d9248d3c (diff)
parent60f5f05c78a2918d5219c925cea5847759281a2c (diff)
downloadseaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.tar.xz
seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.zip
Merge pull request #1 from chrislusf/master
sync
Diffstat (limited to 'weed/command/filer_copy.go')
-rw-r--r--weed/command/filer_copy.go179
1 files changed, 111 insertions, 68 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 9995cf6aa..0aee8cd80 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -14,13 +14,15 @@ import (
"sync"
"time"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
)
var (
@@ -37,9 +39,10 @@ type CopyOptions struct {
masterClient *wdclient.MasterClient
concurrenctFiles *int
concurrenctChunks *int
- compressionLevel *int
grpcDialOption grpc.DialOption
masters []string
+ cipher bool
+ ttlSec int32
}
func init() {
@@ -52,7 +55,6 @@ func init() {
copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
- copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
}
var cmdCopy = &Command{
@@ -105,11 +107,9 @@ func runCopy(cmd *Command, args []string) bool {
filerGrpcPort := filerPort + 10000
filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
- copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- ctx := context.Background()
-
- masters, collection, replication, maxMB, err := readFilerConfiguration(ctx, copy.grpcDialOption, filerGrpcAddress)
+ masters, collection, replication, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
if err != nil {
fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
return false
@@ -124,10 +124,14 @@ func runCopy(cmd *Command, args []string) bool {
*copy.maxMB = int(maxMB)
}
copy.masters = masters
+ copy.cipher = cipher
- copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters)
- go copy.masterClient.KeepConnectedToMaster()
- copy.masterClient.WaitUntilConnected()
+ ttl, err := needle.ReadTTL(*copy.ttl)
+ if err != nil {
+ fmt.Printf("parsing ttl %s: %v\n", *copy.ttl, err)
+ return false
+ }
+ copy.ttlSec = int32(ttl.Minutes()) * 60
if *cmdCopy.IsDebug {
util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
@@ -153,7 +157,7 @@ func runCopy(cmd *Command, args []string) bool {
filerHost: filerUrl.Host,
filerGrpcAddress: filerGrpcAddress,
}
- if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil {
+ if err := worker.copyFiles(fileCopyTaskChan); err != nil {
fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
return
}
@@ -164,13 +168,14 @@ func runCopy(cmd *Command, args []string) bool {
return true
}
-func readFilerConfiguration(ctx context.Context, grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) {
- err = withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{})
+func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, cipher bool, err error) {
+ err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
}
masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb
+ cipher = resp.Cipher
return nil
})
return
@@ -215,9 +220,9 @@ type FileCopyWorker struct {
filerGrpcAddress string
}
-func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error {
+func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error {
for task := range fileCopyTaskChan {
- if err := worker.doEachCopy(ctx, task); err != nil {
+ if err := worker.doEachCopy(task); err != nil {
return err
}
}
@@ -233,7 +238,7 @@ type FileCopyTask struct {
gid uint32
}
-func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error {
+func (worker *FileCopyWorker) doEachCopy(task FileCopyTask) error {
f, err := os.Open(task.sourceLocation)
if err != nil {
@@ -261,36 +266,55 @@ func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask)
}
if chunkCount == 1 {
- return worker.uploadFileAsOne(ctx, task, f)
+ return worker.uploadFileAsOne(task, f)
}
- return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize)
+ return worker.uploadFileInChunks(task, f, chunkCount, chunkSize)
}
-func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error {
+func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) error {
// upload the file content
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
+ data, err := ioutil.ReadAll(f)
+ if err != nil {
+ return err
+ }
var chunks []*filer_pb.FileChunk
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
if task.fileSize > 0 {
// assign a volume
- assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- Ttl: *worker.options.ttl,
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
- uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel)
+ uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth))
if err != nil {
return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
@@ -300,17 +324,19 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
chunks = append(chunks, &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: 0,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
+ FileId: assignResult.FileId,
+ Offset: 0,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.Md5,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
})
fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
}
- if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -325,13 +351,13 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
Mime: mimeType,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
- TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
}
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
@@ -342,7 +368,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopy
return nil
}
-func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
+func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
fileName := filepath.Base(f.Name())
mimeType := detectMimeType(f)
@@ -352,6 +378,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
var wg sync.WaitGroup
var uploadError error
+ var collection, replication string
fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
@@ -363,22 +390,42 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
<-concurrentChunks
}()
// assign a volume
- assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- Ttl: *worker.options.ttl,
+ var assignResult *filer_pb.AssignVolumeResponse
+ var assignError error
+ err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ ParentPath: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
+ if err != nil {
+ fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
+ }
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+ if collection == "" {
+ collection = assignResult.Collection
+ }
+ if replication == "" {
+ replication = assignResult.Replication
+ }
- uploadResult, err := operation.Upload(targetUrl,
- fileName+"-"+strconv.FormatInt(i+1, 10),
- io.NewSectionReader(f, i*chunkSize, chunkSize),
- false, "application/octet-stream", nil, assignResult.Auth)
+ uploadResult, err := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth))
if err != nil {
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
return
@@ -388,11 +435,13 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
return
}
chunksChan <- &filer_pb.FileChunk{
- FileId: assignResult.Fid,
- Offset: i * chunkSize,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
+ FileId: assignResult.FileId,
+ Offset: i * chunkSize,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
}
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i)
@@ -410,11 +459,11 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
- operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds)
+ operation.DeleteFiles(copy.masters[0], worker.options.grpcDialOption, fileIds)
return uploadError
}
- if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -427,15 +476,15 @@ func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileC
FileSize: uint64(task.fileSize),
FileMode: uint32(task.fileMode),
Mime: mimeType,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ Replication: replication,
+ Collection: collection,
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
}
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("update fh: %v", err)
}
return nil
@@ -457,18 +506,12 @@ func detectMimeType(f *os.File) string {
}
if err != nil {
fmt.Printf("read head of %v: %v\n", f.Name(), err)
- return "application/octet-stream"
+ return ""
}
f.Seek(0, io.SeekStart)
mimeType := http.DetectContentType(head[:n])
+ if mimeType == "application/octet-stream" {
+ return ""
+ }
return mimeType
}
-
-func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
-
- return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(clientConn)
- return fn(client)
- }, filerAddress, grpcDialOption)
-
-}