aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2016-07-20 23:45:55 -0700
committerChris Lu <chris.lu@gmail.com>2016-07-20 23:45:55 -0700
commitcdae9fc680c8e7f99c0ef47dd98f674df76e5078 (patch)
tree8189bdfb1f38fce7ddc2a2ff73ab040caeb5f30a
parent40ba6d2a6f146ab730cc5409c3dbcfb71f7acab9 (diff)
downloadseaweedfs-cdae9fc680c8e7f99c0ef47dd98f674df76e5078.tar.xz
seaweedfs-cdae9fc680c8e7f99c0ef47dd98f674df76e5078.zip
add "weed copy" command to copy files to filer
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/copy.go147
-rw-r--r--weed/command/server.go3
-rw-r--r--weed/command/upload.go10
-rw-r--r--weed/operation/filer/register.go31
-rw-r--r--weed/server/filer_server.go12
-rw-r--r--weed/server/filer_server_handlers_admin.go12
-rw-r--r--weed/util/http_util.go11
8 files changed, 217 insertions, 10 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index d654f57cd..c451936e5 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -11,6 +11,7 @@ var Commands = []*Command{
cmdBenchmark,
cmdBackup,
cmdCompact,
+ cmdCopy,
cmdFix,
cmdServer,
cmdMaster,
diff --git a/weed/command/copy.go b/weed/command/copy.go
new file mode 100644
index 000000000..0e7109daf
--- /dev/null
+++ b/weed/command/copy.go
@@ -0,0 +1,147 @@
+package command
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net/url"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ filer_operation "github.com/chrislusf/seaweedfs/weed/operation/filer"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+var (
+ copy CopyOptions
+)
+
+type CopyOptions struct {
+ master *string
+ include *string
+ replication *string
+ collection *string
+ ttl *string
+ maxMB *int
+ secretKey *string
+
+ secret security.Secret
+}
+
+func init() {
+ cmdCopy.Run = runCopy // break init cycle
+ cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
+ copy.master = cmdCopy.Flag.String("master", "localhost:9333", "SeaweedFS master location")
+ copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
+ copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
+ copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
+ copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
+ copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit")
+ copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
+}
+
+var cmdCopy = &Command{
+ UsageLine: "copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
+ Short: "copy one or a list of files to a filer folder",
+ Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
+
+ It can copy one or a list of files or folders.
+
+ If copying a whole folder recursively:
+ All files under the folder and subfolders will be copyed.
+ Optional parameter "-include" allows you to specify the file name patterns.
+
+ If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is.
+ This can save volume server's gzipped processing and allow customizable gzip compression level.
+ The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js".
+
+ If "maxMB" is set to a positive number, files larger than it would be split into chunks and copyed separatedly.
+ The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned.
+
+ `,
+}
+
+func runCopy(cmd *Command, args []string) bool {
+ copy.secret = security.Secret(*copy.secretKey)
+ if len(args) <= 1 {
+ return false
+ }
+ filerDestination := args[len(args)-1]
+ fileOrDirs := args[0 : len(args)-1]
+
+ filerUrl, err := url.Parse(filerDestination)
+ if err != nil {
+ fmt.Printf("The last argument should be a URL on filer: %v\n", err)
+ return false
+ }
+ if len(fileOrDirs) > 1 && !strings.HasSuffix(filerUrl.Path, "/") {
+ fmt.Println("Can not copy multiple items to a file. The last argument must be a folder ended with \"/\"")
+ return false
+ }
+
+ for _, fileOrDir := range fileOrDirs {
+ if !doEachCopy(fileOrDir, filerUrl.Host, filerUrl.Path) {
+ return false
+ }
+ }
+ return true
+}
+
+func doEachCopy(fileOrDir string, host string, path string) bool {
+ f, err := os.Open(fileOrDir)
+ if err != nil {
+ fmt.Printf("Failed to open file %s: %v", fileOrDir, err)
+ return false
+ }
+ defer f.Close()
+
+ fi, err := f.Stat()
+ if err != nil {
+ fmt.Printf("Failed to get stat for file %s: %v", fileOrDir, err)
+ return false
+ }
+
+ mode := fi.Mode()
+ if mode.IsDir() {
+ files, _ := ioutil.ReadDir(fileOrDir)
+ for _, subFileOrDir := range files {
+ if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), host, path+fi.Name()+"/") {
+ return false
+ }
+ }
+ return true
+ }
+
+ // this is a regular file
+ if *copy.include != "" {
+ if ok, _ := filepath.Match(*copy.include, filepath.Base(fileOrDir)); !ok {
+ return true
+ }
+ }
+
+ parts, err := operation.NewFileParts([]string{fileOrDir})
+ if err != nil {
+ fmt.Printf("Failed to read file %s: %v", fileOrDir, err)
+ }
+
+ results, err := operation.SubmitFiles(*copy.master, parts,
+ *copy.replication, *copy.collection,
+ *copy.ttl, *copy.maxMB, copy.secret)
+ if err != nil {
+ fmt.Printf("Failed to submit file %s: %v", fileOrDir, err)
+ }
+
+ if strings.HasSuffix(path, "/") {
+ path = path + fi.Name()
+ }
+
+ if err = filer_operation.RegisterFile(host, path, results[0].Fid, copy.secret); err != nil {
+ fmt.Printf("Failed to register file %s on %s: %v", fileOrDir, host, err)
+ return false
+ }
+
+ fmt.Printf("Copy %s => http://%s/%s\n", fileOrDir, host, path)
+
+ return true
+}
diff --git a/weed/command/server.go b/weed/command/server.go
index 6ed1e5228..1211c7137 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -82,7 +82,7 @@ func init() {
filerOptions.master = cmdServer.Flag.String("filer.master", "", "default to current master server")
filerOptions.collection = cmdServer.Flag.String("filer.collection", "", "all data will be stored in this collection")
filerOptions.port = cmdServer.Flag.Int("filer.port", 8888, "filer server http listen port")
- filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -mdir is specified")
+ filerOptions.dir = cmdServer.Flag.String("filer.dir", "", "directory to store meta data, default to a 'filer' sub directory of what -dir is specified")
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
@@ -164,6 +164,7 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingFiler {
go func() {
+ time.Sleep(1 * time.Second)
r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement,
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 0dfa115bb..1f0696f70 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -15,7 +15,7 @@ var (
)
type UploadOptions struct {
- server *string
+ master *string
dir *string
include *string
replication *string
@@ -28,7 +28,7 @@ type UploadOptions struct {
func init() {
cmdUpload.Run = runUpload // break init cycle
cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information")
- upload.server = cmdUpload.Flag.String("server", "localhost:9333", "SeaweedFS master location")
+ upload.master = cmdUpload.Flag.String("master", "localhost:9333", "SeaweedFS master location")
upload.dir = cmdUpload.Flag.String("dir", "", "Upload the whole folder recursively if specified.")
upload.include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
@@ -39,7 +39,7 @@ func init() {
}
var cmdUpload = &Command{
- UsageLine: "upload -server=localhost:9333 file1 [file2 file3]\n weed upload -server=localhost:9333 -dir=one_directory -include=*.pdf",
+ UsageLine: "upload -master=localhost:9333 file1 [file2 file3]\n weed upload -server=localhost:9333 -dir=one_directory -include=*.pdf",
Short: "upload one or a list of files",
Long: `upload one or a list of files, or batch upload one whole folder recursively.
@@ -79,7 +79,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(*upload.server, parts,
+ results, e := operation.SubmitFiles(*upload.master, parts,
*upload.replication, *upload.collection,
*upload.ttl, *upload.maxMB, secret)
bytes, _ := json.Marshal(results)
@@ -98,7 +98,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
- results, _ := operation.SubmitFiles(*upload.server, parts,
+ results, _ := operation.SubmitFiles(*upload.master, parts,
*upload.replication, *upload.collection,
*upload.ttl, *upload.maxMB, secret)
bytes, _ := json.Marshal(results)
diff --git a/weed/operation/filer/register.go b/weed/operation/filer/register.go
new file mode 100644
index 000000000..2c4703680
--- /dev/null
+++ b/weed/operation/filer/register.go
@@ -0,0 +1,31 @@
+package operation
+
+import (
+ "fmt"
+ "net/url"
+
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type SubmitResult struct {
+ FileName string `json:"fileName,omitempty"`
+ FileUrl string `json:"fileUrl,omitempty"`
+ Fid string `json:"fid,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+func RegisterFile(filer string, path string, fileId string, secret security.Secret) error {
+ // TODO: jwt need to be used
+ _ = security.GenJwt(secret, fileId)
+
+ values := make(url.Values)
+ values.Add("path", path)
+ values.Add("fileId", fileId)
+ _, err := util.Post("http://"+filer+"/admin/register", values)
+ if err != nil {
+ return fmt.Errorf("Failed to register path:%s on filer:%s to file id:%s", path, filer, fileId)
+ }
+ return nil
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 1b54f0840..b99bbd7c9 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -62,6 +62,7 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
}
r.HandleFunc("/admin/mv", fs.moveHandler)
+ r.HandleFunc("/admin/register", fs.registerHandler)
}
r.HandleFunc("/", fs.filerHandler)
@@ -73,9 +74,14 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
//force initialize with all available master nodes
- _, err := fs.masterNodes.FindMaster()
- if err != nil {
- glog.Fatalf("filer server failed to get master cluster info:%s", err.Error())
+ for {
+ _, err := fs.masterNodes.FindMaster()
+ if err != nil {
+ glog.Infof("filer server failed to get master cluster info:%s", err.Error())
+ time.Sleep(3 * time.Second)
+ } else {
+ break
+ }
}
for {
diff --git a/weed/server/filer_server_handlers_admin.go b/weed/server/filer_server_handlers_admin.go
index 979ad517b..aa7c09986 100644
--- a/weed/server/filer_server_handlers_admin.go
+++ b/weed/server/filer_server_handlers_admin.go
@@ -27,3 +27,15 @@ func (fs *FilerServer) moveHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
}
+
+func (fs *FilerServer) registerHandler(w http.ResponseWriter, r *http.Request) {
+ path := r.FormValue("path")
+ fileId := r.FormValue("fileId")
+ err := fs.filer.CreateFile(path, fileId)
+ if err != nil {
+ glog.V(4).Infof("register %s to %s error: %v", fileId, path, err)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ } else {
+ w.WriteHeader(http.StatusOK)
+ }
+}
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index a54fc8779..2379e3b2b 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -32,6 +32,9 @@ func PostBytes(url string, body []byte) ([]byte, error) {
return nil, fmt.Errorf("Post to %s: %v", url, err)
}
defer r.Body.Close()
+ if r.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", url, r.Status)
+ }
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("Read response body: %v", err)
@@ -45,6 +48,9 @@ func Post(url string, values url.Values) ([]byte, error) {
return nil, err
}
defer r.Body.Close()
+ if r.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", url, r.Status)
+ }
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
@@ -59,7 +65,7 @@ func Get(url string) ([]byte, error) {
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
- if r.StatusCode != 200 {
+ if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
}
if err != nil {
@@ -81,6 +87,9 @@ func Delete(url string, jwt security.EncodedJwt) error {
return e
}
defer resp.Body.Close()
+ if resp.StatusCode >= 400 {
+ return fmt.Errorf("%s: %s", url, resp.Status)
+ }
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err