diff options
Diffstat (limited to 'go/weed')
| -rw-r--r-- | go/weed/export.go | 4 | ||||
| -rw-r--r-- | go/weed/fix.go | 4 | ||||
| -rw-r--r-- | go/weed/master.go | 52 | ||||
| -rw-r--r-- | go/weed/shell.go | 14 | ||||
| -rw-r--r-- | go/weed/upload.go | 4 | ||||
| -rw-r--r-- | go/weed/volume.go | 59 | ||||
| -rw-r--r-- | go/weed/weed.go | 33 |
7 files changed, 111 insertions, 59 deletions
diff --git a/go/weed/export.go b/go/weed/export.go index 6b391024a..9e65a4de3 100644 --- a/go/weed/export.go +++ b/go/weed/export.go @@ -3,12 +3,12 @@ package main import ( "archive/tar" "bytes" + "code.google.com/p/weed-fs/go/directory" + "code.google.com/p/weed-fs/go/storage" "fmt" "log" "os" "path" - "code.google.com/p/weed-fs/go/directory" - "code.google.com/p/weed-fs/go/storage" "strconv" "strings" "text/template" diff --git a/go/weed/fix.go b/go/weed/fix.go index 249007252..597bc0ef9 100644 --- a/go/weed/fix.go +++ b/go/weed/fix.go @@ -1,10 +1,10 @@ package main import ( + "code.google.com/p/weed-fs/go/storage" "log" "os" "path" - "code.google.com/p/weed-fs/go/storage" "strconv" ) @@ -52,7 +52,7 @@ func runFix(cmd *Command, args []string) bool { debug("saved", count, "with error", pe) } else { debug("skipping deleted file ...") - nm.Delete(n.Id) + return nm.Delete(n.Id) } return nil }) diff --git a/go/weed/master.go b/go/weed/master.go index 3d8757c16..f6cc88df0 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -1,13 +1,13 @@ package main import ( + "code.google.com/p/weed-fs/go/replication" + "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/topology" "encoding/json" "errors" "log" "net/http" - "code.google.com/p/weed-fs/go/replication" - "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/topology" "runtime" "strconv" "strings" @@ -57,14 +57,14 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) { for _, dn := range machines { ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) } - writeJson(w, r, map[string]interface{}{"locations": ret}) + writeJsonQuiet(w, r, map[string]interface{}{"locations": ret}) } else { w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) } } else { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid}) + writeJsonQuiet(w, r, map[string]string{"error": "unknown volumeId format " + vid}) } } @@ -80,24 +80,27 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) { rt, err := storage.NewReplicationTypeFromString(repType) if err != nil { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) return } if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { if topo.FreeSpace() <= 0 { w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "No free volumes left!"}) + writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"}) return } else { - vg.GrowByType(rt, topo) + if _, err = vg.GrowByType(rt, topo); err != nil { + writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()}) + return + } } } fid, count, dn, err := topo.PickForWrite(rt, c) if err == nil { - writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) + writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) } else { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) } } @@ -112,19 +115,22 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) { s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") publicUrl := r.FormValue("publicUrl") volumes := new([]storage.VolumeInfo) - json.Unmarshal([]byte(r.FormValue("volumes")), volumes) + if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil { + writeJsonQuiet(w, r, map[string]string{"error": "Cannot unmarshal \"volumes\": " + err.Error()}) + return + } debug(s, "volumes", r.FormValue("volumes")) topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount) m := make(map[string]interface{}) m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024 - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } func dirStatusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = VERSION m["Topology"] = topo.ToMap() - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { @@ -153,10 +159,10 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { } if err != nil { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": "parameter replication " + err.Error()}) } else { w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]interface{}{"count": count}) + writeJsonQuiet(w, r, map[string]interface{}{"count": count}) } } @@ -164,7 +170,7 @@ func volumeStatusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = VERSION m["Volumes"] = topo.ToVolumeMap() - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } func redirectHandler(w http.ResponseWriter, r *http.Request) { @@ -179,7 +185,7 @@ func redirectHandler(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) } else { w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) } } @@ -188,7 +194,11 @@ func runMaster(cmd *Command, args []string) bool { *mMaxCpu = runtime.NumCPU() } runtime.GOMAXPROCS(*mMaxCpu) - topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) + var e error + if topo, e = topology.NewTopology("topo", *confFile, *metaFolder, "weed", + uint64(*volumeSizeLimitMB)*1024*1024, *mpulse); e != nil { + log.Fatalf("cannot create topology:%s", e) + } vg = replication.NewDefaultVolumeGrowth() log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") http.HandleFunc("/dir/assign", dirAssignHandler) @@ -209,9 +219,9 @@ func runMaster(cmd *Command, args []string) bool { Handler: http.DefaultServeMux, ReadTimeout: time.Duration(*mReadTimeout) * time.Second, } - e := srv.ListenAndServe() + e = srv.ListenAndServe() if e != nil { - log.Fatalf("Fail to start:%s", e.Error()) + log.Fatalf("Fail to start:%s", e) } return true } diff --git a/go/weed/shell.go b/go/weed/shell.go index daf0b7e1f..4287f2148 100644 --- a/go/weed/shell.go +++ b/go/weed/shell.go @@ -3,6 +3,7 @@ package main import ( "bufio" "fmt" + "log" "os" ) @@ -25,8 +26,13 @@ func runShell(command *Command, args []string) bool { o := bufio.NewWriter(os.Stdout) e := bufio.NewWriter(os.Stderr) prompt := func() { - o.WriteString("> ") - o.Flush() + var err error + if _, err = o.WriteString("> "); err != nil { + log.Printf("error writing to stdout: %s", err) + } + if err = o.Flush(); err != nil { + log.Printf("error flushing stdout: %s", err) + } } readLine := func() string { ret, err := r.ReadString('\n') @@ -38,7 +44,9 @@ func runShell(command *Command, args []string) bool { } execCmd := func(cmd string) int { if cmd != "" { - o.WriteString(cmd) + if _, err := o.WriteString(cmd); err != nil { + log.Printf("error writing to stdout: %s", err) + } } return 0 } diff --git a/go/weed/upload.go b/go/weed/upload.go index 92478b7b6..a47551ddf 100644 --- a/go/weed/upload.go +++ b/go/weed/upload.go @@ -1,14 +1,14 @@ package main import ( + "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/util" "encoding/json" "errors" "fmt" "net/url" "os" "path" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/util" "strconv" ) diff --git a/go/weed/volume.go b/go/weed/volume.go index edf2ad821..fd2298541 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -2,13 +2,13 @@ package main import ( "bytes" + "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/storage" "log" "math/rand" "mime" "net/http" "os" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/storage" "runtime" "strconv" "strings" @@ -48,41 +48,41 @@ func statusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = VERSION m["Volumes"] = store.Status() - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) if err == nil { - writeJson(w, r, map[string]string{"error": ""}) + writeJsonQuiet(w, r, map[string]string{"error": ""}) } else { - writeJson(w, r, map[string]string{"error": err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) } debug("assign volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) } func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) if err == nil { - writeJson(w, r, map[string]interface{}{"error": "", "result": ret}) + writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret}) } else { - writeJson(w, r, map[string]interface{}{"error": err.Error(), "result": false}) + writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false}) } debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) } func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { err := store.CompactVolume(r.FormValue("volume")) if err == nil { - writeJson(w, r, map[string]string{"error": ""}) + writeJsonQuiet(w, r, map[string]string{"error": ""}) } else { - writeJson(w, r, map[string]string{"error": err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) } debug("compacted volume =", r.FormValue("volume"), ", error =", err) } func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { err := store.CommitCompactVolume(r.FormValue("volume")) if err == nil { - writeJson(w, r, map[string]interface{}{"error": ""}) + writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) } else { - writeJson(w, r, map[string]string{"error": err.Error()}) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) } debug("commit compact volume =", r.FormValue("volume"), ", error =", err) } @@ -163,18 +163,29 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { } } w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) - w.Write(n.Data) + if _, e = w.Write(n.Data); e != nil { + debug("response write error:", e) + } } func PostHandler(w http.ResponseWriter, r *http.Request) { - r.ParseForm() + if e := r.ParseForm(); e != nil { + debug("form parse error:", e) + writeJsonQuiet(w, r, e) + return + } vid, _, _ := parseURLPath(r.URL.Path) volumeId, e := storage.NewVolumeId(vid) if e != nil { - writeJson(w, r, e) + debug("NewVolumeId error:", e) + writeJsonQuiet(w, r, e) + return + } + if e != nil { + writeJsonQuiet(w, r, e) } else { needle, filename, ne := storage.NewNeedle(r) if ne != nil { - writeJson(w, r, ne) + writeJsonQuiet(w, r, ne) } else { ret, err := store.Write(volumeId, needle) errorStatus := "" @@ -204,15 +215,19 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { if errorStatus == "" { w.WriteHeader(http.StatusCreated) } else { - store.Delete(volumeId, needle) - distributedOperation(volumeId, func(location operation.Location) bool { - return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - }) + if _, e = store.Delete(volumeId, needle); e != nil { + errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " + + strconv.FormatUint(uint64(volumeId), 10) + ": " + e.Error() + } else { + distributedOperation(volumeId, func(location operation.Location) bool { + return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") + }) + } w.WriteHeader(http.StatusInternalServerError) m["error"] = errorStatus } m["size"] = ret - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } } } @@ -230,7 +245,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { if ok != nil { m := make(map[string]uint32) m["size"] = 0 - writeJson(w, r, m) + writeJsonQuiet(w, r, m) return } @@ -268,7 +283,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]uint32) m["size"] = uint32(count) - writeJson(w, r, m) + writeJsonQuiet(w, r, m) } func parseURLPath(path string) (vid, fid, ext string) { diff --git a/go/weed/weed.go b/go/weed/weed.go index c03cb68ac..e97c8b550 100644 --- a/go/weed/weed.go +++ b/go/weed/weed.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "io" + "log" "math/rand" "net/http" "os" @@ -173,22 +174,40 @@ func exitIfErrors() { exit() } } -func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) { + +func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err error) { w.Header().Set("Content-Type", "application/javascript") var bytes []byte if r.FormValue("pretty") != "" { - bytes, _ = json.MarshalIndent(obj, "", " ") + bytes, err = json.MarshalIndent(obj, "", " ") } else { - bytes, _ = json.Marshal(obj) + bytes, err = json.Marshal(obj) + } + if err != nil { + return } callback := r.FormValue("callback") if callback == "" { - w.Write(bytes) + _, err = w.Write(bytes) } else { - w.Write([]uint8(callback)) - w.Write([]uint8("(")) + if _, err = w.Write([]uint8(callback)); err != nil { + return + } + if _, err = w.Write([]uint8("(")); err != nil { + return + } fmt.Fprint(w, string(bytes)) - w.Write([]uint8(")")) + if _, err = w.Write([]uint8(")")); err != nil { + return + } + } + return +} + +// wrapper for writeJson - just logs errors +func writeJsonQuiet(w http.ResponseWriter, r *http.Request, obj interface{}) { + if err := writeJson(w, r, obj); err != nil { + log.Printf("error writing JSON %s: %s", obj, err) } } |
