aboutsummaryrefslogtreecommitdiff
path: root/weed-fs/src/cmd
diff options
context:
space:
mode:
authorchris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0>2012-08-07 08:29:22 +0000
committerchris.lu@gmail.com <chris.lu@gmail.com@282b0af5-e82d-9cf1-ede4-77906d7719d0>2012-08-07 08:29:22 +0000
commit5e9dc0e66829805f1084fef7f19ca95dba9baae6 (patch)
treeda3133296340d744587a44ae240a90ff0b21f480 /weed-fs/src/cmd
parent3f1136c19475eeafb60c84456328fd01174b0817 (diff)
downloadseaweedfs-5e9dc0e66829805f1084fef7f19ca95dba9baae6.tar.xz
seaweedfs-5e9dc0e66829805f1084fef7f19ca95dba9baae6.zip
all switching to "weed command [args]" usage mode
git-svn-id: https://weed-fs.googlecode.com/svn/trunk@64 282b0af5-e82d-9cf1-ede4-77906d7719d0
Diffstat (limited to 'weed-fs/src/cmd')
-rw-r--r--weed-fs/src/cmd/weed/fix.go4
-rw-r--r--weed-fs/src/cmd/weed/master.go88
-rw-r--r--weed-fs/src/cmd/weed/upload.go124
-rw-r--r--weed-fs/src/cmd/weed/volume.go177
-rw-r--r--weed-fs/src/cmd/weed/weed.go22
5 files changed, 412 insertions, 3 deletions
diff --git a/weed-fs/src/cmd/weed/fix.go b/weed-fs/src/cmd/weed/fix.go
index e1d5da02d..dfb6969eb 100644
--- a/weed-fs/src/cmd/weed/fix.go
+++ b/weed-fs/src/cmd/weed/fix.go
@@ -10,6 +10,7 @@ import (
func init() {
cmdFix.Run = runFix // break init cycle
+ IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode")
}
var cmdFix = &Command{
@@ -23,9 +24,6 @@ var cmdFix = &Command{
var (
dir = cmdFix.Flag.String("dir", "/tmp", "data directory to store files")
volumeId = cmdFix.Flag.Int("volumeId", -1, "a non-negative volume id. The volume should already exist in the dir. The volume index file should not exist.")
- IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode")
-
- store *storage.Store
)
func runFix(cmd *Command, args []string) bool {
diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go
new file mode 100644
index 000000000..e6af3809a
--- /dev/null
+++ b/weed-fs/src/cmd/weed/master.go
@@ -0,0 +1,88 @@
+package main
+
+import (
+ "pkg/directory"
+ "encoding/json"
+ "log"
+ "net/http"
+ "pkg/storage"
+ "strconv"
+ "strings"
+)
+
+func init() {
+ cmdMaster.Run = runMaster // break init cycle
+ IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode")
+ port = cmdMaster.Flag.Int("port", 8080, "http listen port")
+}
+
+var cmdMaster = &Command{
+ UsageLine: "master -port=8080 -dir=/tmp -volumes=0-99 -publicUrl=server_name:8080 -mserver=localhost:9333",
+ Short: "start a master server",
+ Long: `start a master server to provide volume=>location mapping service
+ and sequence number of file ids
+
+ `,
+}
+
+var (
+ metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings")
+ capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold")
+ mapper *directory.Mapper
+ volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
+)
+
+func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
+ vid := r.FormValue("volumeId")
+ commaSep := strings.Index(vid, ",")
+ if commaSep > 0 {
+ vid = vid[0:commaSep]
+ }
+ volumeId, _ := strconv.ParseUint(vid, 10, 64)
+ machine, e := mapper.Get(uint32(volumeId))
+ if e == nil {
+ writeJson(w, r, machine.Server)
+ } else {
+ log.Println("Invalid volume id", volumeId)
+ writeJson(w, r, map[string]string{"error": "volume id " + strconv.FormatUint(volumeId, 10) + " not found"})
+ }
+}
+func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
+ c:=r.FormValue("count")
+ fid, count, machine, err := mapper.PickForWrite(c)
+ if err == nil {
+ writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url, "publicUrl":machine.PublicUrl, "count":strconv.Itoa(count)})
+ } else {
+ log.Println(err)
+ writeJson(w, r, map[string]string{"error": err.Error()})
+ }
+}
+func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
+ s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
+ publicUrl := r.FormValue("publicUrl")
+ volumes := new([]storage.VolumeInfo)
+ json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
+ if *IsDebug {
+ log.Println(s, "volumes", r.FormValue("volumes"))
+ }
+ mapper.Add(*directory.NewMachine(s, publicUrl, *volumes))
+}
+func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
+ writeJson(w, r, mapper)
+}
+
+func runMaster(cmd *Command, args []string) bool {
+ log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
+ mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024)
+ http.HandleFunc("/dir/assign", dirAssignHandler)
+ http.HandleFunc("/dir/lookup", dirLookupHandler)
+ http.HandleFunc("/dir/join", dirJoinHandler)
+ http.HandleFunc("/dir/status", dirStatusHandler)
+
+ log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*port))
+ e := http.ListenAndServe(":"+strconv.Itoa(*port), nil)
+ if e != nil {
+ log.Fatal("Fail to start:", e)
+ }
+ return true
+}
diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go
new file mode 100644
index 000000000..5cfb6ee97
--- /dev/null
+++ b/weed-fs/src/cmd/weed/upload.go
@@ -0,0 +1,124 @@
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "mime/multipart"
+ "net/http"
+ "net/url"
+ "os"
+ "pkg/util"
+ "strconv"
+)
+
+func init() {
+ cmdUpload.Run = runUpload // break init cycle
+ IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information")
+ server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location")
+}
+
+var cmdUpload = &Command{
+ UsageLine: "upload -server=localhost:9333 file1 file2 file2",
+ Short: "upload a set of files, using consecutive file keys",
+ Long: `upload a set of files, using consecutive file keys.
+ e.g. If the file1 uses key k, file2 can be read via k_1
+
+ `,
+}
+
+type AssignResult struct {
+ Fid string "fid"
+ Url string "url"
+ PublicUrl string "publicUrl"
+ Count int `json:",string"`
+ Error string "error"
+}
+
+func assign(count int) (AssignResult, error) {
+ values := make(url.Values)
+ values.Add("count", strconv.Itoa(count))
+ jsonBlob := util.Post("http://"+*server+"/dir/assign", values)
+ var ret AssignResult
+ err := json.Unmarshal(jsonBlob, &ret)
+ if err != nil {
+ return ret, err
+ }
+ if ret.Count <= 0 {
+ return ret, errors.New(ret.Error)
+ }
+ return ret, nil
+}
+
+type UploadResult struct {
+ Size int
+}
+
+func upload(filename string, uploadUrl string) (int, string) {
+ body_buf := bytes.NewBufferString("")
+ body_writer := multipart.NewWriter(body_buf)
+ file_writer, err := body_writer.CreateFormFile("file", filename)
+ if err != nil {
+ panic(err.Error())
+ }
+ fh, err := os.Open(filename)
+ if err != nil {
+ panic(err.Error())
+ }
+ io.Copy(file_writer, fh)
+ content_type := body_writer.FormDataContentType()
+ body_writer.Close()
+ resp, err := http.Post(uploadUrl, content_type, body_buf)
+ if err != nil {
+ panic(err.Error())
+ }
+ defer resp.Body.Close()
+ resp_body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ panic(err.Error())
+ }
+ var ret UploadResult
+ err = json.Unmarshal(resp_body, &ret)
+ if err != nil {
+ panic(err.Error())
+ }
+ //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
+ return ret.Size, uploadUrl
+}
+
+type SubmitResult struct {
+ Fid string "fid"
+ Size int "size"
+}
+
+func submit(files []string)([]SubmitResult) {
+ ret, err := assign(len(files))
+ if err != nil {
+ panic(err)
+ }
+ results := make([]SubmitResult, len(files))
+ for index, file := range files {
+ fid := ret.Fid
+ if index > 0 {
+ fid = fid + "_" + strconv.Itoa(index)
+ }
+ uploadUrl := "http://" + ret.PublicUrl + "/" + fid
+ results[index].Size, _ = upload(file, uploadUrl)
+ results[index].Fid = fid
+ }
+ return results
+}
+
+func runUpload(cmd *Command, args []string) bool {
+ if len(cmdUpload.Flag.Args()) == 0 {
+ return false
+ }
+ results := submit(flag.Args())
+ bytes, _ := json.Marshal(results)
+ fmt.Print(string(bytes))
+ return true
+}
diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go
new file mode 100644
index 000000000..45261f017
--- /dev/null
+++ b/weed-fs/src/cmd/weed/volume.go
@@ -0,0 +1,177 @@
+package main
+
+import (
+ "log"
+ "math/rand"
+ "mime"
+ "net/http"
+ "pkg/storage"
+ "strconv"
+ "strings"
+ "time"
+)
+
+func init() {
+ cmdVolume.Run = runVolume // break init cycle
+ IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode")
+ port = cmdVolume.Flag.Int("port", 8080, "http listen port")
+}
+
+var cmdVolume = &Command{
+ UsageLine: "volume -port=8080 -dir=/tmp -volumes=0-99 -publicUrl=server_name:8080 -mserver=localhost:9333",
+ Short: "start a volume server",
+ Long: `start a volume server to provide storage spaces
+
+ `,
+}
+
+var (
+ chunkFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files")
+ volumes = cmdVolume.Flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids")
+ publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read")
+ metaServer = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings")
+ pulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+
+ store *storage.Store
+
+)
+
+func statusHandler(w http.ResponseWriter, r *http.Request) {
+ writeJson(w, r, store.Status())
+}
+func addVolumeHandler(w http.ResponseWriter, r *http.Request) {
+ store.AddVolume(r.FormValue("volume"))
+ writeJson(w, r, store.Status())
+}
+func storeHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "GET":
+ GetHandler(w, r)
+ case "DELETE":
+ DeleteHandler(w, r)
+ case "POST":
+ PostHandler(w, r)
+ }
+}
+func GetHandler(w http.ResponseWriter, r *http.Request) {
+ n := new(storage.Needle)
+ vid, fid, ext := parseURLPath(r.URL.Path)
+ volumeId, _ := strconv.ParseUint(vid, 10, 64)
+ n.ParsePath(fid)
+
+ if *IsDebug {
+ log.Println("volume", volumeId, "reading", n)
+ }
+ cookie := n.Cookie
+ count, e := store.Read(volumeId, n)
+ if *IsDebug {
+ log.Println("read bytes", count, "error", e)
+ }
+ if n.Cookie != cookie {
+ log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
+ return
+ }
+ if ext != "" {
+ mtype := mime.TypeByExtension(ext)
+ w.Header().Set("Content-Type", mtype)
+ if storage.IsCompressable(ext, mtype){
+ if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip"){
+ w.Header().Set("Content-Encoding", "gzip")
+ }else{
+ n.Data = storage.UnGzipData(n.Data)
+ }
+ }
+ }
+ w.Write(n.Data)
+}
+func PostHandler(w http.ResponseWriter, r *http.Request) {
+ vid, _, _ := parseURLPath(r.URL.Path)
+ volumeId, e := strconv.ParseUint(vid, 10, 64)
+ if e != nil {
+ writeJson(w, r, e)
+ } else {
+ needle, ne := storage.NewNeedle(r)
+ if ne != nil {
+ writeJson(w, r, ne)
+ } else {
+ ret := store.Write(volumeId, needle)
+ m := make(map[string]uint32)
+ m["size"] = ret
+ writeJson(w, r, m)
+ }
+ }
+}
+func DeleteHandler(w http.ResponseWriter, r *http.Request) {
+ n := new(storage.Needle)
+ vid, fid, _ := parseURLPath(r.URL.Path)
+ volumeId, _ := strconv.ParseUint(vid, 10, 64)
+ n.ParsePath(fid)
+
+ if *IsDebug {
+ log.Println("deleting", n)
+ }
+
+ cookie := n.Cookie
+ count, ok := store.Read(volumeId, n)
+
+ if ok != nil {
+ m := make(map[string]uint32)
+ m["size"] = 0
+ writeJson(w, r, m)
+ return
+ }
+
+ if n.Cookie != cookie {
+ log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
+ return
+ }
+
+ n.Size = 0
+ store.Delete(volumeId, n)
+ m := make(map[string]uint32)
+ m["size"] = uint32(count)
+ writeJson(w, r, m)
+}
+func parseURLPath(path string) (vid, fid, ext string) {
+ sepIndex := strings.LastIndex(path, "/")
+ commaIndex := strings.LastIndex(path[sepIndex:], ",")
+ if commaIndex <= 0 {
+ if "favicon.ico" != path[sepIndex+1:] {
+ log.Println("unknown file id", path[sepIndex+1:])
+ }
+ return
+ }
+ dotIndex := strings.LastIndex(path[sepIndex:], ".")
+ vid = path[sepIndex+1 : commaIndex]
+ fid = path[commaIndex+1:]
+ ext = ""
+ if dotIndex > 0 {
+ fid = path[commaIndex+1 : dotIndex]
+ ext = path[dotIndex:]
+ }
+ return
+}
+
+func runVolume(cmd *Command, args []string) bool {
+ //TODO: now default to 1G, this value should come from server?
+ store = storage.NewStore(*port, *publicUrl, *chunkFolder, *volumes)
+ defer store.Close()
+ http.HandleFunc("/", storeHandler)
+ http.HandleFunc("/status", statusHandler)
+ http.HandleFunc("/add_volume", addVolumeHandler)
+
+ go func() {
+ for {
+ store.Join(*metaServer)
+ time.Sleep(time.Duration(float32(*pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
+ }
+ }()
+ log.Println("store joined at", *metaServer)
+
+ log.Println("Start storage service at http://127.0.0.1:"+strconv.Itoa(*port), "public url", *publicUrl)
+ e := http.ListenAndServe(":"+strconv.Itoa(*port), nil)
+ if e != nil {
+ log.Fatalf("Fail to start:", e)
+ }
+ return true
+}
diff --git a/weed-fs/src/cmd/weed/weed.go b/weed-fs/src/cmd/weed/weed.go
index b9122d8e6..72ce4c09c 100644
--- a/weed-fs/src/cmd/weed/weed.go
+++ b/weed-fs/src/cmd/weed/weed.go
@@ -1,8 +1,10 @@
package main
import (
+ "encoding/json"
"flag"
"fmt"
+ "net/http"
"io"
"log"
"os"
@@ -13,9 +15,16 @@ import (
"unicode/utf8"
)
+var IsDebug *bool
+var server *string
+var port *int
+
var commands = []*Command{
cmdFix,
+ cmdMaster,
+ cmdUpload,
cmdVersion,
+ cmdVolume,
}
var exitStatus = 0
@@ -163,3 +172,16 @@ func exitIfErrors() {
exit()
}
}
+func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
+ w.Header().Set("Content-Type", "application/javascript")
+ bytes, _ := json.Marshal(obj)
+ callback := r.FormValue("callback")
+ if callback == "" {
+ w.Write(bytes)
+ } else {
+ w.Write([]uint8(callback))
+ w.Write([]uint8("("))
+ fmt.Fprint(w, string(bytes))
+ w.Write([]uint8(")"))
+ }
+}