aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/filer_copy.go94
1 files changed, 88 insertions, 6 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 5a878e675..86f43348f 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -13,6 +13,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"path"
"net/http"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "strconv"
+ "io"
+ "time"
+ "google.golang.org/grpc"
+ "context"
)
var (
@@ -144,10 +150,10 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
return uploadFileAsOne(host, path, assignResult, f, fi)
}
- return uploadFileInChunks(host, path, assignResult, f, chunkCount)
+ return uploadFileInChunks(host, path, assignResult, f, fi, chunkCount, chunkSize)
}
-func uploadFileAsOne(filerUrl string, urlPath string, assignResult *operation.AssignResult, f *os.File, fi os.FileInfo) bool {
+func uploadFileAsOne(filerUrl string, urlFolder string, assignResult *operation.AssignResult, f *os.File, fi os.FileInfo) bool {
// upload the file content
mimeType := detectMimeType(f)
@@ -164,19 +170,82 @@ func uploadFileAsOne(filerUrl string, urlPath string, assignResult *operation.As
fmt.Printf("upload %v to %s result: %v\n", f.Name(), targetUrl, uploadResult.Error)
return false
}
+ fmt.Printf("uploaded %s to %s\n", f.Name(), targetUrl)
- if err = filer_operation.RegisterFile(filerUrl, filepath.Join(urlPath, f.Name()), assignResult.Fid, fi.Size(),
+ if err = filer_operation.RegisterFile(filerUrl, filepath.Join(urlFolder, f.Name()), assignResult.Fid, fi.Size(),
os.Getuid(), os.Getgid(), copy.secret); err != nil {
fmt.Printf("Failed to register file %s on %s: %v\n", f.Name(), filerUrl, err)
return false
}
- fmt.Printf("Copied %s => http://%s%s\n", f.Name(), filerUrl, urlPath)
+ fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), filerUrl, urlFolder, f.Name())
return true
}
-func uploadFileInChunks(filerUrl string, path string, assignResult *operation.AssignResult, f *os.File, chunkCount int) bool {
- return false
+func uploadFileInChunks(filerUrl string, urlFolder string, assignResult *operation.AssignResult, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool {
+
+ var chunks []*filer_pb.FileChunk
+
+ for i := int64(0); i < int64(chunkCount); i++ {
+ fileId := assignResult.Fid
+ if i > 0 {
+ fileId += "_" + strconv.FormatInt(i, 10)
+ }
+
+ targetUrl := "http://" + assignResult.Url + "/" + fileId
+
+ uploadResult, err := operation.Upload(targetUrl,
+ f.Name()+"-"+strconv.FormatInt(i+1, 10),
+ io.LimitReader(f, chunkSize),
+ false, "application/octet-stream", nil, "")
+ if err != nil {
+ fmt.Printf("upload data %v to %s: %v\n", f.Name(), targetUrl, err)
+ return false
+ }
+ if uploadResult.Error != "" {
+ fmt.Printf("upload %v to %s result: %v\n", f.Name(), targetUrl, uploadResult.Error)
+ return false
+ }
+ chunks = append(chunks, &filer_pb.FileChunk{
+ FileId: fileId,
+ Offset: i * chunkSize,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ })
+ fmt.Printf("uploaded %s split %d => %s\n", f.Name(), i, targetUrl)
+ }
+
+ if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.CreateEntryRequest{
+ Directory: urlFolder,
+ Entry: &filer_pb.Entry{
+ Name: f.Name(),
+ Attributes: &filer_pb.FuseAttributes{
+ Crtime: time.Now().Unix(),
+ Mtime: time.Now().Unix(),
+ Gid: uint32(os.Getgid()),
+ Uid: uint32(os.Getuid()),
+ FileSize: uint64(fi.Size()),
+ FileMode: uint32(fi.Mode()),
+ },
+ Chunks: chunks,
+ },
+ }
+
+ fmt.Printf("%s%s set chunks: %v", urlFolder, f.Name(), len(chunks))
+ for i, chunk := range chunks {
+ fmt.Printf("%s%s chunks %d: %v [%d,%d)\n", urlFolder, f.Name(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ }
+ if _, err := client.CreateEntry(context.Background(), request); err != nil {
+ return fmt.Errorf("update fh: %v", err)
+ }
+ return nil
+ }); err != nil {
+ fmt.Printf("upload data %v to http://%s%s%s: %v\n", f.Name(), filerUrl, urlFolder, f.Name(), err)
+ return false
+ }
+
+ return true
}
func isGzipped(filename string) bool {
@@ -195,3 +264,16 @@ func detectMimeType(f *os.File) string {
mimeType := http.DetectContentType(head[:n])
return mimeType
}
+
+func withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ grpcConnection, err := grpc.Dial(filerAddress, grpc.WithInsecure())
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", filerAddress, err)
+ }
+ defer grpcConnection.Close()
+
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+
+ return fn(client)
+}