diff options
| author | Chris Lu <chris.lu@gmail.com> | 2013-12-02 01:37:36 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2013-12-02 01:37:36 -0800 |
| commit | c38eee73ca752b670adc0717b3436c5e9a49c4f8 (patch) | |
| tree | 027eec1235ed1523c90d02f8e44e325b4f243ac2 /go/weed/volume.go | |
| parent | bc2f3b26e708636700af674e20ec0a8cc6dede6c (diff) | |
| download | seaweedfs-c38eee73ca752b670adc0717b3436c5e9a49c4f8.tar.xz seaweedfs-c38eee73ca752b670adc0717b3436c5e9a49c4f8.zip | |
refactoring to separate master and volume server, so that these servers
can be embedded into other applications
Diffstat (limited to 'go/weed/volume.go')
| -rw-r--r-- | go/weed/volume.go | 296 |
1 files changed, 4 insertions, 292 deletions
diff --git a/go/weed/volume.go b/go/weed/volume.go index f3a6038c2..50864c541 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -2,15 +2,10 @@ package main import ( "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/replication" - "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/weed/weed_server" "github.com/gorilla/mux" - "math/rand" - "mime" "net/http" "os" - "path/filepath" "runtime" "strconv" "strings" @@ -44,264 +39,9 @@ var ( rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") - store *storage.Store volumeWhiteList []string ) -var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") - -func statusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Volumes"] = store.Status() - writeJsonQuiet(w, r, m) -} -func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replicationType")) - if err == nil { - writeJsonQuiet(w, r, map[string]string{"error": ""}) - } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - } - debug("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) -} -func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { - err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) - if err == nil { - writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret}) - } else { - writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false}) - } - debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) -} -func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { - err := store.CompactVolume(r.FormValue("volume")) - if err == nil { - writeJsonQuiet(w, r, map[string]string{"error": ""}) - } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - } - debug("compacted volume =", r.FormValue("volume"), ", error =", err) -} -func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { - err := store.CommitCompactVolume(r.FormValue("volume")) - if err == nil { - writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) - } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - } - debug("commit compact volume =", r.FormValue("volume"), ", error =", err) -} -func freezeVolumeHandler(w http.ResponseWriter, r *http.Request) { - //TODO: notify master that this volume will be read-only - err := store.FreezeVolume(r.FormValue("volume")) - if err == nil { - writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) - } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - } - debug("freeze volume =", r.FormValue("volume"), ", error =", err) -} -func submitFromVolumeServerHandler(w http.ResponseWriter, r *http.Request) { - submitForClientHandler(w, r, *masterNode) -} -func storeHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - GetOrHeadHandler(w, r, true) - case "HEAD": - GetOrHeadHandler(w, r, false) - case "DELETE": - secure(volumeWhiteList, DeleteHandler)(w, r) - case "PUT": - secure(volumeWhiteList, PostHandler)(w, r) - case "POST": - secure(volumeWhiteList, PostHandler)(w, r) - } -} -func GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { - n := new(storage.Needle) - vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } - n.ParsePath(fid) - - debug("volume", volumeId, "reading", n) - if !store.HasVolume(volumeId) { - lookupResult, err := operation.Lookup(*masterNode, volumeId) - debug("volume", volumeId, "found on", lookupResult, "error", err) - if err == nil { - http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) - } else { - debug("lookup error:", err, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - } - return - } - cookie := n.Cookie - count, e := store.Read(volumeId, n) - debug("read bytes", count, "error", e) - if e != nil || count <= 0 { - debug("read error:", e, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return - } - if n.Cookie != cookie { - glog.V(0).Infoln("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - w.WriteHeader(http.StatusNotFound) - return - } - if n.LastModified != 0 { - w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat)) - if r.Header.Get("If-Modified-Since") != "" { - if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil { - if t.Unix() >= int64(n.LastModified) { - w.WriteHeader(http.StatusNotModified) - return - } - } - } - } - if n.NameSize > 0 && filename == "" { - filename = string(n.Name) - dotIndex := strings.LastIndex(filename, ".") - if dotIndex > 0 { - ext = filename[dotIndex:] - } - } - mtype := "" - if ext != "" { - mtype = mime.TypeByExtension(ext) - } - if n.MimeSize > 0 { - mtype = string(n.Mime) - } - if mtype != "" { - w.Header().Set("Content-Type", mtype) - } - if filename != "" { - w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(filename)) - } - if ext != ".gz" { - if n.IsGzipped() { - if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - w.Header().Set("Content-Encoding", "gzip") - } else { - if n.Data, err = storage.UnGzipData(n.Data); err != nil { - debug("lookup error:", err, r.URL.Path) - } - } - } - } - w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) - if isGetMethod { - if _, e = w.Write(n.Data); e != nil { - debug("response write error:", e) - } - } -} -func PostHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - if e := r.ParseForm(); e != nil { - debug("form parse error:", e) - writeJsonError(w, r, e) - return - } - vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, ve := storage.NewVolumeId(vid) - if ve != nil { - debug("NewVolumeId error:", ve) - writeJsonError(w, r, ve) - return - } - needle, ne := storage.NewNeedle(r) - if ne != nil { - writeJsonError(w, r, ne) - return - } - ret, errorStatus := replication.ReplicatedWrite(*masterNode, store, volumeId, needle, r) - if errorStatus == "" { - w.WriteHeader(http.StatusCreated) - } else { - w.WriteHeader(http.StatusInternalServerError) - m["error"] = errorStatus - } - m["size"] = ret - writeJsonQuiet(w, r, m) -} -func DeleteHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, _, _, _ := parseURLPath(r.URL.Path) - volumeId, _ := storage.NewVolumeId(vid) - n.ParsePath(fid) - - debug("deleting", n) - - cookie := n.Cookie - count, ok := store.Read(volumeId, n) - - if ok != nil { - m := make(map[string]uint32) - m["size"] = 0 - writeJsonQuiet(w, r, m) - return - } - - if n.Cookie != cookie { - glog.V(0).Infoln("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - return - } - - n.Size = 0 - ret := replication.ReplicatedDelete(*masterNode, store, volumeId, n, r) - - if ret != 0 { - w.WriteHeader(http.StatusAccepted) - } else { - w.WriteHeader(http.StatusInternalServerError) - } - - m := make(map[string]uint32) - m["size"] = uint32(count) - writeJsonQuiet(w, r, m) -} - -func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { - switch strings.Count(path, "/") { - case 3: - parts := strings.Split(path, "/") - vid, fid, filename = parts[1], parts[2], parts[3] - ext = filepath.Ext(filename) - case 2: - parts := strings.Split(path, "/") - vid, fid = parts[1], parts[2] - dotIndex := strings.LastIndex(fid, ".") - if dotIndex > 0 { - ext = fid[dotIndex:] - fid = fid[0:dotIndex] - } - default: - sepIndex := strings.LastIndex(path, "/") - commaIndex := strings.LastIndex(path[sepIndex:], ",") - if commaIndex <= 0 { - vid, isVolumeIdOnly = path[sepIndex+1:], true - return - } - dotIndex := strings.LastIndex(path[sepIndex:], ".") - vid = path[sepIndex+1 : commaIndex] - fid = path[commaIndex+1:] - ext = "" - if dotIndex > 0 { - fid = path[commaIndex+1 : dotIndex] - ext = path[dotIndex:] - } - } - return -} - func runVolume(cmd *Command, args []string) bool { if *vMaxCpu < 1 { *vMaxCpu = runtime.NumCPU() @@ -340,39 +80,11 @@ func runVolume(cmd *Command, args []string) bool { volumeWhiteList = strings.Split(*volumeWhiteListOption, ",") } - store = storage.NewStore(*vport, *ip, *publicUrl, folders, maxCounts) - defer store.Close() r := mux.NewRouter() - r.HandleFunc("/submit", secure(volumeWhiteList, submitFromVolumeServerHandler)) - r.HandleFunc("/status", secure(volumeWhiteList, statusHandler)) - r.HandleFunc("/admin/assign_volume", secure(volumeWhiteList, assignVolumeHandler)) - r.HandleFunc("/admin/vacuum_volume_check", secure(volumeWhiteList, vacuumVolumeCheckHandler)) - r.HandleFunc("/admin/vacuum_volume_compact", secure(volumeWhiteList, vacuumVolumeCompactHandler)) - r.HandleFunc("/admin/vacuum_volume_commit", secure(volumeWhiteList, vacuumVolumeCommitHandler)) - r.HandleFunc("/admin/freeze_volume", secure(volumeWhiteList, freezeVolumeHandler)) - r.HandleFunc("/", storeHandler) - go func() { - connected := true - store.SetMaster(*masterNode) - store.SetDataCenter(*dataCenter) - store.SetRack(*rack) - for { - err := store.Join() - if err == nil { - if !connected { - connected = true - glog.V(0).Infoln("Reconnected with master") - } - } else { - if connected { - connected = false - } - } - time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() - glog.V(0).Infoln("store joined at", *masterNode) + weed_server.NewVolumeServer(r, VERSION, *ip, *vport, *publicUrl, folders, maxCounts, + *masterNode, *vpulse, *dataCenter, *rack, volumeWhiteList, + ) glog.V(0).Infoln("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport)) srv := &http.Server{ |
