diff options
| author | Chris Lu <chris.lu@gmail.com> | 2016-06-02 18:09:14 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2016-06-02 18:09:14 -0700 |
| commit | 5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44 (patch) | |
| tree | 2e4dd2ad0a618ab2b7cdebcdb9c503526c31e2e8 /go/weed/weed_server | |
| parent | caeffa3998adc060fa66c4cd77af971ff2d26c57 (diff) | |
| download | seaweedfs-5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44.tar.xz seaweedfs-5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44.zip | |
directory structure change to work with glide
glide has its own requirements. My previous workaround caused me some
code checkin errors. Need to fix this.
Diffstat (limited to 'go/weed/weed_server')
21 files changed, 0 insertions, 2489 deletions
diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go deleted file mode 100644 index a7fa2de53..000000000 --- a/go/weed/weed_server/common.go +++ /dev/null @@ -1,179 +0,0 @@ -package weed_server - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "net/http" - "path/filepath" - "strconv" - "strings" - "time" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/operation" - "github.com/chrislusf/seaweedfs/go/security" - "github.com/chrislusf/seaweedfs/go/stats" - "github.com/chrislusf/seaweedfs/go/storage" - "github.com/chrislusf/seaweedfs/go/util" -) - -var serverStats *stats.ServerStats -var startTime = time.Now() - -func init() { - serverStats = stats.NewServerStats() - go serverStats.Start() - -} - -func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) { - var bytes []byte - if r.FormValue("pretty") != "" { - bytes, err = json.MarshalIndent(obj, "", " ") - } else { - bytes, err = json.Marshal(obj) - } - if err != nil { - return - } - callback := r.FormValue("callback") - if callback == "" { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(httpStatus) - _, err = w.Write(bytes) - } else { - w.Header().Set("Content-Type", "application/javascript") - w.WriteHeader(httpStatus) - if _, err = w.Write([]uint8(callback)); err != nil { - return - } - if _, err = w.Write([]uint8("(")); err != nil { - return - } - fmt.Fprint(w, string(bytes)) - if _, err = w.Write([]uint8(")")); err != nil { - return - } - } - - return -} - -// wrapper for writeJson - just logs errors -func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) { - if err := writeJson(w, r, httpStatus, obj); err != nil { - glog.V(0).Infof("error writing JSON %s: %v", obj, err) - } -} -func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) { - m := make(map[string]interface{}) - m["error"] = err.Error() - writeJsonQuiet(w, r, httpStatus, m) -} - -func debug(params ...interface{}) { - glog.V(4).Infoln(params) -} - -func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { - jwt := security.GetJwt(r) - m := make(map[string]interface{}) - if r.Method != "POST" { - writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) - return - } - - debug("parsing upload file...") - fname, data, mimeType, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r) - if pe != nil { - writeJsonError(w, r, http.StatusBadRequest, pe) - return - } - - debug("assigning file id for", fname) - r.ParseForm() - assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl")) - if ae != nil { - writeJsonError(w, r, http.StatusInternalServerError, ae) - return - } - - url := "http://" + assignResult.Url + "/" + assignResult.Fid - if lastModified != 0 { - url = url + "?ts=" + strconv.FormatUint(lastModified, 10) - } - - debug("upload file to store", url) - uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - - m["fileName"] = fname - m["fid"] = assignResult.Fid - m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid - m["size"] = uploadResult.Size - writeJsonQuiet(w, r, http.StatusCreated, m) - return -} - -func deleteForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { - r.ParseForm() - fids := r.Form["fid"] - ret, err := operation.DeleteFiles(masterUrl, fids) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - writeJsonQuiet(w, r, http.StatusAccepted, ret) -} - -func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { - switch strings.Count(path, "/") { - case 3: - parts := strings.Split(path, "/") - vid, fid, filename = parts[1], parts[2], parts[3] - ext = filepath.Ext(filename) - case 2: - parts := strings.Split(path, "/") - vid, fid = parts[1], parts[2] - dotIndex := strings.LastIndex(fid, ".") - if dotIndex > 0 { - ext = fid[dotIndex:] - fid = fid[0:dotIndex] - } - default: - sepIndex := strings.LastIndex(path, "/") - commaIndex := strings.LastIndex(path[sepIndex:], ",") - if commaIndex <= 0 { - vid, isVolumeIdOnly = path[sepIndex+1:], true - return - } - dotIndex := strings.LastIndex(path[sepIndex:], ".") - vid = path[sepIndex+1 : commaIndex] - fid = path[commaIndex+1:] - ext = "" - if dotIndex > 0 { - fid = path[commaIndex+1 : dotIndex] - ext = path[dotIndex:] - } - } - return -} - -func statsCounterHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = util.VERSION - m["Counters"] = serverStats - writeJsonQuiet(w, r, http.StatusOK, m) -} - -func statsMemoryHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = util.VERSION - m["Memory"] = stats.MemStat() - writeJsonQuiet(w, r, http.StatusOK, m) -} diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go deleted file mode 100644 index e3c45d9e5..000000000 --- a/go/weed/weed_server/filer_server.go +++ /dev/null @@ -1,67 +0,0 @@ -package weed_server - -import ( - "net/http" - "strconv" - - "github.com/chrislusf/seaweedfs/go/filer" - "github.com/chrislusf/seaweedfs/go/filer/cassandra_store" - "github.com/chrislusf/seaweedfs/go/filer/embedded_filer" - "github.com/chrislusf/seaweedfs/go/filer/flat_namespace" - "github.com/chrislusf/seaweedfs/go/filer/redis_store" - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/security" -) - -type FilerServer struct { - port string - master string - collection string - defaultReplication string - redirectOnRead bool - disableDirListing bool - secret security.Secret - filer filer.Filer -} - -func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, - replication string, redirectOnRead bool, disableDirListing bool, - secret string, - cassandra_server string, cassandra_keyspace string, - redis_server string, redis_password string, redis_database int, -) (fs *FilerServer, err error) { - fs = &FilerServer{ - master: master, - collection: collection, - defaultReplication: replication, - redirectOnRead: redirectOnRead, - disableDirListing: disableDirListing, - port: ip + ":" + strconv.Itoa(port), - } - - if cassandra_server != "" { - cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) - if err != nil { - glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err) - } - fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store) - } else if redis_server != "" { - redis_store := redis_store.NewRedisStore(redis_server, redis_password, redis_database) - fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store) - } else { - if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil { - glog.Fatalf("Can not start filer in dir %s : %v", dir, err) - return - } - - r.HandleFunc("/admin/mv", fs.moveHandler) - } - - r.HandleFunc("/", fs.filerHandler) - - return fs, nil -} - -func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { - return security.GenJwt(fs.secret, fileId) -} diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go deleted file mode 100644 index efc4c0381..000000000 --- a/go/weed/weed_server/filer_server_handlers.go +++ /dev/null @@ -1,265 +0,0 @@ -package weed_server - -import ( - "bytes" - "encoding/json" - "errors" - "io" - "io/ioutil" - "net/http" - "net/url" - "strconv" - "strings" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/operation" - "github.com/chrislusf/seaweedfs/go/storage" - "github.com/chrislusf/seaweedfs/go/util" - "github.com/syndtr/goleveldb/leveldb" -) - -func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - fs.GetOrHeadHandler(w, r, true) - case "HEAD": - fs.GetOrHeadHandler(w, r, false) - case "DELETE": - fs.DeleteHandler(w, r) - case "PUT": - fs.PostHandler(w, r) - case "POST": - fs.PostHandler(w, r) - } -} - -// listDirectoryHandler lists directories and folers under a directory -// files are sorted by name and paginated via "lastFileName" and "limit". -// sub directories are listed on the first page, when "lastFileName" -// is empty. -func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) { - if !strings.HasSuffix(r.URL.Path, "/") { - return - } - dirlist, err := fs.filer.ListDirectories(r.URL.Path) - if err == leveldb.ErrNotFound { - glog.V(3).Infoln("Directory Not Found in db", r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return - } - m := make(map[string]interface{}) - m["Directory"] = r.URL.Path - lastFileName := r.FormValue("lastFileName") - if lastFileName == "" { - m["Subdirectories"] = dirlist - } - limit, limit_err := strconv.Atoi(r.FormValue("limit")) - if limit_err != nil { - limit = 100 - } - m["Files"], _ = fs.filer.ListFiles(r.URL.Path, lastFileName, limit) - writeJsonQuiet(w, r, http.StatusOK, m) -} - -func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { - if strings.HasSuffix(r.URL.Path, "/") { - if fs.disableDirListing { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - fs.listDirectoryHandler(w, r) - return - } - - fileId, err := fs.filer.FindFile(r.URL.Path) - if err == leveldb.ErrNotFound { - glog.V(3).Infoln("Not found in db", r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return - } - - urlLocation, err := operation.LookupFileId(fs.master, fileId) - if err != nil { - glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) - w.WriteHeader(http.StatusNotFound) - return - } - urlString := urlLocation - if fs.redirectOnRead { - http.Redirect(w, r, urlString, http.StatusFound) - return - } - u, _ := url.Parse(urlString) - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: r.Body, - Host: r.Host, - ContentLength: r.ContentLength, - } - glog.V(3).Infoln("retrieving from", u) - resp, do_err := util.Do(request) - if do_err != nil { - glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, do_err) - return - } - defer resp.Body.Close() - for k, v := range resp.Header { - w.Header()[k] = v - } - w.WriteHeader(resp.StatusCode) - io.Copy(w, resp.Body) -} - -type analogueReader struct { - *bytes.Buffer -} - -// So that it implements the io.ReadCloser interface -func (m analogueReader) Close() error { return nil } - -func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { - query := r.URL.Query() - replication := query.Get("replication") - if replication == "" { - replication = fs.defaultReplication - } - collection := query.Get("collection") - if collection == "" { - collection = fs.collection - } - - var fileId string - var err error - var urlLocation string - if r.Method == "PUT" { - buf, _ := ioutil.ReadAll(r.Body) - r.Body = analogueReader{bytes.NewBuffer(buf)} - fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r) - if pe != nil { - glog.V(0).Infoln("failing to parse post body", pe.Error()) - writeJsonError(w, r, http.StatusInternalServerError, pe) - return - } - //reconstruct http request body for following new request to volume server - r.Body = analogueReader{bytes.NewBuffer(buf)} - - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if fileName != "" { - path += fileName - } - } - - if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound { - glog.V(0).Infoln("failing to find path in filer store", path, err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } else if fileId != "" && err == nil { - var le error - urlLocation, le = operation.LookupFileId(fs.master, fileId) - if le != nil { - glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error()) - w.WriteHeader(http.StatusNotFound) - return - } - } - } else { - assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) - if ae != nil { - glog.V(0).Infoln("failing to assign a file id", ae.Error()) - writeJsonError(w, r, http.StatusInternalServerError, ae) - return - } - fileId = assignResult.Fid - urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid - } - - u, _ := url.Parse(urlLocation) - glog.V(4).Infoln("post to", u) - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: r.Body, - Host: r.Host, - ContentLength: r.ContentLength, - } - resp, do_err := util.Do(request) - if do_err != nil { - glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, do_err) - return - } - defer resp.Body.Close() - resp_body, ra_err := ioutil.ReadAll(resp.Body) - if ra_err != nil { - glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, ra_err) - return - } - glog.V(4).Infoln("post result", string(resp_body)) - var ret operation.UploadResult - unmarshal_err := json.Unmarshal(resp_body, &ret) - if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body)) - writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err) - return - } - if ret.Error != "" { - glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) - writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error)) - return - } - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } else { - operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up - glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - writeJsonError(w, r, http.StatusInternalServerError, - errors.New("Can not to write to folder "+path+" without a file name")) - return - } - } - glog.V(4).Infoln("saving", path, "=>", fileId) - if db_err := fs.filer.CreateFile(path, fileId); db_err != nil { - operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up - glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) - writeJsonError(w, r, http.StatusInternalServerError, db_err) - return - } - w.WriteHeader(http.StatusCreated) - w.Write(resp_body) -} - -// curl -X DELETE http://localhost:8888/path/to -// curl -X DELETE http://localhost:8888/path/to?recursive=true -func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { - var err error - var fid string - if strings.HasSuffix(r.URL.Path, "/") { - isRecursive := r.FormValue("recursive") == "true" - err = fs.filer.DeleteDirectory(r.URL.Path, isRecursive) - } else { - fid, err = fs.filer.DeleteFile(r.URL.Path) - if err == nil && fid != "" { - err = operation.DeleteFile(fs.master, fid, fs.jwt(fid)) - } - } - if err == nil { - writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) - } else { - glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, err) - } -} diff --git a/go/weed/weed_server/filer_server_handlers_admin.go b/go/weed/weed_server/filer_server_handlers_admin.go deleted file mode 100644 index 2f317ff79..000000000 --- a/go/weed/weed_server/filer_server_handlers_admin.go +++ /dev/null @@ -1,29 +0,0 @@ -package weed_server - -import ( - "net/http" - - "github.com/chrislusf/seaweedfs/go/glog" -) - -/* -Move a folder or a file, with 4 Use cases: - mv fromDir toNewDir - mv fromDir toOldDir - mv fromFile toDir - mv fromFile toFile - -Wildcard is not supported. - -*/ -func (fs *FilerServer) moveHandler(w http.ResponseWriter, r *http.Request) { - from := r.FormValue("from") - to := r.FormValue("to") - err := fs.filer.Move(from, to) - if err != nil { - glog.V(4).Infoln("moving", from, "->", to, err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, err) - } else { - w.WriteHeader(http.StatusOK) - } -} diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go deleted file mode 100644 index db70ca6b1..000000000 --- a/go/weed/weed_server/master_server.go +++ /dev/null @@ -1,131 +0,0 @@ -package weed_server - -import ( - "fmt" - "net/http" - "net/http/httputil" - "net/url" - "sync" - - "github.com/chrislusf/raft" - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/security" - "github.com/chrislusf/seaweedfs/go/sequence" - "github.com/chrislusf/seaweedfs/go/topology" - "github.com/chrislusf/seaweedfs/go/util" - "github.com/gorilla/mux" -) - -type MasterServer struct { - port int - metaFolder string - volumeSizeLimitMB uint - pulseSeconds int - defaultReplicaPlacement string - garbageThreshold string - guard *security.Guard - - Topo *topology.Topology - vg *topology.VolumeGrowth - vgLock sync.Mutex - - bounedLeaderChan chan int -} - -func NewMasterServer(r *mux.Router, port int, metaFolder string, - volumeSizeLimitMB uint, - pulseSeconds int, - confFile string, - defaultReplicaPlacement string, - garbageThreshold string, - whiteList []string, - secureKey string, -) *MasterServer { - ms := &MasterServer{ - port: port, - volumeSizeLimitMB: volumeSizeLimitMB, - pulseSeconds: pulseSeconds, - defaultReplicaPlacement: defaultReplicaPlacement, - garbageThreshold: garbageThreshold, - } - ms.bounedLeaderChan = make(chan int, 16) - seq := sequence.NewMemorySequencer() - var e error - if ms.Topo, e = topology.NewTopology("topo", confFile, seq, - uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil { - glog.Fatalf("cannot create topology:%s", e) - } - ms.vg = topology.NewDefaultVolumeGrowth() - glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") - - ms.guard = security.NewGuard(whiteList, secureKey) - - r.HandleFunc("/", ms.uiStatusHandler) - r.HandleFunc("/ui/index.html", ms.uiStatusHandler) - r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) - r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) - r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler))) - r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) - r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) - r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler))) - r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) - r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) - r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) - r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) - r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler)) - r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) - r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) - r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) - - ms.Topo.StartRefreshWritableVolumes(garbageThreshold) - - return ms -} - -func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { - ms.Topo.RaftServer = raftServer.raftServer - ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - if ms.Topo.RaftServer.Leader() != "" { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") - } - }) - if ms.Topo.IsLeader() { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") - } else { - if ms.Topo.RaftServer.Leader() != "" { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") - } - } -} - -func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - if ms.Topo.IsLeader() { - f(w, r) - } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { - ms.bounedLeaderChan <- 1 - defer func() { <-ms.bounedLeaderChan }() - targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, - fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err)) - return - } - glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader()) - proxy := httputil.NewSingleHostReverseProxy(targetUrl) - director := proxy.Director - proxy.Director = func(req *http.Request) { - actualHost, err := security.GetActualRemoteHost(req) - if err == nil { - req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost) - } - director(req) - } - proxy.Transport = util.Transport - proxy.ServeHTTP(w, r) - } else { - //drop it to the floor - //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) - } - } -} diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go deleted file mode 100644 index 2be5d9524..000000000 --- a/go/weed/weed_server/master_server_handlers.go +++ /dev/null @@ -1,104 +0,0 @@ -package weed_server - -import ( - "fmt" - "net/http" - "strconv" - "strings" - - "github.com/chrislusf/seaweedfs/go/operation" - "github.com/chrislusf/seaweedfs/go/stats" - "github.com/chrislusf/seaweedfs/go/storage" -) - -func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) { - volumeLocations = make(map[string]operation.LookupResult) - for _, vid := range vids { - commaSep := strings.Index(vid, ",") - if commaSep > 0 { - vid = vid[0:commaSep] - } - if _, ok := volumeLocations[vid]; ok { - continue - } - volumeId, err := storage.NewVolumeId(vid) - if err == nil { - machines := ms.Topo.Lookup(collection, volumeId) - if machines != nil { - var ret []operation.Location - for _, dn := range machines { - ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl}) - } - volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret} - } else { - volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: "volumeId not found."} - } - } else { - volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: "Unknown volumeId format."} - } - } - return -} - -// Takes one volumeId only, can not do batch lookup -func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) { - vid := r.FormValue("volumeId") - commaSep := strings.Index(vid, ",") - if commaSep > 0 { - vid = vid[0:commaSep] - } - vids := []string{vid} - collection := r.FormValue("collection") //optional, but can be faster if too many collections - volumeLocations := ms.lookupVolumeId(vids, collection) - location := volumeLocations[vid] - httpStatus := http.StatusOK - if location.Error != "" { - httpStatus = http.StatusNotFound - } - writeJsonQuiet(w, r, httpStatus, location) -} - -// This can take batched volumeIds, &volumeId=x&volumeId=y&volumeId=z -func (ms *MasterServer) volumeLookupHandler(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - vids := r.Form["volumeId"] - collection := r.FormValue("collection") //optional, but can be faster if too many collections - volumeLocations := ms.lookupVolumeId(vids, collection) - writeJsonQuiet(w, r, http.StatusOK, volumeLocations) -} - -func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { - stats.AssignRequest() - requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64) - if e != nil || requestedCount == 0 { - requestedCount = 1 - } - - option, err := ms.getVolumeGrowOption(r) - if err != nil { - writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) - return - } - - if !ms.Topo.HasWritableVolume(option) { - if ms.Topo.FreeSpace() <= 0 { - writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"}) - return - } - ms.vgLock.Lock() - defer ms.vgLock.Unlock() - if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { - writeJsonError(w, r, http.StatusInternalServerError, - fmt.Errorf("Cannot grow volume group! %v", err)) - return - } - } - } - fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option) - if err == nil { - writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) - } else { - writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) - } -} diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go deleted file mode 100644 index 07399596a..000000000 --- a/go/weed/weed_server/master_server_handlers_admin.go +++ /dev/null @@ -1,193 +0,0 @@ -package weed_server - -import ( - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "math/rand" - "net/http" - "strconv" - "strings" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/operation" - "github.com/chrislusf/seaweedfs/go/storage" - "github.com/chrislusf/seaweedfs/go/topology" - "github.com/chrislusf/seaweedfs/go/util" - "github.com/golang/protobuf/proto" -) - -func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { - collection, ok := ms.Topo.FindCollection(r.FormValue("collection")) - if !ok { - writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection"))) - return - } - for _, server := range collection.ListVolumeServers() { - _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection")) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - } - ms.Topo.DeleteCollection(r.FormValue("collection")) -} - -func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) - if err != nil { - writeJsonError(w, r, http.StatusBadRequest, err) - return - } - joinMessage := &operation.JoinMessage{} - if err = proto.Unmarshal(body, joinMessage); err != nil { - writeJsonError(w, r, http.StatusBadRequest, err) - return - } - if *joinMessage.Ip == "" { - *joinMessage.Ip = r.RemoteAddr[0:strings.LastIndex(r.RemoteAddr, ":")] - } - if glog.V(4) { - if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil { - glog.V(0).Infoln("json marshaling error: ", jsonError) - writeJsonError(w, r, http.StatusBadRequest, jsonError) - return - } else { - glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData)) - } - } - - ms.Topo.ProcessJoinMessage(joinMessage) - writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{ - VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, - SecretKey: string(ms.guard.SecretKey), - }) -} - -func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = util.VERSION - m["Topology"] = ms.Topo.ToMap() - writeJsonQuiet(w, r, http.StatusOK, m) -} - -func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { - gcThreshold := r.FormValue("garbageThreshold") - if gcThreshold == "" { - gcThreshold = ms.garbageThreshold - } - glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(gcThreshold) - ms.dirStatusHandler(w, r) -} - -func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { - count := 0 - option, err := ms.getVolumeGrowOption(r) - if err != nil { - writeJsonError(w, r, http.StatusNotAcceptable, err) - return - } - if err == nil { - if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount())) - } else { - count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo) - } - } else { - err = errors.New("parameter count is not found") - } - } - if err != nil { - writeJsonError(w, r, http.StatusNotAcceptable, err) - } else { - writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"count": count}) - } -} - -func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = util.VERSION - m["Volumes"] = ms.Topo.ToVolumeMap() - writeJsonQuiet(w, r, http.StatusOK, m) -} - -func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { - vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } - collection := r.FormValue("collection") - machines := ms.Topo.Lookup(collection, volumeId) - if machines != nil && len(machines) > 0 { - var url string - if r.URL.RawQuery != "" { - url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery - } else { - url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path - } - http.Redirect(w, r, url, http.StatusMovedPermanently) - } else { - writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d or collection %s not found", volumeId, collection)) - } -} - -func (ms *MasterServer) selfUrl(r *http.Request) string { - if r.Host != "" { - return r.Host - } - return "localhost:" + strconv.Itoa(ms.port) -} -func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { - if ms.Topo.IsLeader() { - submitForClientHandler(w, r, ms.selfUrl(r)) - } else { - masterUrl, err := ms.Topo.Leader() - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - } else { - submitForClientHandler(w, r, masterUrl) - } - } -} - -func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { - if ms.Topo.IsLeader() { - deleteForClientHandler(w, r, ms.selfUrl(r)) - } else { - deleteForClientHandler(w, r, ms.Topo.RaftServer.Leader()) - } -} - -func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool { - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) - return vl.GetActiveVolumeCount(option) > 0 -} - -func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { - replicationString := r.FormValue("replication") - if replicationString == "" { - replicationString = ms.defaultReplicaPlacement - } - replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString) - if err != nil { - return nil, err - } - ttl, err := storage.ReadTTL(r.FormValue("ttl")) - if err != nil { - return nil, err - } - volumeGrowOption := &topology.VolumeGrowOption{ - Collection: r.FormValue("collection"), - ReplicaPlacement: replicaPlacement, - Ttl: ttl, - DataCenter: r.FormValue("dataCenter"), - Rack: r.FormValue("rack"), - DataNode: r.FormValue("dataNode"), - } - return volumeGrowOption, nil -} diff --git a/go/weed/weed_server/master_server_handlers_ui.go b/go/weed/weed_server/master_server_handlers_ui.go deleted file mode 100644 index af7261ab3..000000000 --- a/go/weed/weed_server/master_server_handlers_ui.go +++ /dev/null @@ -1,30 +0,0 @@ -package weed_server - -import ( - "net/http" - - "github.com/chrislusf/seaweedfs/go/stats" - "github.com/chrislusf/seaweedfs/go/util" - ui "github.com/chrislusf/seaweedfs/go/weed/weed_server/master_ui" -) - -func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { - infos := make(map[string]interface{}) - infos["Version"] = util.VERSION - args := struct { - Version string - Topology interface{} - Leader string - Peers interface{} - Stats map[string]interface{} - Counters *stats.ServerStats - }{ - util.VERSION, - ms.Topo.ToMap(), - ms.Topo.RaftServer.Leader(), - ms.Topo.RaftServer.Peers(), - infos, - serverStats, - } - ui.StatusTpl.Execute(w, args) -} diff --git a/go/weed/weed_server/master_ui/templates.go b/go/weed/weed_server/master_ui/templates.go deleted file mode 100644 index e9ee2d8d2..000000000 --- a/go/weed/weed_server/master_ui/templates.go +++ /dev/null @@ -1,102 +0,0 @@ -package master_ui - -import ( - "html/template" -) - -var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> -<html> - <head> - <title>SeaweedFS {{ .Version }}</title> - <link rel="icon" href="http://7viirv.com1.z0.glb.clouddn.com/seaweed_favicon.png" sizes="32x32" /> - <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css"> - </head> - <body> - <div class="container"> - <div class="page-header"> - <h1> - <img src="http://7viirv.com1.z0.glb.clouddn.com/seaweed50x50.png"></img> - SeaweedFS <small>{{ .Version }}</small> - </h1> - </div> - - <div class="row"> - <div class="col-sm-6"> - <h2>Cluster status</h2> - <table class="table"> - <tbody> - <tr> - <th>Free</th> - <td>{{ .Topology.Free }}</td> - </tr> - <tr> - <th>Max</th> - <td>{{ .Topology.Max }}</td> - </tr> - <tr> - <th>Leader</th> - <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td> - </tr> - <tr> - <td class="col-sm-2 field-label"><label>Peers:</label></td> - <td class="col-sm-10"><ul class="list-unstyled"> - {{ range $k, $p := .Peers }} - <li><a href="{{ $p.ConnectionString }}">{{ $p.Name }}</a></li> - {{ end }} - </ul></td> - </tr> - </tbody> - </table> - </div> - - <div class="col-sm-6"> - <h2>System Stats</h2> - <table class="table table-condensed table-striped"> - <tr> - <th>Concurrent Connections</th> - <td>{{ .Counters.Connections.WeekCounter.Sum }}</td> - </tr> - {{ range $key, $val := .Stats }} - <tr> - <th>{{ $key }}</th> - <td>{{ $val }}</td> - </tr> - {{ end }} - </table> - </div> - </div> - - <div class="row"> - <h2>Topology</h2> - <table class="table table-striped"> - <thead> - <tr> - <th>Data Center</th> - <th>Rack</th> - <th>RemoteAddr</th> - <th>#Volumes</th> - <th>Max</th> - </tr> - </thead> - <tbody> - {{ range $dc_index, $dc := .Topology.DataCenters }} - {{ range $rack_index, $rack := $dc.Racks }} - {{ range $dn_index, $dn := $rack.DataNodes }} - <tr> - <td><code>{{ $dc.Id }}</code></td> - <td>{{ $rack.Id }}</td> - <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a></td> - <td>{{ $dn.Volumes }}</td> - <td>{{ $dn.Max }}</td> - </tr> - {{ end }} - {{ end }} - {{ end }} - </tbody> - </table> - </div> - - </div> - </body> -</html> -`)) diff --git a/go/weed/weed_server/raft_server.go b/go/weed/weed_server/raft_server.go deleted file mode 100644 index bc0414679..000000000 --- a/go/weed/weed_server/raft_server.go +++ /dev/null @@ -1,217 +0,0 @@ -package weed_server - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "math/rand" - "net/http" - "net/url" - "os" - "path" - "reflect" - "sort" - "strings" - "time" - - "github.com/chrislusf/raft" - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/topology" - "github.com/gorilla/mux" -) - -type RaftServer struct { - peers []string // initial peers to join with - raftServer raft.Server - dataDir string - httpAddr string - router *mux.Router - topo *topology.Topology -} - -func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { - s := &RaftServer{ - peers: peers, - httpAddr: httpAddr, - dataDir: dataDir, - router: r, - topo: topo, - } - - if glog.V(4) { - raft.SetLogLevel(2) - } - - raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) - - var err error - transporter := raft.NewHTTPTransporter("/cluster", 0) - transporter.Transport.MaxIdleConnsPerHost = 1024 - glog.V(1).Infof("Starting RaftServer with IP:%v:", httpAddr) - - // Clear old cluster configurations if peers are changed - if oldPeers, changed := isPeersChanged(s.dataDir, httpAddr, s.peers); changed { - glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers) - os.RemoveAll(path.Join(s.dataDir, "conf")) - os.RemoveAll(path.Join(s.dataDir, "log")) - os.RemoveAll(path.Join(s.dataDir, "snapshot")) - } - - s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "") - if err != nil { - glog.V(0).Infoln(err) - return nil - } - transporter.Install(s.raftServer, s) - s.raftServer.SetHeartbeatInterval(1 * time.Second) - s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 3450 * time.Millisecond) - s.raftServer.Start() - - s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST") - s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET") - - if len(s.peers) > 0 { - // Join to leader if specified. - for { - glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) - time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) - firstJoinError := s.Join(s.peers) - if firstJoinError != nil { - glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.") - _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: "http://" + s.httpAddr, - }) - if err != nil { - glog.V(0).Infoln(err) - } else { - break - } - } else { - break - } - } - } else if s.raftServer.IsLogEmpty() { - // Initialize the server by joining itself. - glog.V(0).Infoln("Initializing new cluster") - - _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: "http://" + s.httpAddr, - }) - - if err != nil { - glog.V(0).Infoln(err) - return nil - } - - } else { - glog.V(0).Infoln("Old conf,log,snapshot should have been removed.") - } - - return s -} - -func (s *RaftServer) Peers() (members []string) { - peers := s.raftServer.Peers() - - for _, p := range peers { - members = append(members, strings.TrimPrefix(p.ConnectionString, "http://")) - } - - return -} - -func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) { - confPath := path.Join(dir, "conf") - // open conf file - b, err := ioutil.ReadFile(confPath) - if err != nil { - return oldPeers, true - } - conf := &raft.Config{} - if err = json.Unmarshal(b, conf); err != nil { - return oldPeers, true - } - - for _, p := range conf.Peers { - oldPeers = append(oldPeers, strings.TrimPrefix(p.ConnectionString, "http://")) - } - oldPeers = append(oldPeers, self) - - sort.Strings(peers) - sort.Strings(oldPeers) - - return oldPeers, reflect.DeepEqual(peers, oldPeers) - -} - -// Join joins an existing cluster. -func (s *RaftServer) Join(peers []string) error { - command := &raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: "http://" + s.httpAddr, - } - - var err error - var b bytes.Buffer - json.NewEncoder(&b).Encode(command) - for _, m := range peers { - if m == s.httpAddr { - continue - } - target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m)) - glog.V(0).Infoln("Attempting to connect to:", target) - - err = postFollowingOneRedirect(target, "application/json", &b) - - if err != nil { - glog.V(0).Infoln("Post returned error: ", err.Error()) - if _, ok := err.(*url.Error); ok { - // If we receive a network error try the next member - continue - } - } else { - return nil - } - } - - return errors.New("Could not connect to any cluster peers") -} - -// a workaround because http POST following redirection misses request body -func postFollowingOneRedirect(target string, contentType string, b *bytes.Buffer) error { - backupReader := bytes.NewReader(b.Bytes()) - resp, err := http.Post(target, contentType, b) - if err != nil { - return err - } - defer resp.Body.Close() - reply, _ := ioutil.ReadAll(resp.Body) - statusCode := resp.StatusCode - - if statusCode == http.StatusMovedPermanently { - var urlStr string - if urlStr = resp.Header.Get("Location"); urlStr == "" { - return fmt.Errorf("%d response missing Location header", resp.StatusCode) - } - - glog.V(0).Infoln("Post redirected to ", urlStr) - resp2, err2 := http.Post(urlStr, contentType, backupReader) - if err2 != nil { - return err2 - } - defer resp2.Body.Close() - reply, _ = ioutil.ReadAll(resp2.Body) - statusCode = resp2.StatusCode - } - - glog.V(0).Infoln("Post returned status: ", statusCode, string(reply)) - if statusCode != http.StatusOK { - return errors.New(string(reply)) - } - - return nil -} diff --git a/go/weed/weed_server/raft_server_handlers.go b/go/weed/weed_server/raft_server_handlers.go deleted file mode 100644 index b1d964a32..000000000 --- a/go/weed/weed_server/raft_server_handlers.go +++ /dev/null @@ -1,64 +0,0 @@ -package weed_server - -import ( - "encoding/json" - "io/ioutil" - "net/http" - "strings" - - "github.com/chrislusf/raft" - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/operation" -) - -// Handles incoming RAFT joins. -func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { - glog.V(0).Infoln("Processing incoming join. Current Leader", s.raftServer.Leader(), "Self", s.raftServer.Name(), "Peers", s.raftServer.Peers()) - command := &raft.DefaultJoinCommand{} - - commandText, _ := ioutil.ReadAll(req.Body) - glog.V(0).Info("Command:", string(commandText)) - if err := json.NewDecoder(strings.NewReader(string(commandText))).Decode(&command); err != nil { - glog.V(0).Infoln("Error decoding json message:", err, string(commandText)) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - glog.V(0).Infoln("join command from Name", command.Name, "Connection", command.ConnectionString) - - if _, err := s.raftServer.Do(command); err != nil { - switch err { - case raft.NotLeaderError: - s.redirectToLeader(w, req) - default: - glog.V(0).Infoln("Error processing join:", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - } - } -} - -func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { - s.router.HandleFunc(pattern, handler) -} - -func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) { - if leader, e := s.topo.Leader(); e == nil { - //http.StatusMovedPermanently does not cause http POST following redirection - glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+leader+req.URL.Path) - http.Redirect(w, req, "http://"+leader+req.URL.Path, http.StatusMovedPermanently) - } else { - glog.V(0).Infoln("Error: Leader Unknown") - http.Error(w, "Leader unknown", http.StatusInternalServerError) - } -} - -func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) { - ret := operation.ClusterStatusResult{ - IsLeader: s.topo.IsLeader(), - Peers: s.Peers(), - } - if leader, e := s.topo.Leader(); e == nil { - ret.Leader = leader - } - writeJsonQuiet(w, r, http.StatusOK, ret) -} diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go deleted file mode 100644 index 229d66cb9..000000000 --- a/go/weed/weed_server/volume_server.go +++ /dev/null @@ -1,125 +0,0 @@ -package weed_server - -import ( - "math/rand" - "net/http" - "sync" - "time" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/security" - "github.com/chrislusf/seaweedfs/go/storage" -) - -type VolumeServer struct { - masterNode string - mnLock sync.RWMutex - pulseSeconds int - dataCenter string - rack string - store *storage.Store - guard *security.Guard - - needleMapKind storage.NeedleMapType - FixJpgOrientation bool - ReadRedirect bool -} - -func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, - port int, publicUrl string, - folders []string, maxCounts []int, - needleMapKind storage.NeedleMapType, - masterNode string, pulseSeconds int, - dataCenter string, rack string, - whiteList []string, - fixJpgOrientation bool, - readRedirect bool) *VolumeServer { - vs := &VolumeServer{ - pulseSeconds: pulseSeconds, - dataCenter: dataCenter, - rack: rack, - needleMapKind: needleMapKind, - FixJpgOrientation: fixJpgOrientation, - ReadRedirect: readRedirect, - } - vs.SetMasterNode(masterNode) - vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) - - vs.guard = security.NewGuard(whiteList, "") - - adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) - adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) - adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler)) - adminMux.HandleFunc("/admin/vacuum/check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler)) - adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler)) - adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler)) - adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler)) - adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) - adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) - adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) - adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) - adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) - adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) - adminMux.HandleFunc("/delete", vs.guard.WhiteList(vs.batchDeleteHandler)) - adminMux.HandleFunc("/", vs.privateStoreHandler) - if publicMux != adminMux { - // separated admin and public port - publicMux.HandleFunc("/favicon.ico", vs.faviconHandler) - publicMux.HandleFunc("/", vs.publicReadOnlyHandler) - } - - go func() { - connected := true - - glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode()) - vs.store.SetBootstrapMaster(vs.GetMasterNode()) - vs.store.SetDataCenter(vs.dataCenter) - vs.store.SetRack(vs.rack) - for { - glog.V(4).Infof("Volume server sending to master %s", vs.GetMasterNode()) - master, secretKey, err := vs.store.SendHeartbeatToMaster() - if err == nil { - if !connected { - connected = true - vs.SetMasterNode(master) - vs.guard.SecretKey = secretKey - glog.V(0).Infoln("Volume Server Connected with master at", master) - } - } else { - glog.V(1).Infof("Volume Server Failed to talk with master %s: %v", vs.masterNode, err) - if connected { - connected = false - } - } - if connected { - time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond) - } else { - time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*0.25) * time.Millisecond) - } - } - }() - - return vs -} - -func (vs *VolumeServer) GetMasterNode() string { - vs.mnLock.RLock() - defer vs.mnLock.RUnlock() - return vs.masterNode -} - -func (vs *VolumeServer) SetMasterNode(masterNode string) { - vs.mnLock.Lock() - defer vs.mnLock.Unlock() - vs.masterNode = masterNode -} - -func (vs *VolumeServer) Shutdown() { - glog.V(0).Infoln("Shutting down volume server...") - vs.store.Close() - glog.V(0).Infoln("Shut down successfully!") -} - -func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt { - return security.GenJwt(vs.guard.SecretKey, fileId) -} diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go deleted file mode 100644 index accc280cd..000000000 --- a/go/weed/weed_server/volume_server_handlers.go +++ /dev/null @@ -1,57 +0,0 @@ -package weed_server - -import ( - "net/http" - - "github.com/chrislusf/seaweedfs/go/stats" -) - -/* - -If volume server is started with a separated public port, the public port will -be more "secure". - -Public port currently only supports reads. - -Later writes on public port can have one of the 3 -security settings: -1. not secured -2. secured by white list -3. secured by JWT(Json Web Token) - -*/ - -func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - stats.ReadRequest() - vs.GetOrHeadHandler(w, r) - case "HEAD": - stats.ReadRequest() - vs.GetOrHeadHandler(w, r) - case "DELETE": - stats.DeleteRequest() - vs.guard.WhiteList(vs.DeleteHandler)(w, r) - case "PUT": - stats.WriteRequest() - vs.guard.WhiteList(vs.PostHandler)(w, r) - case "POST": - stats.WriteRequest() - vs.guard.WhiteList(vs.PostHandler)(w, r) - } -} - -func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - stats.ReadRequest() - vs.GetOrHeadHandler(w, r) - case "HEAD": - stats.ReadRequest() - vs.GetOrHeadHandler(w, r) - } -} - -func (vs *VolumeServer) faviconHandler(w http.ResponseWriter, r *http.Request) { - vs.FaviconHandler(w, r) -} diff --git a/go/weed/weed_server/volume_server_handlers_admin.go b/go/weed/weed_server/volume_server_handlers_admin.go deleted file mode 100644 index 80aeb3f1d..000000000 --- a/go/weed/weed_server/volume_server_handlers_admin.go +++ /dev/null @@ -1,50 +0,0 @@ -package weed_server - -import ( - "net/http" - "path/filepath" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/stats" - "github.com/chrislusf/seaweedfs/go/util" -) - -func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = util.VERSION - m["Volumes"] = vs.store.Status() - writeJsonQuiet(w, r, http.StatusOK, m) -} - -func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl")) - if err == nil { - writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) - } else { - writeJsonError(w, r, http.StatusNotAcceptable, err) - } - glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err) -} - -func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.DeleteCollection(r.FormValue("collection")) - if err == nil { - writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) - } else { - writeJsonError(w, r, http.StatusInternalServerError, err) - } - glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err) -} - -func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = util.VERSION - var ds []*stats.DiskStatus - for _, loc := range vs.store.Locations { - if dir, e := filepath.Abs(loc.Directory); e == nil { - ds = append(ds, stats.NewDiskStatus(dir)) - } - } - m["DiskStatuses"] = ds - writeJsonQuiet(w, r, http.StatusOK, m) -} diff --git a/go/weed/weed_server/volume_server_handlers_helper.go b/go/weed/weed_server/volume_server_handlers_helper.go deleted file mode 100644 index 2bab35e45..000000000 --- a/go/weed/weed_server/volume_server_handlers_helper.go +++ /dev/null @@ -1,115 +0,0 @@ -package weed_server - -import ( - "errors" - "fmt" - "mime/multipart" - "net/textproto" - "strconv" - "strings" -) - -// copied from src/pkg/net/http/fs.go - -// httpRange specifies the byte range to be sent to the client. -type httpRange struct { - start, length int64 -} - -func (r httpRange) contentRange(size int64) string { - return fmt.Sprintf("bytes %d-%d/%d", r.start, r.start+r.length-1, size) -} - -func (r httpRange) mimeHeader(contentType string, size int64) textproto.MIMEHeader { - return textproto.MIMEHeader{ - "Content-Range": {r.contentRange(size)}, - "Content-Type": {contentType}, - } -} - -// parseRange parses a Range header string as per RFC 2616. -func parseRange(s string, size int64) ([]httpRange, error) { - if s == "" { - return nil, nil // header not present - } - const b = "bytes=" - if !strings.HasPrefix(s, b) { - return nil, errors.New("invalid range") - } - var ranges []httpRange - for _, ra := range strings.Split(s[len(b):], ",") { - ra = strings.TrimSpace(ra) - if ra == "" { - continue - } - i := strings.Index(ra, "-") - if i < 0 { - return nil, errors.New("invalid range") - } - start, end := strings.TrimSpace(ra[:i]), strings.TrimSpace(ra[i+1:]) - var r httpRange - if start == "" { - // If no start is specified, end specifies the - // range start relative to the end of the file. - i, err := strconv.ParseInt(end, 10, 64) - if err != nil { - return nil, errors.New("invalid range") - } - if i > size { - i = size - } - r.start = size - i - r.length = size - r.start - } else { - i, err := strconv.ParseInt(start, 10, 64) - if err != nil || i > size || i < 0 { - return nil, errors.New("invalid range") - } - r.start = i - if end == "" { - // If no end is specified, range extends to end of the file. - r.length = size - r.start - } else { - i, err := strconv.ParseInt(end, 10, 64) - if err != nil || r.start > i { - return nil, errors.New("invalid range") - } - if i >= size { - i = size - 1 - } - r.length = i - r.start + 1 - } - } - ranges = append(ranges, r) - } - return ranges, nil -} - -// countingWriter counts how many bytes have been written to it. -type countingWriter int64 - -func (w *countingWriter) Write(p []byte) (n int, err error) { - *w += countingWriter(len(p)) - return len(p), nil -} - -// rangesMIMESize returns the number of bytes it takes to encode the -// provided ranges as a multipart response. -func rangesMIMESize(ranges []httpRange, contentType string, contentSize int64) (encSize int64) { - var w countingWriter - mw := multipart.NewWriter(&w) - for _, ra := range ranges { - mw.CreatePart(ra.mimeHeader(contentType, contentSize)) - encSize += ra.length - } - mw.Close() - encSize += int64(w) - return -} - -func sumRangesSize(ranges []httpRange) (size int64) { - for _, ra := range ranges { - size += ra.length - } - return -} diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go deleted file mode 100644 index 3eb33a8c9..000000000 --- a/go/weed/weed_server/volume_server_handlers_read.go +++ /dev/null @@ -1,301 +0,0 @@ -package weed_server - -import ( - "bytes" - "io" - "mime" - "mime/multipart" - "net/http" - "path" - "strconv" - "strings" - "time" - - "net/url" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/images" - "github.com/chrislusf/seaweedfs/go/operation" - "github.com/chrislusf/seaweedfs/go/storage" - "github.com/chrislusf/seaweedfs/go/util" -) - -var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") - -func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - glog.V(2).Infoln("parsing error:", err, r.URL.Path) - w.WriteHeader(http.StatusBadRequest) - return - } - err = n.ParsePath(fid) - if err != nil { - glog.V(2).Infoln("parsing fid error:", err, r.URL.Path) - w.WriteHeader(http.StatusBadRequest) - return - } - - glog.V(4).Infoln("volume", volumeId, "reading", n) - if !vs.store.HasVolume(volumeId) { - if !vs.ReadRedirect { - glog.V(2).Infoln("volume is not local:", err, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return - } - lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String()) - glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) - if err == nil && len(lookupResult.Locations) > 0 { - u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) - u.Path = r.URL.Path - arg := url.Values{} - if c := r.FormValue("collection"); c != "" { - arg.Set("collection", c) - } - u.RawQuery = arg.Encode() - http.Redirect(w, r, u.String(), http.StatusMovedPermanently) - - } else { - glog.V(2).Infoln("lookup error:", err, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - } - return - } - cookie := n.Cookie - count, e := vs.store.ReadVolumeNeedle(volumeId, n) - glog.V(4).Infoln("read bytes", count, "error", e) - if e != nil || count <= 0 { - glog.V(0).Infoln("read error:", e, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return - } - defer n.ReleaseMemory() - if n.Cookie != cookie { - glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent()) - w.WriteHeader(http.StatusNotFound) - return - } - if n.LastModified != 0 { - w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat)) - if r.Header.Get("If-Modified-Since") != "" { - if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil { - if t.Unix() >= int64(n.LastModified) { - w.WriteHeader(http.StatusNotModified) - return - } - } - } - } - etag := n.Etag() - if inm := r.Header.Get("If-None-Match"); inm == etag { - w.WriteHeader(http.StatusNotModified) - return - } - w.Header().Set("Etag", etag) - - if vs.tryHandleChunkedFile(n, filename, w, r) { - return - } - - if n.NameSize > 0 && filename == "" { - filename = string(n.Name) - if ext == "" { - ext = path.Ext(filename) - } - } - mtype := "" - if n.MimeSize > 0 { - mt := string(n.Mime) - if !strings.HasPrefix(mt, "application/octet-stream") { - mtype = mt - } - } - - if ext != ".gz" { - if n.IsGzipped() { - if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - w.Header().Set("Content-Encoding", "gzip") - } else { - if n.Data, err = operation.UnGzipData(n.Data); err != nil { - glog.V(0).Infoln("ungzip error:", err, r.URL.Path) - } - } - } - } - if ext == ".png" || ext == ".jpg" || ext == ".gif" { - width, height := 0, 0 - if r.FormValue("width") != "" { - width, _ = strconv.Atoi(r.FormValue("width")) - } - if r.FormValue("height") != "" { - height, _ = strconv.Atoi(r.FormValue("height")) - } - n.Data, _, _ = images.Resized(ext, n.Data, width, height) - } - - if e := writeResponseContent(filename, mtype, bytes.NewReader(n.Data), w, r); e != nil { - glog.V(2).Infoln("response write error:", e) - } -} - -func (vs *VolumeServer) FaviconHandler(w http.ResponseWriter, r *http.Request) { - data, err := images.Asset("favicon/favicon.ico") - if err != nil { - glog.V(2).Infoln("favicon read error:", err) - return - } - - if e := writeResponseContent("favicon.ico", "image/x-icon", bytes.NewReader(data), w, r); e != nil { - glog.V(2).Infoln("response write error:", e) - } -} - -func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { - if !n.IsChunkedManifest() { - return false - } - - chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped()) - if e != nil { - glog.V(0).Infof("load chunked manifest (%s) error: %v", r.URL.Path, e) - return false - } - if fileName == "" && chunkManifest.Name != "" { - fileName = chunkManifest.Name - } - mType := "" - if chunkManifest.Mime != "" { - mt := chunkManifest.Mime - if !strings.HasPrefix(mt, "application/octet-stream") { - mType = mt - } - } - - w.Header().Set("X-File-Store", "chunked") - - chunkedFileReader := &operation.ChunkedFileReader{ - Manifest: chunkManifest, - Master: vs.GetMasterNode(), - } - defer chunkedFileReader.Close() - if e := writeResponseContent(fileName, mType, chunkedFileReader, w, r); e != nil { - glog.V(2).Infoln("response write error:", e) - } - return true -} - -func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.ResponseWriter, r *http.Request) error { - totalSize, e := rs.Seek(0, 2) - if mimeType == "" { - if ext := path.Ext(filename); ext != "" { - mimeType = mime.TypeByExtension(ext) - } - } - if mimeType != "" { - w.Header().Set("Content-Type", mimeType) - } - if filename != "" { - contentDisposition := "inline" - if r.FormValue("dl") != "" { - if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl { - contentDisposition = "attachment" - } - } - w.Header().Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`) - } - w.Header().Set("Accept-Ranges", "bytes") - if r.Method == "HEAD" { - w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) - return nil - } - rangeReq := r.Header.Get("Range") - if rangeReq == "" { - w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) - if _, e = rs.Seek(0, 0); e != nil { - return e - } - _, e = io.Copy(w, rs) - return e - } - - //the rest is dealing with partial content request - //mostly copy from src/pkg/net/http/fs.go - ranges, err := parseRange(rangeReq, totalSize) - if err != nil { - http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) - return nil - } - if sumRangesSize(ranges) > totalSize { - // The total number of bytes in all the ranges - // is larger than the size of the file by - // itself, so this is probably an attack, or a - // dumb client. Ignore the range request. - return nil - } - if len(ranges) == 0 { - return nil - } - if len(ranges) == 1 { - // RFC 2616, Section 14.16: - // "When an HTTP message includes the content of a single - // range (for example, a response to a request for a - // single range, or to a request for a set of ranges - // that overlap without any holes), this content is - // transmitted with a Content-Range header, and a - // Content-Length header showing the number of bytes - // actually transferred. - // ... - // A response to a request for a single range MUST NOT - // be sent using the multipart/byteranges media type." - ra := ranges[0] - w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) - w.Header().Set("Content-Range", ra.contentRange(totalSize)) - w.WriteHeader(http.StatusPartialContent) - if _, e = rs.Seek(ra.start, 0); e != nil { - return e - } - - _, e = io.CopyN(w, rs, ra.length) - return e - } - // process multiple ranges - for _, ra := range ranges { - if ra.start > totalSize { - http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable) - return nil - } - } - sendSize := rangesMIMESize(ranges, mimeType, totalSize) - pr, pw := io.Pipe() - mw := multipart.NewWriter(pw) - w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary()) - sendContent := pr - defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish. - go func() { - for _, ra := range ranges { - part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize)) - if e != nil { - pw.CloseWithError(e) - return - } - if _, e = rs.Seek(ra.start, 0); e != nil { - pw.CloseWithError(e) - return - } - if _, e = io.CopyN(part, rs, ra.length); e != nil { - pw.CloseWithError(e) - return - } - } - mw.Close() - pw.Close() - }() - if w.Header().Get("Content-Encoding") == "" { - w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) - } - w.WriteHeader(http.StatusPartialContent) - _, e = io.CopyN(w, sendContent, sendSize) - return e -} diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go deleted file mode 100644 index c52c93bd2..000000000 --- a/go/weed/weed_server/volume_server_handlers_sync.go +++ /dev/null @@ -1,87 +0,0 @@ -package weed_server - -import ( - "fmt" - "net/http" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/storage" - "github.com/chrislusf/seaweedfs/go/util" -) - -func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) { - v, err := vs.getVolume("volume", r) - if v == nil { - writeJsonError(w, r, http.StatusBadRequest, err) - return - } - syncStat := v.GetVolumeSyncStatus() - if syncStat.Error != "" { - writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Get Volume %d status error: %s", v.Id, syncStat.Error)) - glog.V(2).Infoln("getVolumeSyncStatusHandler volume =", r.FormValue("volume"), ", error =", err) - } else { - writeJsonQuiet(w, r, http.StatusOK, syncStat) - } -} - -func (vs *VolumeServer) getVolumeIndexContentHandler(w http.ResponseWriter, r *http.Request) { - v, err := vs.getVolume("volume", r) - if v == nil { - writeJsonError(w, r, http.StatusBadRequest, err) - return - } - content, err := v.IndexFileContent() - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - w.Write(content) -} - -func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *http.Request) { - v, err := vs.getVolume("volume", r) - if v == nil { - writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("Not Found volume: %v", err)) - return - } - if int(v.SuperBlock.CompactRevision) != util.ParseInt(r.FormValue("revision"), 0) { - writeJsonError(w, r, http.StatusExpectationFailed, fmt.Errorf("Requested Volume Revision is %s, but current revision is %d", r.FormValue("revision"), v.SuperBlock.CompactRevision)) - return - } - offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) - size := uint32(util.ParseUint64(r.FormValue("size"), 0)) - content, block, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) - defer storage.ReleaseBytes(block.Bytes) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - - id := util.ParseUint64(r.FormValue("id"), 0) - n := new(storage.Needle) - n.ParseNeedleHeader(content) - if id != n.Id { - writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)) - return - } - - w.Write(content) -} - -func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { - volumeIdString := r.FormValue(volumeParameterName) - if volumeIdString == "" { - err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName) - return nil, err - } - vid, err := storage.NewVolumeId(volumeIdString) - if err != nil { - err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) - return nil, err - } - v := vs.store.GetVolume(vid) - if v == nil { - return nil, fmt.Errorf("Not Found Volume Id %s: %d", volumeIdString, vid) - } - return v, nil -} diff --git a/go/weed/weed_server/volume_server_handlers_ui.go b/go/weed/weed_server/volume_server_handlers_ui.go deleted file mode 100644 index 5925b5a88..000000000 --- a/go/weed/weed_server/volume_server_handlers_ui.go +++ /dev/null @@ -1,38 +0,0 @@ -package weed_server - -import ( - "net/http" - "path/filepath" - "time" - - "github.com/chrislusf/seaweedfs/go/stats" - "github.com/chrislusf/seaweedfs/go/util" - ui "github.com/chrislusf/seaweedfs/go/weed/weed_server/volume_server_ui" -) - -func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { - infos := make(map[string]interface{}) - infos["Up Time"] = time.Now().Sub(startTime).String() - var ds []*stats.DiskStatus - for _, loc := range vs.store.Locations { - if dir, e := filepath.Abs(loc.Directory); e == nil { - ds = append(ds, stats.NewDiskStatus(dir)) - } - } - args := struct { - Version string - Master string - Volumes interface{} - DiskStatuses interface{} - Stats interface{} - Counters *stats.ServerStats - }{ - util.VERSION, - vs.masterNode, - vs.store.Status(), - ds, - infos, - serverStats, - } - ui.StatusTpl.Execute(w, args) -} diff --git a/go/weed/weed_server/volume_server_handlers_vacuum.go b/go/weed/weed_server/volume_server_handlers_vacuum.go deleted file mode 100644 index e174835ca..000000000 --- a/go/weed/weed_server/volume_server_handlers_vacuum.go +++ /dev/null @@ -1,35 +0,0 @@ -package weed_server - -import ( - "net/http" - - "github.com/chrislusf/seaweedfs/go/glog" -) - -func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { - err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) - if err == nil { - writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"error": "", "result": ret}) - } else { - writeJsonQuiet(w, r, http.StatusInternalServerError, map[string]interface{}{"error": err.Error(), "result": false}) - } - glog.V(2).Infoln("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) -} -func (vs *VolumeServer) vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.CompactVolume(r.FormValue("volume")) - if err == nil { - writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) - } else { - writeJsonError(w, r, http.StatusInternalServerError, err) - } - glog.V(2).Infoln("compacted volume =", r.FormValue("volume"), ", error =", err) -} -func (vs *VolumeServer) vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { - err := vs.store.CommitCompactVolume(r.FormValue("volume")) - if err == nil { - writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) - } else { - writeJsonError(w, r, http.StatusInternalServerError, err) - } - glog.V(2).Infoln("commit compact volume =", r.FormValue("volume"), ", error =", err) -} diff --git a/go/weed/weed_server/volume_server_handlers_write.go b/go/weed/weed_server/volume_server_handlers_write.go deleted file mode 100644 index b0a4c7031..000000000 --- a/go/weed/weed_server/volume_server_handlers_write.go +++ /dev/null @@ -1,165 +0,0 @@ -package weed_server - -import ( - "errors" - "fmt" - "net/http" - - "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/operation" - "github.com/chrislusf/seaweedfs/go/storage" - "github.com/chrislusf/seaweedfs/go/topology" -) - -func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { - if e := r.ParseForm(); e != nil { - glog.V(0).Infoln("form parse error:", e) - writeJsonError(w, r, http.StatusBadRequest, e) - return - } - vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, ve := storage.NewVolumeId(vid) - if ve != nil { - glog.V(0).Infoln("NewVolumeId error:", ve) - writeJsonError(w, r, http.StatusBadRequest, ve) - return - } - needle, ne := storage.NewNeedle(r, vs.FixJpgOrientation) - if ne != nil { - writeJsonError(w, r, http.StatusBadRequest, ne) - return - } - - ret := operation.UploadResult{} - size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(), - vs.store, volumeId, needle, r) - httpStatus := http.StatusCreated - if errorStatus != "" { - httpStatus = http.StatusInternalServerError - ret.Error = errorStatus - } - if needle.HasName() { - ret.Name = string(needle.Name) - } - ret.Size = size - writeJsonQuiet(w, r, httpStatus, ret) -} - -func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, _, _, _ := parseURLPath(r.URL.Path) - volumeId, _ := storage.NewVolumeId(vid) - n.ParsePath(fid) - - glog.V(2).Infoln("deleting", n) - - cookie := n.Cookie - - _, ok := vs.store.ReadVolumeNeedle(volumeId, n) - if ok != nil { - m := make(map[string]uint32) - m["size"] = 0 - writeJsonQuiet(w, r, http.StatusNotFound, m) - return - } - defer n.ReleaseMemory() - - if n.Cookie != cookie { - glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - writeJsonError(w, r, http.StatusBadRequest, errors.New("File Random Cookie does not match.")) - return - } - - count := int64(n.Size) - - if n.IsChunkedManifest() { - chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped()) - if e != nil { - writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Load chunks manifest error: %v", e)) - return - } - // make sure all chunks had deleted before delete manifest - if e := chunkManifest.DeleteChunks(vs.GetMasterNode()); e != nil { - writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) - return - } - count = chunkManifest.Size - } - - _, err := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r) - - if err == nil { - m := make(map[string]int64) - m["size"] = count - writeJsonQuiet(w, r, http.StatusAccepted, m) - } else { - writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Deletion Failed: %v", err)) - } - -} - -//Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas. -func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - var ret []operation.DeleteResult - for _, fid := range r.Form["fid"] { - vid, id_cookie, err := operation.ParseFileId(fid) - if err != nil { - ret = append(ret, operation.DeleteResult{ - Fid: fid, - Status: http.StatusBadRequest, - Error: err.Error()}) - continue - } - n := new(storage.Needle) - volumeId, _ := storage.NewVolumeId(vid) - n.ParsePath(id_cookie) - glog.V(4).Infoln("batch deleting", n) - cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil { - ret = append(ret, operation.DeleteResult{ - Fid: fid, - Status: http.StatusNotFound, - Error: err.Error(), - }) - continue - } - - if n.IsChunkedManifest() { - ret = append(ret, operation.DeleteResult{ - Fid: fid, - Status: http.StatusNotAcceptable, - Error: "ChunkManifest: not allowed in batch delete mode.", - }) - n.ReleaseMemory() - continue - } - - if n.Cookie != cookie { - ret = append(ret, operation.DeleteResult{ - Fid: fid, - Status: http.StatusBadRequest, - Error: "File Random Cookie does not match.", - }) - glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - n.ReleaseMemory() - return - } - if size, err := vs.store.Delete(volumeId, n); err != nil { - ret = append(ret, operation.DeleteResult{ - Fid: fid, - Status: http.StatusInternalServerError, - Error: err.Error()}, - ) - } else { - ret = append(ret, operation.DeleteResult{ - Fid: fid, - Status: http.StatusAccepted, - Size: int(size)}, - ) - } - n.ReleaseMemory() - } - - writeJsonQuiet(w, r, http.StatusAccepted, ret) -} diff --git a/go/weed/weed_server/volume_server_ui/templates.go b/go/weed/weed_server/volume_server_ui/templates.go deleted file mode 100644 index c3db6e92a..000000000 --- a/go/weed/weed_server/volume_server_ui/templates.go +++ /dev/null @@ -1,135 +0,0 @@ -package master_ui - -import ( - "html/template" - "strconv" - "strings" -) - -func join(data []int64) string { - var ret []string - for _, d := range data { - ret = append(ret, strconv.Itoa(int(d))) - } - return strings.Join(ret, ",") -} - -var funcMap = template.FuncMap{ - "join": join, -} - -var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html> -<html> - <head> - <title>SeaweedFS {{ .Version }}</title> - <link rel="icon" href="http://7viirv.com1.z0.glb.clouddn.com/seaweed_favicon.png" sizes="32x32" /> - <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css"> - <script type="text/javascript" src="https://code.jquery.com/jquery-2.1.3.min.js"></script> - <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/jquery-sparklines/2.1.2/jquery.sparkline.min.js"></script> - <script type="text/javascript"> - $(function() { - var periods = ['second', 'minute', 'hour', 'day']; - for (i = 0; i < periods.length; i++) { - var period = periods[i]; - $('.inlinesparkline-'+period).sparkline('html', { - type: 'line', - barColor: 'red', - tooltipSuffix:' request per '+period, - }); - } - }); - </script> - <style> - #jqstooltip{ - height: 28px !important; - width: 150px !important; - } - </style> - </head> - <body> - <div class="container"> - <div class="page-header"> - <h1> - <img src="http://7viirv.com1.z0.glb.clouddn.com/seaweed50x50.png"></img> - SeaweedFS <small>{{ .Version }}</small> - </h1> - </div> - - <div class="row"> - <div class="col-sm-6"> - <h2>Disk Stats</h2> - <table class="table table-condensed table-striped"> - {{ range .DiskStatuses }} - <tr> - <th>{{ .Dir }}</th> - <td>{{ .Free }} Bytes Free</td> - </tr> - {{ end }} - </table> - </div> - - <div class="col-sm-6"> - <h2>System Stats</h2> - <table class="table table-condensed table-striped"> - <tr> - <th>Master</th> - <td><a href="http://{{.Master}}/ui/index.html">{{.Master}}</a></td> - </tr> - <tr> - <th>Weekly # ReadRequests</th> - <td><span class="inlinesparkline-day">{{ .Counters.ReadRequests.WeekCounter.ToList | join }}</span></td> - </tr> - <tr> - <th>Daily # ReadRequests</th> - <td><span class="inlinesparkline-hour">{{ .Counters.ReadRequests.DayCounter.ToList | join }}</span></td> - </tr> - <tr> - <th>Hourly # ReadRequests</th> - <td><span class="inlinesparkline-minute">{{ .Counters.ReadRequests.HourCounter.ToList | join }}</span></td> - </tr> - <tr> - <th>Last Minute # ReadRequests</th> - <td><span class="inlinesparkline-second">{{ .Counters.ReadRequests.MinuteCounter.ToList | join }}</span></td> - </tr> - {{ range $key, $val := .Stats }} - <tr> - <th>{{ $key }}</th> - <td>{{ $val }}</td> - </tr> - {{ end }} - </table> - </div> - </div> - - <div class="row"> - <h2>Volumes</h2> - <table class="table table-striped"> - <thead> - <tr> - <th>Id</th> - <th>Collection</th> - <th>Size</th> - <th>Files</th> - <th>Trash</th> - <th>TTL</th> - </tr> - </thead> - <tbody> - {{ range .Volumes }} - <tr> - <td><code>{{ .Id }}</code></td> - <td>{{ .Collection }}</td> - <td>{{ .Size }} Bytes</td> - <td>{{ .FileCount }}</td> - <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td> - <td>{{ .Ttl }}</td> - </tr> - {{ end }} - </tbody> - </table> - </div> - - </div> - </body> -</html> -`)) |
