aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer_copy.go74
1 files changed, 57 insertions, 17 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 8a754bb55..3b8193cd1 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -11,6 +11,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
filer_operation "github.com/chrislusf/seaweedfs/weed/operation/filer"
"github.com/chrislusf/seaweedfs/weed/security"
+ "path"
+ "net/http"
)
var (
@@ -68,20 +70,20 @@ func runCopy(cmd *Command, args []string) bool {
return false
}
filerDestination := args[len(args)-1]
- fileOrDirs := args[0 : len(args)-1]
+ fileOrDirs := args[0: len(args)-1]
filerUrl, err := url.Parse(filerDestination)
if err != nil {
fmt.Printf("The last argument should be a URL on filer: %v\n", err)
return false
}
- path := filerUrl.Path
- if !strings.HasSuffix(path, "/") {
- path = path + "/"
+ urlPath := filerUrl.Path
+ if !strings.HasSuffix(urlPath, "/") {
+ urlPath = urlPath + "/"
}
for _, fileOrDir := range fileOrDirs {
- if !doEachCopy(fileOrDir, filerUrl.Host, path) {
+ if !doEachCopy(fileOrDir, filerUrl.Host, urlPath) {
return false
}
}
@@ -120,29 +122,67 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
}
}
- parts, err := operation.NewFileParts([]string{fileOrDir})
+ // find the chunk count
+ chunkSize := int64(*copy.maxMB * 1024 * 1024)
+ chunkCount := 1
+ if chunkSize > 0 && fi.Size() > chunkSize {
+ chunkCount = int(fi.Size()/chunkSize) + 1
+ }
+
+ // assign a volume
+ assignResult, err := operation.Assign(*copy.master, &operation.VolumeAssignRequest{
+ Count: uint64(chunkCount),
+ Replication: *copy.replication,
+ Collection: *copy.collection,
+ Ttl: *copy.ttl,
+ })
if err != nil {
- fmt.Printf("Failed to read file %s: %v\n", fileOrDir, err)
+ fmt.Printf("Failed to assign from %s: %v\n", *copy.master, err)
+ }
+
+ if chunkCount == 1 {
+ return uploadFileAsOne(host, path, assignResult, f, fi)
}
- results, err := operation.SubmitFiles(*copy.master, parts,
- *copy.replication, *copy.collection, "",
- *copy.ttl, *copy.maxMB, copy.secret)
+ return uploadFileInChunks(host, path, assignResult, f, chunkCount)
+}
+
+func uploadFileAsOne(filerUrl string, urlPath string, assignResult *operation.AssignResult, f *os.File, fi os.FileInfo) bool {
+ // upload the file content
+ ext := strings.ToLower(path.Ext(f.Name()))
+ head := make([]byte, 512)
+ f.Seek(0, 0)
+ n, err := f.Read(head)
if err != nil {
- fmt.Printf("Failed to submit file %s: %v\n", fileOrDir, err)
+ fmt.Printf("read head of %v: %v\n", f.Name(), err)
+ return false
}
+ f.Seek(0, 0)
+ mimeType := http.DetectContentType(head[:n])
+ isGzipped := ext == ".gz"
+
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
- if strings.HasSuffix(path, "/") {
- path = path + fi.Name()
+ uploadResult, err := operation.Upload(targetUrl, f.Name(), f, isGzipped, mimeType, nil, "")
+ if err != nil {
+ fmt.Printf("upload data %v to %s: %v\n", f.Name(), targetUrl, err)
+ return false
+ }
+ if uploadResult.Error != "" {
+ fmt.Printf("upload %v to %s result: %v\n", f.Name(), targetUrl, uploadResult.Error)
+ return false
}
- if err = filer_operation.RegisterFile(host, path, results[0].Fid, parts[0].FileSize,
+ if err = filer_operation.RegisterFile(filerUrl, filepath.Join(urlPath, f.Name()), assignResult.Fid, fi.Size(),
os.Getuid(), os.Getgid(), copy.secret); err != nil {
- fmt.Printf("Failed to register file %s on %s: %v\n", fileOrDir, host, err)
+ fmt.Printf("Failed to register file %s on %s: %v\n", f.Name(), filerUrl, err)
return false
}
- fmt.Printf("Copy %s => http://%s%s\n", fileOrDir, host, path)
-
+ fmt.Printf("Copied %s => http://%s%s\n", f.Name(), filerUrl, urlPath)
return true
}
+
+func uploadFileInChunks(filerUrl string, path string, assignResult *operation.AssignResult, f *os.File, chunkCount int) bool {
+ return false
+}