aboutsummaryrefslogtreecommitdiff
path: root/go/cmd
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-02-10 03:49:51 -0800
committerChris Lu <chris.lu@gmail.com>2013-02-10 03:49:51 -0800
commit5071f528f649f3f99336c7d491ceef4859e34744 (patch)
tree0c4bc8a286597cd79e22b1ce02cd9cd3b1c44602 /go/cmd
parent55f2627fcf965c3765ad9b63878e9a22a59f4b55 (diff)
downloadseaweedfs-5071f528f649f3f99336c7d491ceef4859e34744.tar.xz
seaweedfs-5071f528f649f3f99336c7d491ceef4859e34744.zip
testing compilation with remove package
Diffstat (limited to 'go/cmd')
-rw-r--r--go/cmd/command.go54
-rw-r--r--go/cmd/export.go164
-rw-r--r--go/cmd/fix.go64
-rw-r--r--go/cmd/master.go217
-rw-r--r--go/cmd/shell.go53
-rw-r--r--go/cmd/upload.go113
-rw-r--r--go/cmd/version.go26
-rw-r--r--go/cmd/volume.go378
-rw-r--r--go/cmd/weed.go199
9 files changed, 1268 insertions, 0 deletions
diff --git a/go/cmd/command.go b/go/cmd/command.go
new file mode 100644
index 000000000..c8d86ca66
--- /dev/null
+++ b/go/cmd/command.go
@@ -0,0 +1,54 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+ "strings"
+)
+
+type Command struct {
+ // Run runs the command.
+ // The args are the arguments after the command name.
+ Run func(cmd *Command, args []string) bool
+
+ // UsageLine is the one-line usage message.
+ // The first word in the line is taken to be the command name.
+ UsageLine string
+
+ // Short is the short description shown in the 'go help' output.
+ Short string
+
+ // Long is the long message shown in the 'go help <this-command>' output.
+ Long string
+
+ // Flag is a set of flags specific to this command.
+ Flag flag.FlagSet
+
+ IsDebug *bool
+}
+
+// Name returns the command's name: the first word in the usage line.
+func (c *Command) Name() string {
+ name := c.UsageLine
+ i := strings.Index(name, " ")
+ if i >= 0 {
+ name = name[:i]
+ }
+ return name
+}
+
+func (c *Command) Usage() {
+ fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine)
+ fmt.Fprintf(os.Stderr, "Default Usage:\n")
+ c.Flag.PrintDefaults()
+ fmt.Fprintf(os.Stderr, "Description:\n")
+ fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long))
+ os.Exit(2)
+}
+
+// Runnable reports whether the command can be run; otherwise
+// it is a documentation pseudo-command such as importpath.
+func (c *Command) Runnable() bool {
+ return c.Run != nil
+}
diff --git a/go/cmd/export.go b/go/cmd/export.go
new file mode 100644
index 000000000..fd9d51164
--- /dev/null
+++ b/go/cmd/export.go
@@ -0,0 +1,164 @@
+package main
+
+import (
+ "archive/tar"
+ "bytes"
+ "fmt"
+ "log"
+ "os"
+ "path"
+ "code.google.com/p/weed-fs/go/directory"
+ "code.google.com/p/weed-fs/go/storage"
+ "strconv"
+ "strings"
+ "text/template"
+ "time"
+)
+
+func init() {
+ cmdExport.Run = runExport // break init cycle
+ cmdExport.IsDebug = cmdExport.Flag.Bool("debug", false, "enable debug mode")
+}
+
+const (
+ defaultFnFormat = `{{.Mime}}/{{.Id}}:{{.Name}}`
+)
+
+var cmdExport = &Command{
+ UsageLine: "export -dir=/tmp -volumeId=234 -o=/dir/name.tar -fileNameFormat={{.Name}}",
+ Short: "list or export files from one volume data file",
+ Long: `List all files in a volume, or Export all files in a volume to a tar file if the output is specified.
+
+ The format of file name in the tar file can be customized. Default is {{.Mime}}/{{.Id}}:{{.Name}}. Also available is {{Key}}.
+
+ `,
+}
+
+var (
+ exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files")
+ exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
+ dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
+ format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}")
+ tarFh *tar.Writer
+ tarHeader tar.Header
+ fnTmpl *template.Template
+ fnTmplBuf = bytes.NewBuffer(nil)
+)
+
+func runExport(cmd *Command, args []string) bool {
+
+ if *exportVolumeId == -1 {
+ return false
+ }
+
+ var err error
+ if *dest != "" {
+ if *dest != "-" && !strings.HasSuffix(*dest, ".tar") {
+ fmt.Println("the output file", *dest, "should be '-' or end with .tar")
+ return false
+ }
+
+ if fnTmpl, err = template.New("name").Parse(*format); err != nil {
+ fmt.Println("cannot parse format " + *format + ": " + err.Error())
+ return false
+ }
+
+ var fh *os.File
+ if *dest == "-" {
+ fh = os.Stdout
+ } else {
+ if fh, err = os.Create(*dest); err != nil {
+ log.Fatalf("cannot open output tar %s: %s", *dest, err)
+ }
+ }
+ defer fh.Close()
+ tarFh = tar.NewWriter(fh)
+ defer tarFh.Close()
+ t := time.Now()
+ tarHeader = tar.Header{Mode: 0644,
+ ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(),
+ Typeflag: tar.TypeReg,
+ AccessTime: t, ChangeTime: t}
+ }
+
+ fileName := strconv.Itoa(*exportVolumeId)
+ vid := storage.VolumeId(*exportVolumeId)
+ indexFile, err := os.OpenFile(path.Join(*exportVolumePath, fileName+".idx"), os.O_RDONLY, 0644)
+ if err != nil {
+ log.Fatalf("Create Volume Index [ERROR] %s\n", err)
+ }
+ defer indexFile.Close()
+
+ nm := storage.LoadNeedleMap(indexFile)
+
+ var version storage.Version
+
+ err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error {
+ version = superBlock.Version
+ return nil
+ }, func(n *storage.Needle, offset uint32) error {
+ debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
+ nv, ok := nm.Get(n.Id)
+ if ok && nv.Size > 0 {
+ return walker(vid, n, version)
+ } else {
+ if !ok {
+ debug("This seems deleted", n.Id)
+ } else {
+ debug("Id", n.Id, "size", n.Size)
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ log.Fatalf("Export Volume File [ERROR] %s\n", err)
+ }
+ return true
+}
+
+type nameParams struct {
+ Name string
+ Id uint64
+ Mime string
+ Key string
+}
+
+func walker(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) {
+ key := directory.NewFileId(vid, n.Id, n.Cookie).String()
+ if tarFh != nil {
+ fnTmplBuf.Reset()
+ if err = fnTmpl.Execute(fnTmplBuf,
+ nameParams{Name: string(n.Name),
+ Id: n.Id,
+ Mime: string(n.Mime),
+ Key: key,
+ },
+ ); err != nil {
+ return err
+ }
+ nm := fnTmplBuf.String()
+
+ if n.IsGzipped() && path.Ext(nm) != ".gz" {
+ nm = nm + ".gz"
+ }
+
+ tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data))
+ if err = tarFh.WriteHeader(&tarHeader); err != nil {
+ return err
+ }
+ _, err = tarFh.Write(n.Data)
+ } else {
+ size := n.DataSize
+ if version == storage.Version1 {
+ size = n.Size
+ }
+ fmt.Printf("key=%s Name=%s Size=%d gzip=%t mime=%s\n",
+ key,
+ n.Name,
+ size,
+ n.IsGzipped(),
+ n.Mime,
+ )
+ }
+ return
+}
diff --git a/go/cmd/fix.go b/go/cmd/fix.go
new file mode 100644
index 000000000..249007252
--- /dev/null
+++ b/go/cmd/fix.go
@@ -0,0 +1,64 @@
+package main
+
+import (
+ "log"
+ "os"
+ "path"
+ "code.google.com/p/weed-fs/go/storage"
+ "strconv"
+)
+
+func init() {
+ cmdFix.Run = runFix // break init cycle
+ cmdFix.IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode")
+}
+
+var cmdFix = &Command{
+ UsageLine: "fix -dir=/tmp -volumeId=234",
+ Short: "run weed tool fix on index file if corrupted",
+ Long: `Fix runs the WeedFS fix command to re-create the index .idx file.
+
+ `,
+}
+
+var (
+ fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files")
+ fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
+)
+
+func runFix(cmd *Command, args []string) bool {
+
+ if *fixVolumeId == -1 {
+ return false
+ }
+
+ fileName := strconv.Itoa(*fixVolumeId)
+ indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644)
+ if err != nil {
+ log.Fatalf("Create Volume Index [ERROR] %s\n", err)
+ }
+ defer indexFile.Close()
+
+ nm := storage.NewNeedleMap(indexFile)
+ defer nm.Close()
+
+ vid := storage.VolumeId(*fixVolumeId)
+ err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error {
+ return nil
+ }, func(n *storage.Needle, offset uint32) error {
+ debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
+ if n.Size > 0 {
+ count, pe := nm.Put(n.Id, offset/storage.NeedlePaddingSize, n.Size)
+ debug("saved", count, "with error", pe)
+ } else {
+ debug("skipping deleted file ...")
+ nm.Delete(n.Id)
+ }
+ return nil
+ })
+ if err != nil {
+ log.Fatalf("Export Volume File [ERROR] %s\n", err)
+ }
+
+ return true
+}
diff --git a/go/cmd/master.go b/go/cmd/master.go
new file mode 100644
index 000000000..3d8757c16
--- /dev/null
+++ b/go/cmd/master.go
@@ -0,0 +1,217 @@
+package main
+
+import (
+ "encoding/json"
+ "errors"
+ "log"
+ "net/http"
+ "code.google.com/p/weed-fs/go/replication"
+ "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/topology"
+ "runtime"
+ "strconv"
+ "strings"
+ "time"
+)
+
+func init() {
+ cmdMaster.Run = runMaster // break init cycle
+ cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode")
+}
+
+var cmdMaster = &Command{
+ UsageLine: "master -port=9333",
+ Short: "start a master server",
+ Long: `start a master server to provide volume=>location mapping service
+ and sequence number of file ids
+
+ `,
+}
+
+var (
+ mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
+ metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings")
+ volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
+ mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+ confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
+ defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
+ mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
+ mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
+)
+
+var topo *topology.Topology
+var vg *replication.VolumeGrowth
+
+func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
+ vid := r.FormValue("volumeId")
+ commaSep := strings.Index(vid, ",")
+ if commaSep > 0 {
+ vid = vid[0:commaSep]
+ }
+ volumeId, err := storage.NewVolumeId(vid)
+ if err == nil {
+ machines := topo.Lookup(volumeId)
+ if machines != nil {
+ ret := []map[string]string{}
+ for _, dn := range machines {
+ ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl})
+ }
+ writeJson(w, r, map[string]interface{}{"locations": ret})
+ } else {
+ w.WriteHeader(http.StatusNotFound)
+ writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
+ }
+ } else {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid})
+ }
+}
+
+func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
+ c, e := strconv.Atoi(r.FormValue("count"))
+ if e != nil {
+ c = 1
+ }
+ repType := r.FormValue("replication")
+ if repType == "" {
+ repType = *defaultRepType
+ }
+ rt, err := storage.NewReplicationTypeFromString(repType)
+ if err != nil {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJson(w, r, map[string]string{"error": err.Error()})
+ return
+ }
+ if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
+ if topo.FreeSpace() <= 0 {
+ w.WriteHeader(http.StatusNotFound)
+ writeJson(w, r, map[string]string{"error": "No free volumes left!"})
+ return
+ } else {
+ vg.GrowByType(rt, topo)
+ }
+ }
+ fid, count, dn, err := topo.PickForWrite(rt, c)
+ if err == nil {
+ writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
+ } else {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJson(w, r, map[string]string{"error": err.Error()})
+ }
+}
+
+func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
+ init := r.FormValue("init") == "true"
+ ip := r.FormValue("ip")
+ if ip == "" {
+ ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
+ }
+ port, _ := strconv.Atoi(r.FormValue("port"))
+ maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
+ s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
+ publicUrl := r.FormValue("publicUrl")
+ volumes := new([]storage.VolumeInfo)
+ json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
+ debug(s, "volumes", r.FormValue("volumes"))
+ topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount)
+ m := make(map[string]interface{})
+ m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
+ writeJson(w, r, m)
+}
+
+func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
+ m := make(map[string]interface{})
+ m["Version"] = VERSION
+ m["Topology"] = topo.ToMap()
+ writeJson(w, r, m)
+}
+
+func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
+ gcThreshold := r.FormValue("garbageThreshold")
+ if gcThreshold == "" {
+ gcThreshold = *garbageThreshold
+ }
+ debug("garbageThreshold =", gcThreshold)
+ topo.Vacuum(gcThreshold)
+ dirStatusHandler(w, r)
+}
+
+func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
+ count := 0
+ rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
+ if err == nil {
+ if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
+ if topo.FreeSpace() < count*rt.GetCopyCount() {
+ err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
+ } else {
+ count, err = vg.GrowByCountAndType(count, rt, topo)
+ }
+ } else {
+ err = errors.New("parameter count is not found")
+ }
+ }
+ if err != nil {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()})
+ } else {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJson(w, r, map[string]interface{}{"count": count})
+ }
+}
+
+func volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
+ m := make(map[string]interface{})
+ m["Version"] = VERSION
+ m["Volumes"] = topo.ToVolumeMap()
+ writeJson(w, r, m)
+}
+
+func redirectHandler(w http.ResponseWriter, r *http.Request) {
+ vid, _, _ := parseURLPath(r.URL.Path)
+ volumeId, err := storage.NewVolumeId(vid)
+ if err != nil {
+ debug("parsing error:", err, r.URL.Path)
+ return
+ }
+ machines := topo.Lookup(volumeId)
+ if machines != nil && len(machines) > 0 {
+ http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
+ } else {
+ w.WriteHeader(http.StatusNotFound)
+ writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
+ }
+}
+
+func runMaster(cmd *Command, args []string) bool {
+ if *mMaxCpu < 1 {
+ *mMaxCpu = runtime.NumCPU()
+ }
+ runtime.GOMAXPROCS(*mMaxCpu)
+ topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
+ vg = replication.NewDefaultVolumeGrowth()
+ log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
+ http.HandleFunc("/dir/assign", dirAssignHandler)
+ http.HandleFunc("/dir/lookup", dirLookupHandler)
+ http.HandleFunc("/dir/join", dirJoinHandler)
+ http.HandleFunc("/dir/status", dirStatusHandler)
+ http.HandleFunc("/vol/grow", volumeGrowHandler)
+ http.HandleFunc("/vol/status", volumeStatusHandler)
+ http.HandleFunc("/vol/vacuum", volumeVacuumHandler)
+
+ http.HandleFunc("/", redirectHandler)
+
+ topo.StartRefreshWritableVolumes(*garbageThreshold)
+
+ log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport))
+ srv := &http.Server{
+ Addr: ":" + strconv.Itoa(*mport),
+ Handler: http.DefaultServeMux,
+ ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
+ }
+ e := srv.ListenAndServe()
+ if e != nil {
+ log.Fatalf("Fail to start:%s", e.Error())
+ }
+ return true
+}
diff --git a/go/cmd/shell.go b/go/cmd/shell.go
new file mode 100644
index 000000000..daf0b7e1f
--- /dev/null
+++ b/go/cmd/shell.go
@@ -0,0 +1,53 @@
+package main
+
+import (
+ "bufio"
+ "fmt"
+ "os"
+)
+
+func init() {
+ cmdShell.Run = runShell // break init cycle
+}
+
+var cmdShell = &Command{
+ UsageLine: "shell",
+ Short: "run interactive commands, now just echo",
+ Long: `run interactive commands.
+
+ `,
+}
+
+var ()
+
+func runShell(command *Command, args []string) bool {
+ r := bufio.NewReader(os.Stdin)
+ o := bufio.NewWriter(os.Stdout)
+ e := bufio.NewWriter(os.Stderr)
+ prompt := func() {
+ o.WriteString("> ")
+ o.Flush()
+ }
+ readLine := func() string {
+ ret, err := r.ReadString('\n')
+ if err != nil {
+ fmt.Fprint(e, err)
+ os.Exit(1)
+ }
+ return ret
+ }
+ execCmd := func(cmd string) int {
+ if cmd != "" {
+ o.WriteString(cmd)
+ }
+ return 0
+ }
+
+ cmd := ""
+ for {
+ prompt()
+ cmd = readLine()
+ execCmd(cmd)
+ }
+ return true
+}
diff --git a/go/cmd/upload.go b/go/cmd/upload.go
new file mode 100644
index 000000000..0260c9c87
--- /dev/null
+++ b/go/cmd/upload.go
@@ -0,0 +1,113 @@
+package main
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/url"
+ "os"
+ "path"
+ "code.google.com/p/weed-fs/go/operation"
+ "code.google.com/p/weed-fs/go/util"
+ "strconv"
+)
+
+var uploadReplication *string
+
+func init() {
+ cmdUpload.Run = runUpload // break init cycle
+ cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information")
+ server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location")
+ uploadReplication = cmdUpload.Flag.String("replication", "000", "replication type(000,001,010,100,110,200)")
+}
+
+var cmdUpload = &Command{
+ UsageLine: "upload -server=localhost:9333 file1 [file2 file3]",
+ Short: "upload one or a list of files",
+ Long: `upload one or a list of files.
+ It uses consecutive file keys for the list of files.
+ e.g. If the file1 uses key k, file2 can be read via k_1
+
+ `,
+}
+
+type AssignResult struct {
+ Fid string "fid"
+ Url string "url"
+ PublicUrl string "publicUrl"
+ Count int
+ Error string "error"
+}
+
+func assign(count int) (*AssignResult, error) {
+ values := make(url.Values)
+ values.Add("count", strconv.Itoa(count))
+ values.Add("replication", *uploadReplication)
+ jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values)
+ debug("assign result :", string(jsonBlob))
+ if err != nil {
+ return nil, err
+ }
+ var ret AssignResult
+ err = json.Unmarshal(jsonBlob, &ret)
+ if err != nil {
+ return nil, err
+ }
+ if ret.Count <= 0 {
+ return nil, errors.New(ret.Error)
+ }
+ return &ret, nil
+}
+
+func upload(filename string, server string, fid string) (int, error) {
+ debug("Start uploading file:", filename)
+ fh, err := os.Open(filename)
+ if err != nil {
+ debug("Failed to open file:", filename)
+ return 0, err
+ }
+ ret, e := operation.Upload("http://"+server+"/"+fid, path.Base(filename), fh)
+ if e != nil {
+ return 0, e
+ }
+ return ret.Size, e
+}
+
+type SubmitResult struct {
+ Fid string "fid"
+ Size int "size"
+ Error string "error"
+}
+
+func submit(files []string) []SubmitResult {
+ ret, err := assign(len(files))
+ if err != nil {
+ fmt.Println(err)
+ return nil
+ }
+ results := make([]SubmitResult, len(files))
+ for index, file := range files {
+ fid := ret.Fid
+ if index > 0 {
+ fid = fid + "_" + strconv.Itoa(index)
+ }
+ results[index].Size, err = upload(file, ret.PublicUrl, fid)
+ if err != nil {
+ fid = ""
+ results[index].Error = err.Error()
+ }
+ results[index].Fid = fid
+ }
+ return results
+}
+
+func runUpload(cmd *Command, args []string) bool {
+ *IsDebug = true
+ if len(cmdUpload.Flag.Args()) == 0 {
+ return false
+ }
+ results := submit(args)
+ bytes, _ := json.Marshal(results)
+ fmt.Print(string(bytes))
+ return true
+}
diff --git a/go/cmd/version.go b/go/cmd/version.go
new file mode 100644
index 000000000..b418126a4
--- /dev/null
+++ b/go/cmd/version.go
@@ -0,0 +1,26 @@
+package main
+
+import (
+ "fmt"
+ "runtime"
+)
+
+const (
+ VERSION = "0.28 beta"
+)
+
+var cmdVersion = &Command{
+ Run: runVersion,
+ UsageLine: "version",
+ Short: "print Weed File System version",
+ Long: `Version prints the Weed File System version`,
+}
+
+func runVersion(cmd *Command, args []string) bool {
+ if len(args) != 0 {
+ cmd.Usage()
+ }
+
+ fmt.Printf("version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
+ return true
+}
diff --git a/go/cmd/volume.go b/go/cmd/volume.go
new file mode 100644
index 000000000..bf906681e
--- /dev/null
+++ b/go/cmd/volume.go
@@ -0,0 +1,378 @@
+package main
+
+import (
+ "bytes"
+ "log"
+ "math/rand"
+ "mime"
+ "net/http"
+ "os"
+ "code.google.com/p/weed-fs/go/operation"
+ "code.google.com/p/weed-fs/go/storage"
+ "runtime"
+ "strconv"
+ "strings"
+ "time"
+)
+
+func init() {
+ cmdVolume.Run = runVolume // break init cycle
+ cmdVolume.IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode")
+}
+
+var cmdVolume = &Command{
+ UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333",
+ Short: "start a volume server",
+ Long: `start a volume server to provide storage spaces
+
+ `,
+}
+
+var (
+ vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
+ volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files")
+ ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name")
+ publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
+ masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
+ vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
+ maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes")
+ vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
+ vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+
+ store *storage.Store
+)
+
+var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
+
+func statusHandler(w http.ResponseWriter, r *http.Request) {
+ m := make(map[string]interface{})
+ m["Version"] = VERSION
+ m["Volumes"] = store.Status()
+ writeJson(w, r, m)
+}
+func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
+ err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
+ if err == nil {
+ writeJson(w, r, map[string]string{"error": ""})
+ } else {
+ writeJson(w, r, map[string]string{"error": err.Error()})
+ }
+ debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
+}
+func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
+ err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
+ if err == nil {
+ writeJson(w, r, map[string]interface{}{"error": "", "result": ret})
+ } else {
+ writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false})
+ }
+ debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
+}
+func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
+ err := store.CompactVolume(r.FormValue("volume"))
+ if err == nil {
+ writeJson(w, r, map[string]string{"error": ""})
+ } else {
+ writeJson(w, r, map[string]string{"error": err.Error()})
+ }
+ debug("compacted volume =", r.FormValue("volume"), ", error =", err)
+}
+func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
+ err := store.CommitCompactVolume(r.FormValue("volume"))
+ if err == nil {
+ writeJson(w, r, map[string]interface{}{"error": ""})
+ } else {
+ writeJson(w, r, map[string]string{"error": err.Error()})
+ }
+ debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
+}
+func storeHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "GET":
+ GetHandler(w, r)
+ case "DELETE":
+ DeleteHandler(w, r)
+ case "POST":
+ PostHandler(w, r)
+ }
+}
+func GetHandler(w http.ResponseWriter, r *http.Request) {
+ n := new(storage.Needle)
+ vid, fid, ext := parseURLPath(r.URL.Path)
+ volumeId, err := storage.NewVolumeId(vid)
+ if err != nil {
+ debug("parsing error:", err, r.URL.Path)
+ return
+ }
+ n.ParsePath(fid)
+
+ debug("volume", volumeId, "reading", n)
+ if !store.HasVolume(volumeId) {
+ lookupResult, err := operation.Lookup(*masterNode, volumeId)
+ debug("volume", volumeId, "found on", lookupResult, "error", err)
+ if err == nil {
+ http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
+ } else {
+ debug("lookup error:", err, r.URL.Path)
+ w.WriteHeader(http.StatusNotFound)
+ }
+ return
+ }
+ cookie := n.Cookie
+ count, e := store.Read(volumeId, n)
+ debug("read bytes", count, "error", e)
+ if e != nil || count <= 0 {
+ debug("read error:", e, r.URL.Path)
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+ if n.Cookie != cookie {
+ log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+ if n.NameSize > 0 {
+ fname := string(n.Name)
+ dotIndex := strings.LastIndex(fname, ".")
+ if dotIndex > 0 {
+ ext = fname[dotIndex:]
+ }
+ }
+ mtype := ""
+ if ext != "" {
+ mtype = mime.TypeByExtension(ext)
+ }
+ if n.MimeSize > 0 {
+ mtype = string(n.Mime)
+ }
+ if mtype != "" {
+ w.Header().Set("Content-Type", mtype)
+ }
+ if n.NameSize > 0 {
+ w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(string(n.Name)))
+ }
+ if ext != ".gz" {
+ if n.IsGzipped() {
+ if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
+ w.Header().Set("Content-Encoding", "gzip")
+ } else {
+ if n.Data, err = storage.UnGzipData(n.Data); err != nil {
+ debug("lookup error:", err, r.URL.Path)
+ }
+ }
+ }
+ }
+ w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
+ w.Write(n.Data)
+}
+func PostHandler(w http.ResponseWriter, r *http.Request) {
+ r.ParseForm()
+ vid, _, _ := parseURLPath(r.URL.Path)
+ volumeId, e := storage.NewVolumeId(vid)
+ if e != nil {
+ writeJson(w, r, e)
+ } else {
+ needle, filename, ne := storage.NewNeedle(r)
+ if ne != nil {
+ writeJson(w, r, ne)
+ } else {
+ ret, err := store.Write(volumeId, needle)
+ errorStatus := ""
+ needToReplicate := !store.HasVolume(volumeId)
+ if err != nil {
+ errorStatus = "Failed to write to local disk (" + err.Error() + ")"
+ } else if ret > 0 {
+ needToReplicate = needToReplicate || store.GetVolume(volumeId).NeedToReplicate()
+ } else {
+ errorStatus = "Failed to write to local disk"
+ }
+ if !needToReplicate && ret > 0 {
+ needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
+ }
+ if needToReplicate { //send to other replica locations
+ if r.FormValue("type") != "standard" {
+ if !distributedOperation(volumeId, func(location operation.Location) bool {
+ _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
+ return err == nil
+ }) {
+ ret = 0
+ errorStatus = "Failed to write to replicas for volume " + volumeId.String()
+ }
+ }
+ }
+ m := make(map[string]interface{})
+ if errorStatus == "" {
+ w.WriteHeader(http.StatusCreated)
+ } else {
+ store.Delete(volumeId, needle)
+ distributedOperation(volumeId, func(location operation.Location) bool {
+ return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
+ })
+ w.WriteHeader(http.StatusInternalServerError)
+ m["error"] = errorStatus
+ }
+ m["size"] = ret
+ writeJson(w, r, m)
+ }
+ }
+}
+func DeleteHandler(w http.ResponseWriter, r *http.Request) {
+ n := new(storage.Needle)
+ vid, fid, _ := parseURLPath(r.URL.Path)
+ volumeId, _ := storage.NewVolumeId(vid)
+ n.ParsePath(fid)
+
+ debug("deleting", n)
+
+ cookie := n.Cookie
+ count, ok := store.Read(volumeId, n)
+
+ if ok != nil {
+ m := make(map[string]uint32)
+ m["size"] = 0
+ writeJson(w, r, m)
+ return
+ }
+
+ if n.Cookie != cookie {
+ log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
+ return
+ }
+
+ n.Size = 0
+ ret, err := store.Delete(volumeId, n)
+ if err != nil {
+ log.Printf("delete error: %s\n", err)
+ return
+ }
+
+ needToReplicate := !store.HasVolume(volumeId)
+ if !needToReplicate && ret > 0 {
+ needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
+ }
+ if needToReplicate { //send to other replica locations
+ if r.FormValue("type") != "standard" {
+ if !distributedOperation(volumeId, func(location operation.Location) bool {
+ return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
+ }) {
+ ret = 0
+ }
+ }
+ }
+
+ if ret != 0 {
+ w.WriteHeader(http.StatusAccepted)
+ } else {
+ w.WriteHeader(http.StatusInternalServerError)
+ }
+
+ m := make(map[string]uint32)
+ m["size"] = uint32(count)
+ writeJson(w, r, m)
+}
+
+func parseURLPath(path string) (vid, fid, ext string) {
+
+ sepIndex := strings.LastIndex(path, "/")
+ commaIndex := strings.LastIndex(path[sepIndex:], ",")
+ if commaIndex <= 0 {
+ if "favicon.ico" != path[sepIndex+1:] {
+ log.Println("unknown file id", path[sepIndex+1:])
+ }
+ return
+ }
+ dotIndex := strings.LastIndex(path[sepIndex:], ".")
+ vid = path[sepIndex+1 : commaIndex]
+ fid = path[commaIndex+1:]
+ ext = ""
+ if dotIndex > 0 {
+ fid = path[commaIndex+1 : dotIndex]
+ ext = path[dotIndex:]
+ }
+ return
+}
+
+func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
+ if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
+ length := 0
+ selfUrl := (*ip + ":" + strconv.Itoa(*vport))
+ results := make(chan bool)
+ for _, location := range lookupResult.Locations {
+ if location.Url != selfUrl {
+ length++
+ go func(location operation.Location, results chan bool) {
+ results <- op(location)
+ }(location, results)
+ }
+ }
+ ret := true
+ for i := 0; i < length; i++ {
+ ret = ret && <-results
+ }
+ return ret
+ } else {
+ log.Println("Failed to lookup for", volumeId, lookupErr.Error())
+ }
+ return false
+}
+
+func runVolume(cmd *Command, args []string) bool {
+ if *vMaxCpu < 1 {
+ *vMaxCpu = runtime.NumCPU()
+ }
+ runtime.GOMAXPROCS(*vMaxCpu)
+ fileInfo, err := os.Stat(*volumeFolder)
+ if err != nil {
+ log.Fatalf("No Existing Folder:%s", *volumeFolder)
+ }
+ if !fileInfo.IsDir() {
+ log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder)
+ }
+ perm := fileInfo.Mode().Perm()
+ log.Println("Volume Folder permission:", perm)
+
+ if *publicUrl == "" {
+ *publicUrl = *ip + ":" + strconv.Itoa(*vport)
+ }
+
+ store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount)
+ defer store.Close()
+ http.HandleFunc("/", storeHandler)
+ http.HandleFunc("/status", statusHandler)
+ http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
+ http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler)
+ http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler)
+ http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler)
+
+ go func() {
+ connected := true
+ store.SetMaster(*masterNode)
+ for {
+ err := store.Join()
+ if err == nil {
+ if !connected {
+ connected = true
+ log.Println("Reconnected with master")
+ }
+ } else {
+ if connected {
+ connected = false
+ }
+ }
+ time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond)
+ }
+ }()
+ log.Println("store joined at", *masterNode)
+
+ log.Println("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport))
+ srv := &http.Server{
+ Addr: ":" + strconv.Itoa(*vport),
+ Handler: http.DefaultServeMux,
+ ReadTimeout: (time.Duration(*vReadTimeout) * time.Second),
+ }
+ e := srv.ListenAndServe()
+ if e != nil {
+ log.Fatalf("Fail to start:%s", e.Error())
+ }
+ return true
+}
diff --git a/go/cmd/weed.go b/go/cmd/weed.go
new file mode 100644
index 000000000..c03cb68ac
--- /dev/null
+++ b/go/cmd/weed.go
@@ -0,0 +1,199 @@
+package main
+
+import (
+ "encoding/json"
+ "flag"
+ "fmt"
+ "io"
+ "math/rand"
+ "net/http"
+ "os"
+ "strings"
+ "sync"
+ "text/template"
+ "time"
+ "unicode"
+ "unicode/utf8"
+)
+
+var IsDebug *bool
+var server *string
+
+var commands = []*Command{
+ cmdFix,
+ cmdMaster,
+ cmdUpload,
+ cmdShell,
+ cmdVersion,
+ cmdVolume,
+ cmdExport,
+}
+
+var exitStatus = 0
+var exitMu sync.Mutex
+
+func setExitStatus(n int) {
+ exitMu.Lock()
+ if exitStatus < n {
+ exitStatus = n
+ }
+ exitMu.Unlock()
+}
+
+func main() {
+ rand.Seed(time.Now().UnixNano())
+ flag.Usage = usage
+ flag.Parse()
+
+ args := flag.Args()
+ if len(args) < 1 {
+ usage()
+ }
+
+ if args[0] == "help" {
+ help(args[1:])
+ for _, cmd := range commands {
+ if len(args) >= 2 && cmd.Name() == args[1] && cmd.Run != nil {
+ fmt.Fprintf(os.Stderr, "Default Parameters:\n")
+ cmd.Flag.PrintDefaults()
+ }
+ }
+ return
+ }
+
+ for _, cmd := range commands {
+ if cmd.Name() == args[0] && cmd.Run != nil {
+ cmd.Flag.Usage = func() { cmd.Usage() }
+ cmd.Flag.Parse(args[1:])
+ args = cmd.Flag.Args()
+ IsDebug = cmd.IsDebug
+ if !cmd.Run(cmd, args) {
+ fmt.Fprintf(os.Stderr, "\n")
+ cmd.Flag.Usage()
+ fmt.Fprintf(os.Stderr, "Default Parameters:\n")
+ cmd.Flag.PrintDefaults()
+ }
+ exit()
+ return
+ }
+ }
+
+ fmt.Fprintf(os.Stderr, "weed: unknown subcommand %q\nRun 'weed help' for usage.\n", args[0])
+ setExitStatus(2)
+ exit()
+}
+
+var usageTemplate = `WeedFS is a software to store billions of files and serve them fast!
+
+Usage:
+
+ weed command [arguments]
+
+The commands are:
+{{range .}}{{if .Runnable}}
+ {{.Name | printf "%-11s"}} {{.Short}}{{end}}{{end}}
+
+Use "weed help [command]" for more information about a command.
+
+`
+
+var helpTemplate = `{{if .Runnable}}Usage: weed {{.UsageLine}}
+{{end}}
+ {{.Long}}
+`
+
+// tmpl executes the given template text on data, writing the result to w.
+func tmpl(w io.Writer, text string, data interface{}) {
+ t := template.New("top")
+ t.Funcs(template.FuncMap{"trim": strings.TrimSpace, "capitalize": capitalize})
+ template.Must(t.Parse(text))
+ if err := t.Execute(w, data); err != nil {
+ panic(err)
+ }
+}
+
+func capitalize(s string) string {
+ if s == "" {
+ return s
+ }
+ r, n := utf8.DecodeRuneInString(s)
+ return string(unicode.ToTitle(r)) + s[n:]
+}
+
+func printUsage(w io.Writer) {
+ tmpl(w, usageTemplate, commands)
+}
+
+func usage() {
+ printUsage(os.Stderr)
+ os.Exit(2)
+}
+
+// help implements the 'help' command.
+func help(args []string) {
+ if len(args) == 0 {
+ printUsage(os.Stdout)
+ // not exit 2: succeeded at 'weed help'.
+ return
+ }
+ if len(args) != 1 {
+ fmt.Fprintf(os.Stderr, "usage: weed help command\n\nToo many arguments given.\n")
+ os.Exit(2) // failed at 'weed help'
+ }
+
+ arg := args[0]
+
+ for _, cmd := range commands {
+ if cmd.Name() == arg {
+ tmpl(os.Stdout, helpTemplate, cmd)
+ // not exit 2: succeeded at 'weed help cmd'.
+ return
+ }
+ }
+
+ fmt.Fprintf(os.Stderr, "Unknown help topic %#q. Run 'weed help'.\n", arg)
+ os.Exit(2) // failed at 'weed help cmd'
+}
+
+var atexitFuncs []func()
+
+func atexit(f func()) {
+ atexitFuncs = append(atexitFuncs, f)
+}
+
+func exit() {
+ for _, f := range atexitFuncs {
+ f()
+ }
+ os.Exit(exitStatus)
+}
+
+func exitIfErrors() {
+ if exitStatus != 0 {
+ exit()
+ }
+}
+func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
+ w.Header().Set("Content-Type", "application/javascript")
+ var bytes []byte
+ if r.FormValue("pretty") != "" {
+ bytes, _ = json.MarshalIndent(obj, "", " ")
+ } else {
+ bytes, _ = json.Marshal(obj)
+ }
+ callback := r.FormValue("callback")
+ if callback == "" {
+ w.Write(bytes)
+ } else {
+ w.Write([]uint8(callback))
+ w.Write([]uint8("("))
+ fmt.Fprint(w, string(bytes))
+ w.Write([]uint8(")"))
+ }
+}
+
+func debug(params ...interface{}) {
+ if *IsDebug {
+ fmt.Println(params)
+ }
+}