aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/command.go54
-rw-r--r--src/export.go164
-rw-r--r--src/fix.go64
-rw-r--r--src/master.go217
-rw-r--r--src/shell.go53
-rw-r--r--src/upload.go113
-rw-r--r--src/version.go26
-rw-r--r--src/volume.go378
-rw-r--r--src/weed.go199
9 files changed, 0 insertions, 1268 deletions
diff --git a/src/command.go b/src/command.go
deleted file mode 100644
index c8d86ca66..000000000
--- a/src/command.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package main
-
-import (
- "flag"
- "fmt"
- "os"
- "strings"
-)
-
-type Command struct {
- // Run runs the command.
- // The args are the arguments after the command name.
- Run func(cmd *Command, args []string) bool
-
- // UsageLine is the one-line usage message.
- // The first word in the line is taken to be the command name.
- UsageLine string
-
- // Short is the short description shown in the 'go help' output.
- Short string
-
- // Long is the long message shown in the 'go help <this-command>' output.
- Long string
-
- // Flag is a set of flags specific to this command.
- Flag flag.FlagSet
-
- IsDebug *bool
-}
-
-// Name returns the command's name: the first word in the usage line.
-func (c *Command) Name() string {
- name := c.UsageLine
- i := strings.Index(name, " ")
- if i >= 0 {
- name = name[:i]
- }
- return name
-}
-
-func (c *Command) Usage() {
- fmt.Fprintf(os.Stderr, "Example: weed %s\n", c.UsageLine)
- fmt.Fprintf(os.Stderr, "Default Usage:\n")
- c.Flag.PrintDefaults()
- fmt.Fprintf(os.Stderr, "Description:\n")
- fmt.Fprintf(os.Stderr, " %s\n", strings.TrimSpace(c.Long))
- os.Exit(2)
-}
-
-// Runnable reports whether the command can be run; otherwise
-// it is a documentation pseudo-command such as importpath.
-func (c *Command) Runnable() bool {
- return c.Run != nil
-}
diff --git a/src/export.go b/src/export.go
deleted file mode 100644
index 44cf85c24..000000000
--- a/src/export.go
+++ /dev/null
@@ -1,164 +0,0 @@
-package main
-
-import (
- "archive/tar"
- "bytes"
- "fmt"
- "log"
- "os"
- "path"
- "code.google.com/p/weed-fs/weed/directory"
- "code.google.com/p/weed-fs/weed/storage"
- "strconv"
- "strings"
- "text/template"
- "time"
-)
-
-func init() {
- cmdExport.Run = runExport // break init cycle
- cmdExport.IsDebug = cmdExport.Flag.Bool("debug", false, "enable debug mode")
-}
-
-const (
- defaultFnFormat = `{{.Mime}}/{{.Id}}:{{.Name}}`
-)
-
-var cmdExport = &Command{
- UsageLine: "export -dir=/tmp -volumeId=234 -o=/dir/name.tar -fileNameFormat={{.Name}}",
- Short: "list or export files from one volume data file",
- Long: `List all files in a volume, or Export all files in a volume to a tar file if the output is specified.
-
- The format of file name in the tar file can be customized. Default is {{.Mime}}/{{.Id}}:{{.Name}}. Also available is {{Key}}.
-
- `,
-}
-
-var (
- exportVolumePath = cmdExport.Flag.String("dir", "/tmp", "input data directory to store volume data files")
- exportVolumeId = cmdExport.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
- dest = cmdExport.Flag.String("o", "", "output tar file name, must ends with .tar, or just a \"-\" for stdout")
- format = cmdExport.Flag.String("fileNameFormat", defaultFnFormat, "filename format, default to {{.Mime}}/{{.Id}}:{{.Name}}")
- tarFh *tar.Writer
- tarHeader tar.Header
- fnTmpl *template.Template
- fnTmplBuf = bytes.NewBuffer(nil)
-)
-
-func runExport(cmd *Command, args []string) bool {
-
- if *exportVolumeId == -1 {
- return false
- }
-
- var err error
- if *dest != "" {
- if *dest != "-" && !strings.HasSuffix(*dest, ".tar") {
- fmt.Println("the output file", *dest, "should be '-' or end with .tar")
- return false
- }
-
- if fnTmpl, err = template.New("name").Parse(*format); err != nil {
- fmt.Println("cannot parse format " + *format + ": " + err.Error())
- return false
- }
-
- var fh *os.File
- if *dest == "-" {
- fh = os.Stdout
- } else {
- if fh, err = os.Create(*dest); err != nil {
- log.Fatalf("cannot open output tar %s: %s", *dest, err)
- }
- }
- defer fh.Close()
- tarFh = tar.NewWriter(fh)
- defer tarFh.Close()
- t := time.Now()
- tarHeader = tar.Header{Mode: 0644,
- ModTime: t, Uid: os.Getuid(), Gid: os.Getgid(),
- Typeflag: tar.TypeReg,
- AccessTime: t, ChangeTime: t}
- }
-
- fileName := strconv.Itoa(*exportVolumeId)
- vid := storage.VolumeId(*exportVolumeId)
- indexFile, err := os.OpenFile(path.Join(*exportVolumePath, fileName+".idx"), os.O_RDONLY, 0644)
- if err != nil {
- log.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
-
- nm := storage.LoadNeedleMap(indexFile)
-
- var version storage.Version
-
- err = storage.ScanVolumeFile(*exportVolumePath, vid, func(superBlock storage.SuperBlock) error {
- version = superBlock.Version
- return nil
- }, func(n *storage.Needle, offset uint32) error {
- debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
- nv, ok := nm.Get(n.Id)
- if ok && nv.Size > 0 {
- return walker(vid, n, version)
- } else {
- if !ok {
- debug("This seems deleted", n.Id)
- } else {
- debug("Id", n.Id, "size", n.Size)
- }
- }
- return nil
- })
- if err != nil {
- log.Fatalf("Export Volume File [ERROR] %s\n", err)
- }
- return true
-}
-
-type nameParams struct {
- Name string
- Id uint64
- Mime string
- Key string
-}
-
-func walker(vid storage.VolumeId, n *storage.Needle, version storage.Version) (err error) {
- key := directory.NewFileId(vid, n.Id, n.Cookie).String()
- if tarFh != nil {
- fnTmplBuf.Reset()
- if err = fnTmpl.Execute(fnTmplBuf,
- nameParams{Name: string(n.Name),
- Id: n.Id,
- Mime: string(n.Mime),
- Key: key,
- },
- ); err != nil {
- return err
- }
- nm := fnTmplBuf.String()
-
- if n.IsGzipped() && path.Ext(nm) != ".gz" {
- nm = nm + ".gz"
- }
-
- tarHeader.Name, tarHeader.Size = nm, int64(len(n.Data))
- if err = tarFh.WriteHeader(&tarHeader); err != nil {
- return err
- }
- _, err = tarFh.Write(n.Data)
- } else {
- size := n.DataSize
- if version == storage.Version1 {
- size = n.Size
- }
- fmt.Printf("key=%s Name=%s Size=%d gzip=%t mime=%s\n",
- key,
- n.Name,
- size,
- n.IsGzipped(),
- n.Mime,
- )
- }
- return
-}
diff --git a/src/fix.go b/src/fix.go
deleted file mode 100644
index 85693d9b1..000000000
--- a/src/fix.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package main
-
-import (
- "log"
- "os"
- "path"
- "code.google.com/p/weed-fs/weed/storage"
- "strconv"
-)
-
-func init() {
- cmdFix.Run = runFix // break init cycle
- cmdFix.IsDebug = cmdFix.Flag.Bool("debug", false, "enable debug mode")
-}
-
-var cmdFix = &Command{
- UsageLine: "fix -dir=/tmp -volumeId=234",
- Short: "run weed tool fix on index file if corrupted",
- Long: `Fix runs the WeedFS fix command to re-create the index .idx file.
-
- `,
-}
-
-var (
- fixVolumePath = cmdFix.Flag.String("dir", "/tmp", "data directory to store files")
- fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
-)
-
-func runFix(cmd *Command, args []string) bool {
-
- if *fixVolumeId == -1 {
- return false
- }
-
- fileName := strconv.Itoa(*fixVolumeId)
- indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_WRONLY|os.O_CREATE, 0644)
- if err != nil {
- log.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
-
- nm := storage.NewNeedleMap(indexFile)
- defer nm.Close()
-
- vid := storage.VolumeId(*fixVolumeId)
- err = storage.ScanVolumeFile(*fixVolumePath, vid, func(superBlock storage.SuperBlock) error {
- return nil
- }, func(n *storage.Needle, offset uint32) error {
- debug("key", n.Id, "offset", offset, "size", n.Size, "disk_size", n.DiskSize(), "gzip", n.IsGzipped())
- if n.Size > 0 {
- count, pe := nm.Put(n.Id, offset/storage.NeedlePaddingSize, n.Size)
- debug("saved", count, "with error", pe)
- } else {
- debug("skipping deleted file ...")
- nm.Delete(n.Id)
- }
- return nil
- })
- if err != nil {
- log.Fatalf("Export Volume File [ERROR] %s\n", err)
- }
-
- return true
-}
diff --git a/src/master.go b/src/master.go
deleted file mode 100644
index 78db929f2..000000000
--- a/src/master.go
+++ /dev/null
@@ -1,217 +0,0 @@
-package main
-
-import (
- "encoding/json"
- "errors"
- "log"
- "net/http"
- "code.google.com/p/weed-fs/weed/replication"
- "code.google.com/p/weed-fs/weed/storage"
- "code.google.com/p/weed-fs/weed/topology"
- "runtime"
- "strconv"
- "strings"
- "time"
-)
-
-func init() {
- cmdMaster.Run = runMaster // break init cycle
- cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode")
-}
-
-var cmdMaster = &Command{
- UsageLine: "master -port=9333",
- Short: "start a master server",
- Long: `start a master server to provide volume=>location mapping service
- and sequence number of file ids
-
- `,
-}
-
-var (
- mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
- metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings")
- volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
- mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
- defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
- mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
- mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
- garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
-)
-
-var topo *topology.Topology
-var vg *replication.VolumeGrowth
-
-func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
- vid := r.FormValue("volumeId")
- commaSep := strings.Index(vid, ",")
- if commaSep > 0 {
- vid = vid[0:commaSep]
- }
- volumeId, err := storage.NewVolumeId(vid)
- if err == nil {
- machines := topo.Lookup(volumeId)
- if machines != nil {
- ret := []map[string]string{}
- for _, dn := range machines {
- ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl})
- }
- writeJson(w, r, map[string]interface{}{"locations": ret})
- } else {
- w.WriteHeader(http.StatusNotFound)
- writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
- }
- } else {
- w.WriteHeader(http.StatusNotAcceptable)
- writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid})
- }
-}
-
-func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
- c, e := strconv.Atoi(r.FormValue("count"))
- if e != nil {
- c = 1
- }
- repType := r.FormValue("replication")
- if repType == "" {
- repType = *defaultRepType
- }
- rt, err := storage.NewReplicationTypeFromString(repType)
- if err != nil {
- w.WriteHeader(http.StatusNotAcceptable)
- writeJson(w, r, map[string]string{"error": err.Error()})
- return
- }
- if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
- if topo.FreeSpace() <= 0 {
- w.WriteHeader(http.StatusNotFound)
- writeJson(w, r, map[string]string{"error": "No free volumes left!"})
- return
- } else {
- vg.GrowByType(rt, topo)
- }
- }
- fid, count, dn, err := topo.PickForWrite(rt, c)
- if err == nil {
- writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
- } else {
- w.WriteHeader(http.StatusNotAcceptable)
- writeJson(w, r, map[string]string{"error": err.Error()})
- }
-}
-
-func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
- init := r.FormValue("init") == "true"
- ip := r.FormValue("ip")
- if ip == "" {
- ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
- }
- port, _ := strconv.Atoi(r.FormValue("port"))
- maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
- s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
- publicUrl := r.FormValue("publicUrl")
- volumes := new([]storage.VolumeInfo)
- json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
- debug(s, "volumes", r.FormValue("volumes"))
- topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount)
- m := make(map[string]interface{})
- m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
- writeJson(w, r, m)
-}
-
-func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
- m := make(map[string]interface{})
- m["Version"] = VERSION
- m["Topology"] = topo.ToMap()
- writeJson(w, r, m)
-}
-
-func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
- gcThreshold := r.FormValue("garbageThreshold")
- if gcThreshold == "" {
- gcThreshold = *garbageThreshold
- }
- debug("garbageThreshold =", gcThreshold)
- topo.Vacuum(gcThreshold)
- dirStatusHandler(w, r)
-}
-
-func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
- count := 0
- rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
- if err == nil {
- if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if topo.FreeSpace() < count*rt.GetCopyCount() {
- err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
- } else {
- count, err = vg.GrowByCountAndType(count, rt, topo)
- }
- } else {
- err = errors.New("parameter count is not found")
- }
- }
- if err != nil {
- w.WriteHeader(http.StatusNotAcceptable)
- writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()})
- } else {
- w.WriteHeader(http.StatusNotAcceptable)
- writeJson(w, r, map[string]interface{}{"count": count})
- }
-}
-
-func volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
- m := make(map[string]interface{})
- m["Version"] = VERSION
- m["Volumes"] = topo.ToVolumeMap()
- writeJson(w, r, m)
-}
-
-func redirectHandler(w http.ResponseWriter, r *http.Request) {
- vid, _, _ := parseURLPath(r.URL.Path)
- volumeId, err := storage.NewVolumeId(vid)
- if err != nil {
- debug("parsing error:", err, r.URL.Path)
- return
- }
- machines := topo.Lookup(volumeId)
- if machines != nil && len(machines) > 0 {
- http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
- } else {
- w.WriteHeader(http.StatusNotFound)
- writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
- }
-}
-
-func runMaster(cmd *Command, args []string) bool {
- if *mMaxCpu < 1 {
- *mMaxCpu = runtime.NumCPU()
- }
- runtime.GOMAXPROCS(*mMaxCpu)
- topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
- vg = replication.NewDefaultVolumeGrowth()
- log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
- http.HandleFunc("/dir/assign", dirAssignHandler)
- http.HandleFunc("/dir/lookup", dirLookupHandler)
- http.HandleFunc("/dir/join", dirJoinHandler)
- http.HandleFunc("/dir/status", dirStatusHandler)
- http.HandleFunc("/vol/grow", volumeGrowHandler)
- http.HandleFunc("/vol/status", volumeStatusHandler)
- http.HandleFunc("/vol/vacuum", volumeVacuumHandler)
-
- http.HandleFunc("/", redirectHandler)
-
- topo.StartRefreshWritableVolumes(*garbageThreshold)
-
- log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport))
- srv := &http.Server{
- Addr: ":" + strconv.Itoa(*mport),
- Handler: http.DefaultServeMux,
- ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
- }
- e := srv.ListenAndServe()
- if e != nil {
- log.Fatalf("Fail to start:%s", e.Error())
- }
- return true
-}
diff --git a/src/shell.go b/src/shell.go
deleted file mode 100644
index daf0b7e1f..000000000
--- a/src/shell.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package main
-
-import (
- "bufio"
- "fmt"
- "os"
-)
-
-func init() {
- cmdShell.Run = runShell // break init cycle
-}
-
-var cmdShell = &Command{
- UsageLine: "shell",
- Short: "run interactive commands, now just echo",
- Long: `run interactive commands.
-
- `,
-}
-
-var ()
-
-func runShell(command *Command, args []string) bool {
- r := bufio.NewReader(os.Stdin)
- o := bufio.NewWriter(os.Stdout)
- e := bufio.NewWriter(os.Stderr)
- prompt := func() {
- o.WriteString("> ")
- o.Flush()
- }
- readLine := func() string {
- ret, err := r.ReadString('\n')
- if err != nil {
- fmt.Fprint(e, err)
- os.Exit(1)
- }
- return ret
- }
- execCmd := func(cmd string) int {
- if cmd != "" {
- o.WriteString(cmd)
- }
- return 0
- }
-
- cmd := ""
- for {
- prompt()
- cmd = readLine()
- execCmd(cmd)
- }
- return true
-}
diff --git a/src/upload.go b/src/upload.go
deleted file mode 100644
index e1e296bf2..000000000
--- a/src/upload.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package main
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "net/url"
- "os"
- "path"
- "code.google.com/p/weed-fs/weed/operation"
- "code.google.com/p/weed-fs/weed/util"
- "strconv"
-)
-
-var uploadReplication *string
-
-func init() {
- cmdUpload.Run = runUpload // break init cycle
- cmdUpload.IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information")
- server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location")
- uploadReplication = cmdUpload.Flag.String("replication", "000", "replication type(000,001,010,100,110,200)")
-}
-
-var cmdUpload = &Command{
- UsageLine: "upload -server=localhost:9333 file1 [file2 file3]",
- Short: "upload one or a list of files",
- Long: `upload one or a list of files.
- It uses consecutive file keys for the list of files.
- e.g. If the file1 uses key k, file2 can be read via k_1
-
- `,
-}
-
-type AssignResult struct {
- Fid string "fid"
- Url string "url"
- PublicUrl string "publicUrl"
- Count int
- Error string "error"
-}
-
-func assign(count int) (*AssignResult, error) {
- values := make(url.Values)
- values.Add("count", strconv.Itoa(count))
- values.Add("replication", *uploadReplication)
- jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values)
- debug("assign result :", string(jsonBlob))
- if err != nil {
- return nil, err
- }
- var ret AssignResult
- err = json.Unmarshal(jsonBlob, &ret)
- if err != nil {
- return nil, err
- }
- if ret.Count <= 0 {
- return nil, errors.New(ret.Error)
- }
- return &ret, nil
-}
-
-func upload(filename string, server string, fid string) (int, error) {
- debug("Start uploading file:", filename)
- fh, err := os.Open(filename)
- if err != nil {
- debug("Failed to open file:", filename)
- return 0, err
- }
- ret, e := operation.Upload("http://"+server+"/"+fid, path.Base(filename), fh)
- if e != nil {
- return 0, e
- }
- return ret.Size, e
-}
-
-type SubmitResult struct {
- Fid string "fid"
- Size int "size"
- Error string "error"
-}
-
-func submit(files []string) []SubmitResult {
- ret, err := assign(len(files))
- if err != nil {
- fmt.Println(err)
- return nil
- }
- results := make([]SubmitResult, len(files))
- for index, file := range files {
- fid := ret.Fid
- if index > 0 {
- fid = fid + "_" + strconv.Itoa(index)
- }
- results[index].Size, err = upload(file, ret.PublicUrl, fid)
- if err != nil {
- fid = ""
- results[index].Error = err.Error()
- }
- results[index].Fid = fid
- }
- return results
-}
-
-func runUpload(cmd *Command, args []string) bool {
- *IsDebug = true
- if len(cmdUpload.Flag.Args()) == 0 {
- return false
- }
- results := submit(args)
- bytes, _ := json.Marshal(results)
- fmt.Print(string(bytes))
- return true
-}
diff --git a/src/version.go b/src/version.go
deleted file mode 100644
index b418126a4..000000000
--- a/src/version.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package main
-
-import (
- "fmt"
- "runtime"
-)
-
-const (
- VERSION = "0.28 beta"
-)
-
-var cmdVersion = &Command{
- Run: runVersion,
- UsageLine: "version",
- Short: "print Weed File System version",
- Long: `Version prints the Weed File System version`,
-}
-
-func runVersion(cmd *Command, args []string) bool {
- if len(args) != 0 {
- cmd.Usage()
- }
-
- fmt.Printf("version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH)
- return true
-}
diff --git a/src/volume.go b/src/volume.go
deleted file mode 100644
index 8bfc7681a..000000000
--- a/src/volume.go
+++ /dev/null
@@ -1,378 +0,0 @@
-package main
-
-import (
- "bytes"
- "log"
- "math/rand"
- "mime"
- "net/http"
- "os"
- "code.google.com/p/weed-fs/weed/operation"
- "code.google.com/p/weed-fs/weed/storage"
- "runtime"
- "strconv"
- "strings"
- "time"
-)
-
-func init() {
- cmdVolume.Run = runVolume // break init cycle
- cmdVolume.IsDebug = cmdVolume.Flag.Bool("debug", false, "enable debug mode")
-}
-
-var cmdVolume = &Command{
- UsageLine: "volume -port=8080 -dir=/tmp -max=5 -ip=server_name -mserver=localhost:9333",
- Short: "start a volume server",
- Long: `start a volume server to provide storage spaces
-
- `,
-}
-
-var (
- vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
- volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "directory to store data files")
- ip = cmdVolume.Flag.String("ip", "localhost", "ip or server name")
- publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
- masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master server location")
- vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
- maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes")
- vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
- vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
-
- store *storage.Store
-)
-
-var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
-
-func statusHandler(w http.ResponseWriter, r *http.Request) {
- m := make(map[string]interface{})
- m["Version"] = VERSION
- m["Volumes"] = store.Status()
- writeJson(w, r, m)
-}
-func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
- err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
- if err == nil {
- writeJson(w, r, map[string]string{"error": ""})
- } else {
- writeJson(w, r, map[string]string{"error": err.Error()})
- }
- debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
-}
-func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) {
- err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold"))
- if err == nil {
- writeJson(w, r, map[string]interface{}{"error": "", "result": ret})
- } else {
- writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false})
- }
- debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret)
-}
-func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) {
- err := store.CompactVolume(r.FormValue("volume"))
- if err == nil {
- writeJson(w, r, map[string]string{"error": ""})
- } else {
- writeJson(w, r, map[string]string{"error": err.Error()})
- }
- debug("compacted volume =", r.FormValue("volume"), ", error =", err)
-}
-func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
- err := store.CommitCompactVolume(r.FormValue("volume"))
- if err == nil {
- writeJson(w, r, map[string]interface{}{"error": ""})
- } else {
- writeJson(w, r, map[string]string{"error": err.Error()})
- }
- debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
-}
-func storeHandler(w http.ResponseWriter, r *http.Request) {
- switch r.Method {
- case "GET":
- GetHandler(w, r)
- case "DELETE":
- DeleteHandler(w, r)
- case "POST":
- PostHandler(w, r)
- }
-}
-func GetHandler(w http.ResponseWriter, r *http.Request) {
- n := new(storage.Needle)
- vid, fid, ext := parseURLPath(r.URL.Path)
- volumeId, err := storage.NewVolumeId(vid)
- if err != nil {
- debug("parsing error:", err, r.URL.Path)
- return
- }
- n.ParsePath(fid)
-
- debug("volume", volumeId, "reading", n)
- if !store.HasVolume(volumeId) {
- lookupResult, err := operation.Lookup(*masterNode, volumeId)
- debug("volume", volumeId, "found on", lookupResult, "error", err)
- if err == nil {
- http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
- } else {
- debug("lookup error:", err, r.URL.Path)
- w.WriteHeader(http.StatusNotFound)
- }
- return
- }
- cookie := n.Cookie
- count, e := store.Read(volumeId, n)
- debug("read bytes", count, "error", e)
- if e != nil || count <= 0 {
- debug("read error:", e, r.URL.Path)
- w.WriteHeader(http.StatusNotFound)
- return
- }
- if n.Cookie != cookie {
- log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
- w.WriteHeader(http.StatusNotFound)
- return
- }
- if n.NameSize > 0 {
- fname := string(n.Name)
- dotIndex := strings.LastIndex(fname, ".")
- if dotIndex > 0 {
- ext = fname[dotIndex:]
- }
- }
- mtype := ""
- if ext != "" {
- mtype = mime.TypeByExtension(ext)
- }
- if n.MimeSize > 0 {
- mtype = string(n.Mime)
- }
- if mtype != "" {
- w.Header().Set("Content-Type", mtype)
- }
- if n.NameSize > 0 {
- w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(string(n.Name)))
- }
- if ext != ".gz" {
- if n.IsGzipped() {
- if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
- w.Header().Set("Content-Encoding", "gzip")
- } else {
- if n.Data, err = storage.UnGzipData(n.Data); err != nil {
- debug("lookup error:", err, r.URL.Path)
- }
- }
- }
- }
- w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
- w.Write(n.Data)
-}
-func PostHandler(w http.ResponseWriter, r *http.Request) {
- r.ParseForm()
- vid, _, _ := parseURLPath(r.URL.Path)
- volumeId, e := storage.NewVolumeId(vid)
- if e != nil {
- writeJson(w, r, e)
- } else {
- needle, filename, ne := storage.NewNeedle(r)
- if ne != nil {
- writeJson(w, r, ne)
- } else {
- ret, err := store.Write(volumeId, needle)
- errorStatus := ""
- needToReplicate := !store.HasVolume(volumeId)
- if err != nil {
- errorStatus = "Failed to write to local disk (" + err.Error() + ")"
- } else if ret > 0 {
- needToReplicate = needToReplicate || store.GetVolume(volumeId).NeedToReplicate()
- } else {
- errorStatus = "Failed to write to local disk"
- }
- if !needToReplicate && ret > 0 {
- needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "standard" {
- if !distributedOperation(volumeId, func(location operation.Location) bool {
- _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
- return err == nil
- }) {
- ret = 0
- errorStatus = "Failed to write to replicas for volume " + volumeId.String()
- }
- }
- }
- m := make(map[string]interface{})
- if errorStatus == "" {
- w.WriteHeader(http.StatusCreated)
- } else {
- store.Delete(volumeId, needle)
- distributedOperation(volumeId, func(location operation.Location) bool {
- return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
- })
- w.WriteHeader(http.StatusInternalServerError)
- m["error"] = errorStatus
- }
- m["size"] = ret
- writeJson(w, r, m)
- }
- }
-}
-func DeleteHandler(w http.ResponseWriter, r *http.Request) {
- n := new(storage.Needle)
- vid, fid, _ := parseURLPath(r.URL.Path)
- volumeId, _ := storage.NewVolumeId(vid)
- n.ParsePath(fid)
-
- debug("deleting", n)
-
- cookie := n.Cookie
- count, ok := store.Read(volumeId, n)
-
- if ok != nil {
- m := make(map[string]uint32)
- m["size"] = 0
- writeJson(w, r, m)
- return
- }
-
- if n.Cookie != cookie {
- log.Println("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
- return
- }
-
- n.Size = 0
- ret, err := store.Delete(volumeId, n)
- if err != nil {
- log.Printf("delete error: %s\n", err)
- return
- }
-
- needToReplicate := !store.HasVolume(volumeId)
- if !needToReplicate && ret > 0 {
- needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "standard" {
- if !distributedOperation(volumeId, func(location operation.Location) bool {
- return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
- }) {
- ret = 0
- }
- }
- }
-
- if ret != 0 {
- w.WriteHeader(http.StatusAccepted)
- } else {
- w.WriteHeader(http.StatusInternalServerError)
- }
-
- m := make(map[string]uint32)
- m["size"] = uint32(count)
- writeJson(w, r, m)
-}
-
-func parseURLPath(path string) (vid, fid, ext string) {
-
- sepIndex := strings.LastIndex(path, "/")
- commaIndex := strings.LastIndex(path[sepIndex:], ",")
- if commaIndex <= 0 {
- if "favicon.ico" != path[sepIndex+1:] {
- log.Println("unknown file id", path[sepIndex+1:])
- }
- return
- }
- dotIndex := strings.LastIndex(path[sepIndex:], ".")
- vid = path[sepIndex+1 : commaIndex]
- fid = path[commaIndex+1:]
- ext = ""
- if dotIndex > 0 {
- fid = path[commaIndex+1 : dotIndex]
- ext = path[dotIndex:]
- }
- return
-}
-
-func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
- if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
- length := 0
- selfUrl := (*ip + ":" + strconv.Itoa(*vport))
- results := make(chan bool)
- for _, location := range lookupResult.Locations {
- if location.Url != selfUrl {
- length++
- go func(location operation.Location, results chan bool) {
- results <- op(location)
- }(location, results)
- }
- }
- ret := true
- for i := 0; i < length; i++ {
- ret = ret && <-results
- }
- return ret
- } else {
- log.Println("Failed to lookup for", volumeId, lookupErr.Error())
- }
- return false
-}
-
-func runVolume(cmd *Command, args []string) bool {
- if *vMaxCpu < 1 {
- *vMaxCpu = runtime.NumCPU()
- }
- runtime.GOMAXPROCS(*vMaxCpu)
- fileInfo, err := os.Stat(*volumeFolder)
- if err != nil {
- log.Fatalf("No Existing Folder:%s", *volumeFolder)
- }
- if !fileInfo.IsDir() {
- log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder)
- }
- perm := fileInfo.Mode().Perm()
- log.Println("Volume Folder permission:", perm)
-
- if *publicUrl == "" {
- *publicUrl = *ip + ":" + strconv.Itoa(*vport)
- }
-
- store = storage.NewStore(*vport, *ip, *publicUrl, *volumeFolder, *maxVolumeCount)
- defer store.Close()
- http.HandleFunc("/", storeHandler)
- http.HandleFunc("/status", statusHandler)
- http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
- http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler)
- http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler)
- http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler)
-
- go func() {
- connected := true
- store.SetMaster(*masterNode)
- for {
- err := store.Join()
- if err == nil {
- if !connected {
- connected = true
- log.Println("Reconnected with master")
- }
- } else {
- if connected {
- connected = false
- }
- }
- time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond)
- }
- }()
- log.Println("store joined at", *masterNode)
-
- log.Println("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport))
- srv := &http.Server{
- Addr: ":" + strconv.Itoa(*vport),
- Handler: http.DefaultServeMux,
- ReadTimeout: (time.Duration(*vReadTimeout) * time.Second),
- }
- e := srv.ListenAndServe()
- if e != nil {
- log.Fatalf("Fail to start:%s", e.Error())
- }
- return true
-}
diff --git a/src/weed.go b/src/weed.go
deleted file mode 100644
index c03cb68ac..000000000
--- a/src/weed.go
+++ /dev/null
@@ -1,199 +0,0 @@
-package main
-
-import (
- "encoding/json"
- "flag"
- "fmt"
- "io"
- "math/rand"
- "net/http"
- "os"
- "strings"
- "sync"
- "text/template"
- "time"
- "unicode"
- "unicode/utf8"
-)
-
-var IsDebug *bool
-var server *string
-
-var commands = []*Command{
- cmdFix,
- cmdMaster,
- cmdUpload,
- cmdShell,
- cmdVersion,
- cmdVolume,
- cmdExport,
-}
-
-var exitStatus = 0
-var exitMu sync.Mutex
-
-func setExitStatus(n int) {
- exitMu.Lock()
- if exitStatus < n {
- exitStatus = n
- }
- exitMu.Unlock()
-}
-
-func main() {
- rand.Seed(time.Now().UnixNano())
- flag.Usage = usage
- flag.Parse()
-
- args := flag.Args()
- if len(args) < 1 {
- usage()
- }
-
- if args[0] == "help" {
- help(args[1:])
- for _, cmd := range commands {
- if len(args) >= 2 && cmd.Name() == args[1] && cmd.Run != nil {
- fmt.Fprintf(os.Stderr, "Default Parameters:\n")
- cmd.Flag.PrintDefaults()
- }
- }
- return
- }
-
- for _, cmd := range commands {
- if cmd.Name() == args[0] && cmd.Run != nil {
- cmd.Flag.Usage = func() { cmd.Usage() }
- cmd.Flag.Parse(args[1:])
- args = cmd.Flag.Args()
- IsDebug = cmd.IsDebug
- if !cmd.Run(cmd, args) {
- fmt.Fprintf(os.Stderr, "\n")
- cmd.Flag.Usage()
- fmt.Fprintf(os.Stderr, "Default Parameters:\n")
- cmd.Flag.PrintDefaults()
- }
- exit()
- return
- }
- }
-
- fmt.Fprintf(os.Stderr, "weed: unknown subcommand %q\nRun 'weed help' for usage.\n", args[0])
- setExitStatus(2)
- exit()
-}
-
-var usageTemplate = `WeedFS is a software to store billions of files and serve them fast!
-
-Usage:
-
- weed command [arguments]
-
-The commands are:
-{{range .}}{{if .Runnable}}
- {{.Name | printf "%-11s"}} {{.Short}}{{end}}{{end}}
-
-Use "weed help [command]" for more information about a command.
-
-`
-
-var helpTemplate = `{{if .Runnable}}Usage: weed {{.UsageLine}}
-{{end}}
- {{.Long}}
-`
-
-// tmpl executes the given template text on data, writing the result to w.
-func tmpl(w io.Writer, text string, data interface{}) {
- t := template.New("top")
- t.Funcs(template.FuncMap{"trim": strings.TrimSpace, "capitalize": capitalize})
- template.Must(t.Parse(text))
- if err := t.Execute(w, data); err != nil {
- panic(err)
- }
-}
-
-func capitalize(s string) string {
- if s == "" {
- return s
- }
- r, n := utf8.DecodeRuneInString(s)
- return string(unicode.ToTitle(r)) + s[n:]
-}
-
-func printUsage(w io.Writer) {
- tmpl(w, usageTemplate, commands)
-}
-
-func usage() {
- printUsage(os.Stderr)
- os.Exit(2)
-}
-
-// help implements the 'help' command.
-func help(args []string) {
- if len(args) == 0 {
- printUsage(os.Stdout)
- // not exit 2: succeeded at 'weed help'.
- return
- }
- if len(args) != 1 {
- fmt.Fprintf(os.Stderr, "usage: weed help command\n\nToo many arguments given.\n")
- os.Exit(2) // failed at 'weed help'
- }
-
- arg := args[0]
-
- for _, cmd := range commands {
- if cmd.Name() == arg {
- tmpl(os.Stdout, helpTemplate, cmd)
- // not exit 2: succeeded at 'weed help cmd'.
- return
- }
- }
-
- fmt.Fprintf(os.Stderr, "Unknown help topic %#q. Run 'weed help'.\n", arg)
- os.Exit(2) // failed at 'weed help cmd'
-}
-
-var atexitFuncs []func()
-
-func atexit(f func()) {
- atexitFuncs = append(atexitFuncs, f)
-}
-
-func exit() {
- for _, f := range atexitFuncs {
- f()
- }
- os.Exit(exitStatus)
-}
-
-func exitIfErrors() {
- if exitStatus != 0 {
- exit()
- }
-}
-func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) {
- w.Header().Set("Content-Type", "application/javascript")
- var bytes []byte
- if r.FormValue("pretty") != "" {
- bytes, _ = json.MarshalIndent(obj, "", " ")
- } else {
- bytes, _ = json.Marshal(obj)
- }
- callback := r.FormValue("callback")
- if callback == "" {
- w.Write(bytes)
- } else {
- w.Write([]uint8(callback))
- w.Write([]uint8("("))
- fmt.Fprint(w, string(bytes))
- w.Write([]uint8(")"))
- }
-}
-
-func debug(params ...interface{}) {
- if *IsDebug {
- fmt.Println(params)
- }
-}