diff options
Diffstat (limited to 'weed/server')
21 files changed, 2489 insertions, 0 deletions
diff --git a/weed/server/common.go b/weed/server/common.go new file mode 100644 index 000000000..312bcea14 --- /dev/null +++ b/weed/server/common.go @@ -0,0 +1,179 @@ +package weed_server + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "net/http" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var serverStats *stats.ServerStats +var startTime = time.Now() + +func init() { + serverStats = stats.NewServerStats() + go serverStats.Start() + +} + +func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) { + var bytes []byte + if r.FormValue("pretty") != "" { + bytes, err = json.MarshalIndent(obj, "", " ") + } else { + bytes, err = json.Marshal(obj) + } + if err != nil { + return + } + callback := r.FormValue("callback") + if callback == "" { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(httpStatus) + _, err = w.Write(bytes) + } else { + w.Header().Set("Content-Type", "application/javascript") + w.WriteHeader(httpStatus) + if _, err = w.Write([]uint8(callback)); err != nil { + return + } + if _, err = w.Write([]uint8("(")); err != nil { + return + } + fmt.Fprint(w, string(bytes)) + if _, err = w.Write([]uint8(")")); err != nil { + return + } + } + + return +} + +// wrapper for writeJson - just logs errors +func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) { + if err := writeJson(w, r, httpStatus, obj); err != nil { + glog.V(0).Infof("error writing JSON %s: %v", obj, err) + } +} +func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) { + m := make(map[string]interface{}) + m["error"] = err.Error() + writeJsonQuiet(w, r, httpStatus, m) +} + +func debug(params ...interface{}) { + glog.V(4).Infoln(params) +} + +func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { + jwt := security.GetJwt(r) + m := make(map[string]interface{}) + if r.Method != "POST" { + writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) + return + } + + debug("parsing upload file...") + fname, data, mimeType, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r) + if pe != nil { + writeJsonError(w, r, http.StatusBadRequest, pe) + return + } + + debug("assigning file id for", fname) + r.ParseForm() + assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication"), r.FormValue("collection"), r.FormValue("ttl")) + if ae != nil { + writeJsonError(w, r, http.StatusInternalServerError, ae) + return + } + + url := "http://" + assignResult.Url + "/" + assignResult.Fid + if lastModified != 0 { + url = url + "?ts=" + strconv.FormatUint(lastModified, 10) + } + + debug("upload file to store", url) + uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + + m["fileName"] = fname + m["fid"] = assignResult.Fid + m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid + m["size"] = uploadResult.Size + writeJsonQuiet(w, r, http.StatusCreated, m) + return +} + +func deleteForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { + r.ParseForm() + fids := r.Form["fid"] + ret, err := operation.DeleteFiles(masterUrl, fids) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + writeJsonQuiet(w, r, http.StatusAccepted, ret) +} + +func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { + switch strings.Count(path, "/") { + case 3: + parts := strings.Split(path, "/") + vid, fid, filename = parts[1], parts[2], parts[3] + ext = filepath.Ext(filename) + case 2: + parts := strings.Split(path, "/") + vid, fid = parts[1], parts[2] + dotIndex := strings.LastIndex(fid, ".") + if dotIndex > 0 { + ext = fid[dotIndex:] + fid = fid[0:dotIndex] + } + default: + sepIndex := strings.LastIndex(path, "/") + commaIndex := strings.LastIndex(path[sepIndex:], ",") + if commaIndex <= 0 { + vid, isVolumeIdOnly = path[sepIndex+1:], true + return + } + dotIndex := strings.LastIndex(path[sepIndex:], ".") + vid = path[sepIndex+1 : commaIndex] + fid = path[commaIndex+1:] + ext = "" + if dotIndex > 0 { + fid = path[commaIndex+1 : dotIndex] + ext = path[dotIndex:] + } + } + return +} + +func statsCounterHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = util.VERSION + m["Counters"] = serverStats + writeJsonQuiet(w, r, http.StatusOK, m) +} + +func statsMemoryHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = util.VERSION + m["Memory"] = stats.MemStat() + writeJsonQuiet(w, r, http.StatusOK, m) +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go new file mode 100644 index 000000000..ee7eaf886 --- /dev/null +++ b/weed/server/filer_server.go @@ -0,0 +1,67 @@ +package weed_server + +import ( + "net/http" + "strconv" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store" + "github.com/chrislusf/seaweedfs/weed/filer/embedded_filer" + "github.com/chrislusf/seaweedfs/weed/filer/flat_namespace" + "github.com/chrislusf/seaweedfs/weed/filer/redis_store" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" +) + +type FilerServer struct { + port string + master string + collection string + defaultReplication string + redirectOnRead bool + disableDirListing bool + secret security.Secret + filer filer.Filer +} + +func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir string, collection string, + replication string, redirectOnRead bool, disableDirListing bool, + secret string, + cassandra_server string, cassandra_keyspace string, + redis_server string, redis_password string, redis_database int, +) (fs *FilerServer, err error) { + fs = &FilerServer{ + master: master, + collection: collection, + defaultReplication: replication, + redirectOnRead: redirectOnRead, + disableDirListing: disableDirListing, + port: ip + ":" + strconv.Itoa(port), + } + + if cassandra_server != "" { + cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server) + if err != nil { + glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err) + } + fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store) + } else if redis_server != "" { + redis_store := redis_store.NewRedisStore(redis_server, redis_password, redis_database) + fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store) + } else { + if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil { + glog.Fatalf("Can not start filer in dir %s : %v", dir, err) + return + } + + r.HandleFunc("/admin/mv", fs.moveHandler) + } + + r.HandleFunc("/", fs.filerHandler) + + return fs, nil +} + +func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { + return security.GenJwt(fs.secret, fileId) +} diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go new file mode 100644 index 000000000..d6b98976b --- /dev/null +++ b/weed/server/filer_server_handlers.go @@ -0,0 +1,265 @@ +package weed_server + +import ( + "bytes" + "encoding/json" + "errors" + "io" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/syndtr/goleveldb/leveldb" +) + +func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + fs.GetOrHeadHandler(w, r, true) + case "HEAD": + fs.GetOrHeadHandler(w, r, false) + case "DELETE": + fs.DeleteHandler(w, r) + case "PUT": + fs.PostHandler(w, r) + case "POST": + fs.PostHandler(w, r) + } +} + +// listDirectoryHandler lists directories and folers under a directory +// files are sorted by name and paginated via "lastFileName" and "limit". +// sub directories are listed on the first page, when "lastFileName" +// is empty. +func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) { + if !strings.HasSuffix(r.URL.Path, "/") { + return + } + dirlist, err := fs.filer.ListDirectories(r.URL.Path) + if err == leveldb.ErrNotFound { + glog.V(3).Infoln("Directory Not Found in db", r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + m := make(map[string]interface{}) + m["Directory"] = r.URL.Path + lastFileName := r.FormValue("lastFileName") + if lastFileName == "" { + m["Subdirectories"] = dirlist + } + limit, limit_err := strconv.Atoi(r.FormValue("limit")) + if limit_err != nil { + limit = 100 + } + m["Files"], _ = fs.filer.ListFiles(r.URL.Path, lastFileName, limit) + writeJsonQuiet(w, r, http.StatusOK, m) +} + +func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { + if strings.HasSuffix(r.URL.Path, "/") { + if fs.disableDirListing { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + fs.listDirectoryHandler(w, r) + return + } + + fileId, err := fs.filer.FindFile(r.URL.Path) + if err == leveldb.ErrNotFound { + glog.V(3).Infoln("Not found in db", r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + + urlLocation, err := operation.LookupFileId(fs.master, fileId) + if err != nil { + glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error()) + w.WriteHeader(http.StatusNotFound) + return + } + urlString := urlLocation + if fs.redirectOnRead { + http.Redirect(w, r, urlString, http.StatusFound) + return + } + u, _ := url.Parse(urlString) + request := &http.Request{ + Method: r.Method, + URL: u, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMinor, + Header: r.Header, + Body: r.Body, + Host: r.Host, + ContentLength: r.ContentLength, + } + glog.V(3).Infoln("retrieving from", u) + resp, do_err := util.Do(request) + if do_err != nil { + glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) + writeJsonError(w, r, http.StatusInternalServerError, do_err) + return + } + defer resp.Body.Close() + for k, v := range resp.Header { + w.Header()[k] = v + } + w.WriteHeader(resp.StatusCode) + io.Copy(w, resp.Body) +} + +type analogueReader struct { + *bytes.Buffer +} + +// So that it implements the io.ReadCloser interface +func (m analogueReader) Close() error { return nil } + +func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + replication := query.Get("replication") + if replication == "" { + replication = fs.defaultReplication + } + collection := query.Get("collection") + if collection == "" { + collection = fs.collection + } + + var fileId string + var err error + var urlLocation string + if r.Method == "PUT" { + buf, _ := ioutil.ReadAll(r.Body) + r.Body = analogueReader{bytes.NewBuffer(buf)} + fileName, _, _, _, _, _, _, pe := storage.ParseUpload(r) + if pe != nil { + glog.V(0).Infoln("failing to parse post body", pe.Error()) + writeJsonError(w, r, http.StatusInternalServerError, pe) + return + } + //reconstruct http request body for following new request to volume server + r.Body = analogueReader{bytes.NewBuffer(buf)} + + path := r.URL.Path + if strings.HasSuffix(path, "/") { + if fileName != "" { + path += fileName + } + } + + if fileId, err = fs.filer.FindFile(path); err != nil && err != leveldb.ErrNotFound { + glog.V(0).Infoln("failing to find path in filer store", path, err.Error()) + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } else if fileId != "" && err == nil { + var le error + urlLocation, le = operation.LookupFileId(fs.master, fileId) + if le != nil { + glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, le.Error()) + w.WriteHeader(http.StatusNotFound) + return + } + } + } else { + assignResult, ae := operation.Assign(fs.master, 1, replication, collection, query.Get("ttl")) + if ae != nil { + glog.V(0).Infoln("failing to assign a file id", ae.Error()) + writeJsonError(w, r, http.StatusInternalServerError, ae) + return + } + fileId = assignResult.Fid + urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid + } + + u, _ := url.Parse(urlLocation) + glog.V(4).Infoln("post to", u) + request := &http.Request{ + Method: r.Method, + URL: u, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMinor, + Header: r.Header, + Body: r.Body, + Host: r.Host, + ContentLength: r.ContentLength, + } + resp, do_err := util.Do(request) + if do_err != nil { + glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error()) + writeJsonError(w, r, http.StatusInternalServerError, do_err) + return + } + defer resp.Body.Close() + resp_body, ra_err := ioutil.ReadAll(resp.Body) + if ra_err != nil { + glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error()) + writeJsonError(w, r, http.StatusInternalServerError, ra_err) + return + } + glog.V(4).Infoln("post result", string(resp_body)) + var ret operation.UploadResult + unmarshal_err := json.Unmarshal(resp_body, &ret) + if unmarshal_err != nil { + glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body)) + writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err) + return + } + if ret.Error != "" { + glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) + writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error)) + return + } + path := r.URL.Path + if strings.HasSuffix(path, "/") { + if ret.Name != "" { + path += ret.Name + } else { + operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up + glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") + writeJsonError(w, r, http.StatusInternalServerError, + errors.New("Can not to write to folder "+path+" without a file name")) + return + } + } + glog.V(4).Infoln("saving", path, "=>", fileId) + if db_err := fs.filer.CreateFile(path, fileId); db_err != nil { + operation.DeleteFile(fs.master, fileId, fs.jwt(fileId)) //clean up + glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) + writeJsonError(w, r, http.StatusInternalServerError, db_err) + return + } + w.WriteHeader(http.StatusCreated) + w.Write(resp_body) +} + +// curl -X DELETE http://localhost:8888/path/to +// curl -X DELETE http://localhost:8888/path/to?recursive=true +func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { + var err error + var fid string + if strings.HasSuffix(r.URL.Path, "/") { + isRecursive := r.FormValue("recursive") == "true" + err = fs.filer.DeleteDirectory(r.URL.Path, isRecursive) + } else { + fid, err = fs.filer.DeleteFile(r.URL.Path) + if err == nil && fid != "" { + err = operation.DeleteFile(fs.master, fid, fs.jwt(fid)) + } + } + if err == nil { + writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) + } else { + glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error()) + writeJsonError(w, r, http.StatusInternalServerError, err) + } +} diff --git a/weed/server/filer_server_handlers_admin.go b/weed/server/filer_server_handlers_admin.go new file mode 100644 index 000000000..979ad517b --- /dev/null +++ b/weed/server/filer_server_handlers_admin.go @@ -0,0 +1,29 @@ +package weed_server + +import ( + "net/http" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +/* +Move a folder or a file, with 4 Use cases: + mv fromDir toNewDir + mv fromDir toOldDir + mv fromFile toDir + mv fromFile toFile + +Wildcard is not supported. + +*/ +func (fs *FilerServer) moveHandler(w http.ResponseWriter, r *http.Request) { + from := r.FormValue("from") + to := r.FormValue("to") + err := fs.filer.Move(from, to) + if err != nil { + glog.V(4).Infoln("moving", from, "->", to, err.Error()) + writeJsonError(w, r, http.StatusInternalServerError, err) + } else { + w.WriteHeader(http.StatusOK) + } +} diff --git a/weed/server/master_server.go b/weed/server/master_server.go new file mode 100644 index 000000000..61bda6988 --- /dev/null +++ b/weed/server/master_server.go @@ -0,0 +1,131 @@ +package weed_server + +import ( + "fmt" + "net/http" + "net/http/httputil" + "net/url" + "sync" + + "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/sequence" + "github.com/chrislusf/seaweedfs/weed/topology" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/gorilla/mux" +) + +type MasterServer struct { + port int + metaFolder string + volumeSizeLimitMB uint + pulseSeconds int + defaultReplicaPlacement string + garbageThreshold string + guard *security.Guard + + Topo *topology.Topology + vg *topology.VolumeGrowth + vgLock sync.Mutex + + bounedLeaderChan chan int +} + +func NewMasterServer(r *mux.Router, port int, metaFolder string, + volumeSizeLimitMB uint, + pulseSeconds int, + confFile string, + defaultReplicaPlacement string, + garbageThreshold string, + whiteList []string, + secureKey string, +) *MasterServer { + ms := &MasterServer{ + port: port, + volumeSizeLimitMB: volumeSizeLimitMB, + pulseSeconds: pulseSeconds, + defaultReplicaPlacement: defaultReplicaPlacement, + garbageThreshold: garbageThreshold, + } + ms.bounedLeaderChan = make(chan int, 16) + seq := sequence.NewMemorySequencer() + var e error + if ms.Topo, e = topology.NewTopology("topo", confFile, seq, + uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil { + glog.Fatalf("cannot create topology:%s", e) + } + ms.vg = topology.NewDefaultVolumeGrowth() + glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") + + ms.guard = security.NewGuard(whiteList, secureKey) + + r.HandleFunc("/", ms.uiStatusHandler) + r.HandleFunc("/ui/index.html", ms.uiStatusHandler) + r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) + r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) + r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler))) + r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) + r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) + r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler))) + r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) + r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) + r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) + r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) + r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler)) + r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) + r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) + r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) + + ms.Topo.StartRefreshWritableVolumes(garbageThreshold) + + return ms +} + +func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { + ms.Topo.RaftServer = raftServer.raftServer + ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { + if ms.Topo.RaftServer.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") + } + }) + if ms.Topo.IsLeader() { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") + } else { + if ms.Topo.RaftServer.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") + } + } +} + +func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + if ms.Topo.IsLeader() { + f(w, r) + } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { + ms.bounedLeaderChan <- 1 + defer func() { <-ms.bounedLeaderChan }() + targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, + fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err)) + return + } + glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader()) + proxy := httputil.NewSingleHostReverseProxy(targetUrl) + director := proxy.Director + proxy.Director = func(req *http.Request) { + actualHost, err := security.GetActualRemoteHost(req) + if err == nil { + req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost) + } + director(req) + } + proxy.Transport = util.Transport + proxy.ServeHTTP(w, r) + } else { + //drop it to the floor + //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) + } + } +} diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go new file mode 100644 index 000000000..e811631f8 --- /dev/null +++ b/weed/server/master_server_handlers.go @@ -0,0 +1,104 @@ +package weed_server + +import ( + "fmt" + "net/http" + "strconv" + "strings" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) { + volumeLocations = make(map[string]operation.LookupResult) + for _, vid := range vids { + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + if _, ok := volumeLocations[vid]; ok { + continue + } + volumeId, err := storage.NewVolumeId(vid) + if err == nil { + machines := ms.Topo.Lookup(collection, volumeId) + if machines != nil { + var ret []operation.Location + for _, dn := range machines { + ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl}) + } + volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret} + } else { + volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: "volumeId not found."} + } + } else { + volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: "Unknown volumeId format."} + } + } + return +} + +// Takes one volumeId only, can not do batch lookup +func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) { + vid := r.FormValue("volumeId") + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + vids := []string{vid} + collection := r.FormValue("collection") //optional, but can be faster if too many collections + volumeLocations := ms.lookupVolumeId(vids, collection) + location := volumeLocations[vid] + httpStatus := http.StatusOK + if location.Error != "" { + httpStatus = http.StatusNotFound + } + writeJsonQuiet(w, r, httpStatus, location) +} + +// This can take batched volumeIds, &volumeId=x&volumeId=y&volumeId=z +func (ms *MasterServer) volumeLookupHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + vids := r.Form["volumeId"] + collection := r.FormValue("collection") //optional, but can be faster if too many collections + volumeLocations := ms.lookupVolumeId(vids, collection) + writeJsonQuiet(w, r, http.StatusOK, volumeLocations) +} + +func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { + stats.AssignRequest() + requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64) + if e != nil || requestedCount == 0 { + requestedCount = 1 + } + + option, err := ms.getVolumeGrowOption(r) + if err != nil { + writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) + return + } + + if !ms.Topo.HasWritableVolume(option) { + if ms.Topo.FreeSpace() <= 0 { + writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"}) + return + } + ms.vgLock.Lock() + defer ms.vgLock.Unlock() + if !ms.Topo.HasWritableVolume(option) { + if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { + writeJsonError(w, r, http.StatusInternalServerError, + fmt.Errorf("Cannot grow volume group! %v", err)) + return + } + } + } + fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option) + if err == nil { + writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) + } else { + writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) + } +} diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go new file mode 100644 index 000000000..a762bf416 --- /dev/null +++ b/weed/server/master_server_handlers_admin.go @@ -0,0 +1,193 @@ +package weed_server + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "strconv" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/topology" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" +) + +func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { + collection, ok := ms.Topo.FindCollection(r.FormValue("collection")) + if !ok { + writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection"))) + return + } + for _, server := range collection.ListVolumeServers() { + _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection")) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + } + ms.Topo.DeleteCollection(r.FormValue("collection")) +} + +func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + joinMessage := &operation.JoinMessage{} + if err = proto.Unmarshal(body, joinMessage); err != nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + if *joinMessage.Ip == "" { + *joinMessage.Ip = r.RemoteAddr[0:strings.LastIndex(r.RemoteAddr, ":")] + } + if glog.V(4) { + if jsonData, jsonError := json.Marshal(joinMessage); jsonError != nil { + glog.V(0).Infoln("json marshaling error: ", jsonError) + writeJsonError(w, r, http.StatusBadRequest, jsonError) + return + } else { + glog.V(4).Infoln("Proto size", len(body), "json size", len(jsonData), string(jsonData)) + } + } + + ms.Topo.ProcessJoinMessage(joinMessage) + writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{ + VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, + SecretKey: string(ms.guard.SecretKey), + }) +} + +func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = util.VERSION + m["Topology"] = ms.Topo.ToMap() + writeJsonQuiet(w, r, http.StatusOK, m) +} + +func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { + gcThreshold := r.FormValue("garbageThreshold") + if gcThreshold == "" { + gcThreshold = ms.garbageThreshold + } + glog.Infoln("garbageThreshold =", gcThreshold) + ms.Topo.Vacuum(gcThreshold) + ms.dirStatusHandler(w, r) +} + +func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { + count := 0 + option, err := ms.getVolumeGrowOption(r) + if err != nil { + writeJsonError(w, r, http.StatusNotAcceptable, err) + return + } + if err == nil { + if count, err = strconv.Atoi(r.FormValue("count")); err == nil { + if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() { + err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount())) + } else { + count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo) + } + } else { + err = errors.New("parameter count is not found") + } + } + if err != nil { + writeJsonError(w, r, http.StatusNotAcceptable, err) + } else { + writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"count": count}) + } +} + +func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = util.VERSION + m["Volumes"] = ms.Topo.ToVolumeMap() + writeJsonQuiet(w, r, http.StatusOK, m) +} + +func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { + vid, _, _, _, _ := parseURLPath(r.URL.Path) + volumeId, err := storage.NewVolumeId(vid) + if err != nil { + debug("parsing error:", err, r.URL.Path) + return + } + collection := r.FormValue("collection") + machines := ms.Topo.Lookup(collection, volumeId) + if machines != nil && len(machines) > 0 { + var url string + if r.URL.RawQuery != "" { + url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery + } else { + url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + } + http.Redirect(w, r, url, http.StatusMovedPermanently) + } else { + writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d or collection %s not found", volumeId, collection)) + } +} + +func (ms *MasterServer) selfUrl(r *http.Request) string { + if r.Host != "" { + return r.Host + } + return "localhost:" + strconv.Itoa(ms.port) +} +func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { + if ms.Topo.IsLeader() { + submitForClientHandler(w, r, ms.selfUrl(r)) + } else { + masterUrl, err := ms.Topo.Leader() + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + } else { + submitForClientHandler(w, r, masterUrl) + } + } +} + +func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { + if ms.Topo.IsLeader() { + deleteForClientHandler(w, r, ms.selfUrl(r)) + } else { + deleteForClientHandler(w, r, ms.Topo.RaftServer.Leader()) + } +} + +func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool { + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + return vl.GetActiveVolumeCount(option) > 0 +} + +func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { + replicationString := r.FormValue("replication") + if replicationString == "" { + replicationString = ms.defaultReplicaPlacement + } + replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString) + if err != nil { + return nil, err + } + ttl, err := storage.ReadTTL(r.FormValue("ttl")) + if err != nil { + return nil, err + } + volumeGrowOption := &topology.VolumeGrowOption{ + Collection: r.FormValue("collection"), + ReplicaPlacement: replicaPlacement, + Ttl: ttl, + DataCenter: r.FormValue("dataCenter"), + Rack: r.FormValue("rack"), + DataNode: r.FormValue("dataNode"), + } + return volumeGrowOption, nil +} diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go new file mode 100644 index 000000000..9ad234877 --- /dev/null +++ b/weed/server/master_server_handlers_ui.go @@ -0,0 +1,30 @@ +package weed_server + +import ( + "net/http" + + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/util" + ui "github.com/chrislusf/seaweedfs/weed/server/master_ui" +) + +func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { + infos := make(map[string]interface{}) + infos["Version"] = util.VERSION + args := struct { + Version string + Topology interface{} + Leader string + Peers interface{} + Stats map[string]interface{} + Counters *stats.ServerStats + }{ + util.VERSION, + ms.Topo.ToMap(), + ms.Topo.RaftServer.Leader(), + ms.Topo.RaftServer.Peers(), + infos, + serverStats, + } + ui.StatusTpl.Execute(w, args) +} diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go new file mode 100644 index 000000000..e9ee2d8d2 --- /dev/null +++ b/weed/server/master_ui/templates.go @@ -0,0 +1,102 @@ +package master_ui + +import ( + "html/template" +) + +var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> +<html> + <head> + <title>SeaweedFS {{ .Version }}</title> + <link rel="icon" href="http://7viirv.com1.z0.glb.clouddn.com/seaweed_favicon.png" sizes="32x32" /> + <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css"> + </head> + <body> + <div class="container"> + <div class="page-header"> + <h1> + <img src="http://7viirv.com1.z0.glb.clouddn.com/seaweed50x50.png"></img> + SeaweedFS <small>{{ .Version }}</small> + </h1> + </div> + + <div class="row"> + <div class="col-sm-6"> + <h2>Cluster status</h2> + <table class="table"> + <tbody> + <tr> + <th>Free</th> + <td>{{ .Topology.Free }}</td> + </tr> + <tr> + <th>Max</th> + <td>{{ .Topology.Max }}</td> + </tr> + <tr> + <th>Leader</th> + <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td> + </tr> + <tr> + <td class="col-sm-2 field-label"><label>Peers:</label></td> + <td class="col-sm-10"><ul class="list-unstyled"> + {{ range $k, $p := .Peers }} + <li><a href="{{ $p.ConnectionString }}">{{ $p.Name }}</a></li> + {{ end }} + </ul></td> + </tr> + </tbody> + </table> + </div> + + <div class="col-sm-6"> + <h2>System Stats</h2> + <table class="table table-condensed table-striped"> + <tr> + <th>Concurrent Connections</th> + <td>{{ .Counters.Connections.WeekCounter.Sum }}</td> + </tr> + {{ range $key, $val := .Stats }} + <tr> + <th>{{ $key }}</th> + <td>{{ $val }}</td> + </tr> + {{ end }} + </table> + </div> + </div> + + <div class="row"> + <h2>Topology</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Data Center</th> + <th>Rack</th> + <th>RemoteAddr</th> + <th>#Volumes</th> + <th>Max</th> + </tr> + </thead> + <tbody> + {{ range $dc_index, $dc := .Topology.DataCenters }} + {{ range $rack_index, $rack := $dc.Racks }} + {{ range $dn_index, $dn := $rack.DataNodes }} + <tr> + <td><code>{{ $dc.Id }}</code></td> + <td>{{ $rack.Id }}</td> + <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a></td> + <td>{{ $dn.Volumes }}</td> + <td>{{ $dn.Max }}</td> + </tr> + {{ end }} + {{ end }} + {{ end }} + </tbody> + </table> + </div> + + </div> + </body> +</html> +`)) diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go new file mode 100644 index 000000000..a35659818 --- /dev/null +++ b/weed/server/raft_server.go @@ -0,0 +1,217 @@ +package weed_server + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "os" + "path" + "reflect" + "sort" + "strings" + "time" + + "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/topology" + "github.com/gorilla/mux" +) + +type RaftServer struct { + peers []string // initial peers to join with + raftServer raft.Server + dataDir string + httpAddr string + router *mux.Router + topo *topology.Topology +} + +func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { + s := &RaftServer{ + peers: peers, + httpAddr: httpAddr, + dataDir: dataDir, + router: r, + topo: topo, + } + + if glog.V(4) { + raft.SetLogLevel(2) + } + + raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) + + var err error + transporter := raft.NewHTTPTransporter("/cluster", 0) + transporter.Transport.MaxIdleConnsPerHost = 1024 + glog.V(1).Infof("Starting RaftServer with IP:%v:", httpAddr) + + // Clear old cluster configurations if peers are changed + if oldPeers, changed := isPeersChanged(s.dataDir, httpAddr, s.peers); changed { + glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers) + os.RemoveAll(path.Join(s.dataDir, "conf")) + os.RemoveAll(path.Join(s.dataDir, "log")) + os.RemoveAll(path.Join(s.dataDir, "snapshot")) + } + + s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "") + if err != nil { + glog.V(0).Infoln(err) + return nil + } + transporter.Install(s.raftServer, s) + s.raftServer.SetHeartbeatInterval(1 * time.Second) + s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 3450 * time.Millisecond) + s.raftServer.Start() + + s.router.HandleFunc("/cluster/join", s.joinHandler).Methods("POST") + s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET") + + if len(s.peers) > 0 { + // Join to leader if specified. + for { + glog.V(0).Infoln("Joining cluster:", strings.Join(s.peers, ",")) + time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) + firstJoinError := s.Join(s.peers) + if firstJoinError != nil { + glog.V(0).Infoln("No existing server found. Starting as leader in the new cluster.") + _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: "http://" + s.httpAddr, + }) + if err != nil { + glog.V(0).Infoln(err) + } else { + break + } + } else { + break + } + } + } else if s.raftServer.IsLogEmpty() { + // Initialize the server by joining itself. + glog.V(0).Infoln("Initializing new cluster") + + _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: "http://" + s.httpAddr, + }) + + if err != nil { + glog.V(0).Infoln(err) + return nil + } + + } else { + glog.V(0).Infoln("Old conf,log,snapshot should have been removed.") + } + + return s +} + +func (s *RaftServer) Peers() (members []string) { + peers := s.raftServer.Peers() + + for _, p := range peers { + members = append(members, strings.TrimPrefix(p.ConnectionString, "http://")) + } + + return +} + +func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) { + confPath := path.Join(dir, "conf") + // open conf file + b, err := ioutil.ReadFile(confPath) + if err != nil { + return oldPeers, true + } + conf := &raft.Config{} + if err = json.Unmarshal(b, conf); err != nil { + return oldPeers, true + } + + for _, p := range conf.Peers { + oldPeers = append(oldPeers, strings.TrimPrefix(p.ConnectionString, "http://")) + } + oldPeers = append(oldPeers, self) + + sort.Strings(peers) + sort.Strings(oldPeers) + + return oldPeers, reflect.DeepEqual(peers, oldPeers) + +} + +// Join joins an existing cluster. +func (s *RaftServer) Join(peers []string) error { + command := &raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: "http://" + s.httpAddr, + } + + var err error + var b bytes.Buffer + json.NewEncoder(&b).Encode(command) + for _, m := range peers { + if m == s.httpAddr { + continue + } + target := fmt.Sprintf("http://%s/cluster/join", strings.TrimSpace(m)) + glog.V(0).Infoln("Attempting to connect to:", target) + + err = postFollowingOneRedirect(target, "application/json", &b) + + if err != nil { + glog.V(0).Infoln("Post returned error: ", err.Error()) + if _, ok := err.(*url.Error); ok { + // If we receive a network error try the next member + continue + } + } else { + return nil + } + } + + return errors.New("Could not connect to any cluster peers") +} + +// a workaround because http POST following redirection misses request body +func postFollowingOneRedirect(target string, contentType string, b *bytes.Buffer) error { + backupReader := bytes.NewReader(b.Bytes()) + resp, err := http.Post(target, contentType, b) + if err != nil { + return err + } + defer resp.Body.Close() + reply, _ := ioutil.ReadAll(resp.Body) + statusCode := resp.StatusCode + + if statusCode == http.StatusMovedPermanently { + var urlStr string + if urlStr = resp.Header.Get("Location"); urlStr == "" { + return fmt.Errorf("%d response missing Location header", resp.StatusCode) + } + + glog.V(0).Infoln("Post redirected to ", urlStr) + resp2, err2 := http.Post(urlStr, contentType, backupReader) + if err2 != nil { + return err2 + } + defer resp2.Body.Close() + reply, _ = ioutil.ReadAll(resp2.Body) + statusCode = resp2.StatusCode + } + + glog.V(0).Infoln("Post returned status: ", statusCode, string(reply)) + if statusCode != http.StatusOK { + return errors.New(string(reply)) + } + + return nil +} diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go new file mode 100644 index 000000000..335ba668f --- /dev/null +++ b/weed/server/raft_server_handlers.go @@ -0,0 +1,64 @@ +package weed_server + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "strings" + + "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" +) + +// Handles incoming RAFT joins. +func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { + glog.V(0).Infoln("Processing incoming join. Current Leader", s.raftServer.Leader(), "Self", s.raftServer.Name(), "Peers", s.raftServer.Peers()) + command := &raft.DefaultJoinCommand{} + + commandText, _ := ioutil.ReadAll(req.Body) + glog.V(0).Info("Command:", string(commandText)) + if err := json.NewDecoder(strings.NewReader(string(commandText))).Decode(&command); err != nil { + glog.V(0).Infoln("Error decoding json message:", err, string(commandText)) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + glog.V(0).Infoln("join command from Name", command.Name, "Connection", command.ConnectionString) + + if _, err := s.raftServer.Do(command); err != nil { + switch err { + case raft.NotLeaderError: + s.redirectToLeader(w, req) + default: + glog.V(0).Infoln("Error processing join:", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } +} + +func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { + s.router.HandleFunc(pattern, handler) +} + +func (s *RaftServer) redirectToLeader(w http.ResponseWriter, req *http.Request) { + if leader, e := s.topo.Leader(); e == nil { + //http.StatusMovedPermanently does not cause http POST following redirection + glog.V(0).Infoln("Redirecting to", http.StatusMovedPermanently, "http://"+leader+req.URL.Path) + http.Redirect(w, req, "http://"+leader+req.URL.Path, http.StatusMovedPermanently) + } else { + glog.V(0).Infoln("Error: Leader Unknown") + http.Error(w, "Leader unknown", http.StatusInternalServerError) + } +} + +func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) { + ret := operation.ClusterStatusResult{ + IsLeader: s.topo.IsLeader(), + Peers: s.Peers(), + } + if leader, e := s.topo.Leader(); e == nil { + ret.Leader = leader + } + writeJsonQuiet(w, r, http.StatusOK, ret) +} diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go new file mode 100644 index 000000000..79a4276b1 --- /dev/null +++ b/weed/server/volume_server.go @@ -0,0 +1,125 @@ +package weed_server + +import ( + "math/rand" + "net/http" + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +type VolumeServer struct { + masterNode string + mnLock sync.RWMutex + pulseSeconds int + dataCenter string + rack string + store *storage.Store + guard *security.Guard + + needleMapKind storage.NeedleMapType + FixJpgOrientation bool + ReadRedirect bool +} + +func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, + port int, publicUrl string, + folders []string, maxCounts []int, + needleMapKind storage.NeedleMapType, + masterNode string, pulseSeconds int, + dataCenter string, rack string, + whiteList []string, + fixJpgOrientation bool, + readRedirect bool) *VolumeServer { + vs := &VolumeServer{ + pulseSeconds: pulseSeconds, + dataCenter: dataCenter, + rack: rack, + needleMapKind: needleMapKind, + FixJpgOrientation: fixJpgOrientation, + ReadRedirect: readRedirect, + } + vs.SetMasterNode(masterNode) + vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) + + vs.guard = security.NewGuard(whiteList, "") + + adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) + adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) + adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler)) + adminMux.HandleFunc("/admin/vacuum/check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler)) + adminMux.HandleFunc("/admin/vacuum/compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler)) + adminMux.HandleFunc("/admin/vacuum/commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler)) + adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler)) + adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) + adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) + adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) + adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) + adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) + adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) + adminMux.HandleFunc("/delete", vs.guard.WhiteList(vs.batchDeleteHandler)) + adminMux.HandleFunc("/", vs.privateStoreHandler) + if publicMux != adminMux { + // separated admin and public port + publicMux.HandleFunc("/favicon.ico", vs.faviconHandler) + publicMux.HandleFunc("/", vs.publicReadOnlyHandler) + } + + go func() { + connected := true + + glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode()) + vs.store.SetBootstrapMaster(vs.GetMasterNode()) + vs.store.SetDataCenter(vs.dataCenter) + vs.store.SetRack(vs.rack) + for { + glog.V(4).Infof("Volume server sending to master %s", vs.GetMasterNode()) + master, secretKey, err := vs.store.SendHeartbeatToMaster() + if err == nil { + if !connected { + connected = true + vs.SetMasterNode(master) + vs.guard.SecretKey = secretKey + glog.V(0).Infoln("Volume Server Connected with master at", master) + } + } else { + glog.V(1).Infof("Volume Server Failed to talk with master %s: %v", vs.masterNode, err) + if connected { + connected = false + } + } + if connected { + time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond) + } else { + time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*0.25) * time.Millisecond) + } + } + }() + + return vs +} + +func (vs *VolumeServer) GetMasterNode() string { + vs.mnLock.RLock() + defer vs.mnLock.RUnlock() + return vs.masterNode +} + +func (vs *VolumeServer) SetMasterNode(masterNode string) { + vs.mnLock.Lock() + defer vs.mnLock.Unlock() + vs.masterNode = masterNode +} + +func (vs *VolumeServer) Shutdown() { + glog.V(0).Infoln("Shutting down volume server...") + vs.store.Close() + glog.V(0).Infoln("Shut down successfully!") +} + +func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt { + return security.GenJwt(vs.guard.SecretKey, fileId) +} diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go new file mode 100644 index 000000000..2d6fe7849 --- /dev/null +++ b/weed/server/volume_server_handlers.go @@ -0,0 +1,57 @@ +package weed_server + +import ( + "net/http" + + "github.com/chrislusf/seaweedfs/weed/stats" +) + +/* + +If volume server is started with a separated public port, the public port will +be more "secure". + +Public port currently only supports reads. + +Later writes on public port can have one of the 3 +security settings: +1. not secured +2. secured by white list +3. secured by JWT(Json Web Token) + +*/ + +func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + stats.ReadRequest() + vs.GetOrHeadHandler(w, r) + case "HEAD": + stats.ReadRequest() + vs.GetOrHeadHandler(w, r) + case "DELETE": + stats.DeleteRequest() + vs.guard.WhiteList(vs.DeleteHandler)(w, r) + case "PUT": + stats.WriteRequest() + vs.guard.WhiteList(vs.PostHandler)(w, r) + case "POST": + stats.WriteRequest() + vs.guard.WhiteList(vs.PostHandler)(w, r) + } +} + +func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + stats.ReadRequest() + vs.GetOrHeadHandler(w, r) + case "HEAD": + stats.ReadRequest() + vs.GetOrHeadHandler(w, r) + } +} + +func (vs *VolumeServer) faviconHandler(w http.ResponseWriter, r *http.Request) { + vs.FaviconHandler(w, r) +} diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go new file mode 100644 index 000000000..ae9817ef6 --- /dev/null +++ b/weed/server/volume_server_handlers_admin.go @@ -0,0 +1,50 @@ +package weed_server + +import ( + "net/http" + "path/filepath" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = util.VERSION + m["Volumes"] = vs.store.Status() + writeJsonQuiet(w, r, http.StatusOK, m) +} + +func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { + err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), vs.needleMapKind, r.FormValue("replication"), r.FormValue("ttl")) + if err == nil { + writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) + } else { + writeJsonError(w, r, http.StatusNotAcceptable, err) + } + glog.V(2).Infoln("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replication =", r.FormValue("replication"), ", error =", err) +} + +func (vs *VolumeServer) deleteCollectionHandler(w http.ResponseWriter, r *http.Request) { + err := vs.store.DeleteCollection(r.FormValue("collection")) + if err == nil { + writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) + } else { + writeJsonError(w, r, http.StatusInternalServerError, err) + } + glog.V(2).Infoln("deleting collection =", r.FormValue("collection"), ", error =", err) +} + +func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = util.VERSION + var ds []*stats.DiskStatus + for _, loc := range vs.store.Locations { + if dir, e := filepath.Abs(loc.Directory); e == nil { + ds = append(ds, stats.NewDiskStatus(dir)) + } + } + m["DiskStatuses"] = ds + writeJsonQuiet(w, r, http.StatusOK, m) +} diff --git a/weed/server/volume_server_handlers_helper.go b/weed/server/volume_server_handlers_helper.go new file mode 100644 index 000000000..2bab35e45 --- /dev/null +++ b/weed/server/volume_server_handlers_helper.go @@ -0,0 +1,115 @@ +package weed_server + +import ( + "errors" + "fmt" + "mime/multipart" + "net/textproto" + "strconv" + "strings" +) + +// copied from src/pkg/net/http/fs.go + +// httpRange specifies the byte range to be sent to the client. +type httpRange struct { + start, length int64 +} + +func (r httpRange) contentRange(size int64) string { + return fmt.Sprintf("bytes %d-%d/%d", r.start, r.start+r.length-1, size) +} + +func (r httpRange) mimeHeader(contentType string, size int64) textproto.MIMEHeader { + return textproto.MIMEHeader{ + "Content-Range": {r.contentRange(size)}, + "Content-Type": {contentType}, + } +} + +// parseRange parses a Range header string as per RFC 2616. +func parseRange(s string, size int64) ([]httpRange, error) { + if s == "" { + return nil, nil // header not present + } + const b = "bytes=" + if !strings.HasPrefix(s, b) { + return nil, errors.New("invalid range") + } + var ranges []httpRange + for _, ra := range strings.Split(s[len(b):], ",") { + ra = strings.TrimSpace(ra) + if ra == "" { + continue + } + i := strings.Index(ra, "-") + if i < 0 { + return nil, errors.New("invalid range") + } + start, end := strings.TrimSpace(ra[:i]), strings.TrimSpace(ra[i+1:]) + var r httpRange + if start == "" { + // If no start is specified, end specifies the + // range start relative to the end of the file. + i, err := strconv.ParseInt(end, 10, 64) + if err != nil { + return nil, errors.New("invalid range") + } + if i > size { + i = size + } + r.start = size - i + r.length = size - r.start + } else { + i, err := strconv.ParseInt(start, 10, 64) + if err != nil || i > size || i < 0 { + return nil, errors.New("invalid range") + } + r.start = i + if end == "" { + // If no end is specified, range extends to end of the file. + r.length = size - r.start + } else { + i, err := strconv.ParseInt(end, 10, 64) + if err != nil || r.start > i { + return nil, errors.New("invalid range") + } + if i >= size { + i = size - 1 + } + r.length = i - r.start + 1 + } + } + ranges = append(ranges, r) + } + return ranges, nil +} + +// countingWriter counts how many bytes have been written to it. +type countingWriter int64 + +func (w *countingWriter) Write(p []byte) (n int, err error) { + *w += countingWriter(len(p)) + return len(p), nil +} + +// rangesMIMESize returns the number of bytes it takes to encode the +// provided ranges as a multipart response. +func rangesMIMESize(ranges []httpRange, contentType string, contentSize int64) (encSize int64) { + var w countingWriter + mw := multipart.NewWriter(&w) + for _, ra := range ranges { + mw.CreatePart(ra.mimeHeader(contentType, contentSize)) + encSize += ra.length + } + mw.Close() + encSize += int64(w) + return +} + +func sumRangesSize(ranges []httpRange) (size int64) { + for _, ra := range ranges { + size += ra.length + } + return +} diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go new file mode 100644 index 000000000..3889afe5c --- /dev/null +++ b/weed/server/volume_server_handlers_read.go @@ -0,0 +1,301 @@ +package weed_server + +import ( + "bytes" + "io" + "mime" + "mime/multipart" + "net/http" + "path" + "strconv" + "strings" + "time" + + "net/url" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/images" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") + +func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) + volumeId, err := storage.NewVolumeId(vid) + if err != nil { + glog.V(2).Infoln("parsing error:", err, r.URL.Path) + w.WriteHeader(http.StatusBadRequest) + return + } + err = n.ParsePath(fid) + if err != nil { + glog.V(2).Infoln("parsing fid error:", err, r.URL.Path) + w.WriteHeader(http.StatusBadRequest) + return + } + + glog.V(4).Infoln("volume", volumeId, "reading", n) + if !vs.store.HasVolume(volumeId) { + if !vs.ReadRedirect { + glog.V(2).Infoln("volume is not local:", err, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String()) + glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) + if err == nil && len(lookupResult.Locations) > 0 { + u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) + u.Path = r.URL.Path + arg := url.Values{} + if c := r.FormValue("collection"); c != "" { + arg.Set("collection", c) + } + u.RawQuery = arg.Encode() + http.Redirect(w, r, u.String(), http.StatusMovedPermanently) + + } else { + glog.V(2).Infoln("lookup error:", err, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + return + } + cookie := n.Cookie + count, e := vs.store.ReadVolumeNeedle(volumeId, n) + glog.V(4).Infoln("read bytes", count, "error", e) + if e != nil || count <= 0 { + glog.V(0).Infoln("read error:", e, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + defer n.ReleaseMemory() + if n.Cookie != cookie { + glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent()) + w.WriteHeader(http.StatusNotFound) + return + } + if n.LastModified != 0 { + w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat)) + if r.Header.Get("If-Modified-Since") != "" { + if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil { + if t.Unix() >= int64(n.LastModified) { + w.WriteHeader(http.StatusNotModified) + return + } + } + } + } + etag := n.Etag() + if inm := r.Header.Get("If-None-Match"); inm == etag { + w.WriteHeader(http.StatusNotModified) + return + } + w.Header().Set("Etag", etag) + + if vs.tryHandleChunkedFile(n, filename, w, r) { + return + } + + if n.NameSize > 0 && filename == "" { + filename = string(n.Name) + if ext == "" { + ext = path.Ext(filename) + } + } + mtype := "" + if n.MimeSize > 0 { + mt := string(n.Mime) + if !strings.HasPrefix(mt, "application/octet-stream") { + mtype = mt + } + } + + if ext != ".gz" { + if n.IsGzipped() { + if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { + w.Header().Set("Content-Encoding", "gzip") + } else { + if n.Data, err = operation.UnGzipData(n.Data); err != nil { + glog.V(0).Infoln("ungzip error:", err, r.URL.Path) + } + } + } + } + if ext == ".png" || ext == ".jpg" || ext == ".gif" { + width, height := 0, 0 + if r.FormValue("width") != "" { + width, _ = strconv.Atoi(r.FormValue("width")) + } + if r.FormValue("height") != "" { + height, _ = strconv.Atoi(r.FormValue("height")) + } + n.Data, _, _ = images.Resized(ext, n.Data, width, height) + } + + if e := writeResponseContent(filename, mtype, bytes.NewReader(n.Data), w, r); e != nil { + glog.V(2).Infoln("response write error:", e) + } +} + +func (vs *VolumeServer) FaviconHandler(w http.ResponseWriter, r *http.Request) { + data, err := images.Asset("favicon/favicon.ico") + if err != nil { + glog.V(2).Infoln("favicon read error:", err) + return + } + + if e := writeResponseContent("favicon.ico", "image/x-icon", bytes.NewReader(data), w, r); e != nil { + glog.V(2).Infoln("response write error:", e) + } +} + +func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { + if !n.IsChunkedManifest() { + return false + } + + chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped()) + if e != nil { + glog.V(0).Infof("load chunked manifest (%s) error: %v", r.URL.Path, e) + return false + } + if fileName == "" && chunkManifest.Name != "" { + fileName = chunkManifest.Name + } + mType := "" + if chunkManifest.Mime != "" { + mt := chunkManifest.Mime + if !strings.HasPrefix(mt, "application/octet-stream") { + mType = mt + } + } + + w.Header().Set("X-File-Store", "chunked") + + chunkedFileReader := &operation.ChunkedFileReader{ + Manifest: chunkManifest, + Master: vs.GetMasterNode(), + } + defer chunkedFileReader.Close() + if e := writeResponseContent(fileName, mType, chunkedFileReader, w, r); e != nil { + glog.V(2).Infoln("response write error:", e) + } + return true +} + +func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.ResponseWriter, r *http.Request) error { + totalSize, e := rs.Seek(0, 2) + if mimeType == "" { + if ext := path.Ext(filename); ext != "" { + mimeType = mime.TypeByExtension(ext) + } + } + if mimeType != "" { + w.Header().Set("Content-Type", mimeType) + } + if filename != "" { + contentDisposition := "inline" + if r.FormValue("dl") != "" { + if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl { + contentDisposition = "attachment" + } + } + w.Header().Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`) + } + w.Header().Set("Accept-Ranges", "bytes") + if r.Method == "HEAD" { + w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) + return nil + } + rangeReq := r.Header.Get("Range") + if rangeReq == "" { + w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) + if _, e = rs.Seek(0, 0); e != nil { + return e + } + _, e = io.Copy(w, rs) + return e + } + + //the rest is dealing with partial content request + //mostly copy from src/pkg/net/http/fs.go + ranges, err := parseRange(rangeReq, totalSize) + if err != nil { + http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) + return nil + } + if sumRangesSize(ranges) > totalSize { + // The total number of bytes in all the ranges + // is larger than the size of the file by + // itself, so this is probably an attack, or a + // dumb client. Ignore the range request. + return nil + } + if len(ranges) == 0 { + return nil + } + if len(ranges) == 1 { + // RFC 2616, Section 14.16: + // "When an HTTP message includes the content of a single + // range (for example, a response to a request for a + // single range, or to a request for a set of ranges + // that overlap without any holes), this content is + // transmitted with a Content-Range header, and a + // Content-Length header showing the number of bytes + // actually transferred. + // ... + // A response to a request for a single range MUST NOT + // be sent using the multipart/byteranges media type." + ra := ranges[0] + w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) + w.Header().Set("Content-Range", ra.contentRange(totalSize)) + w.WriteHeader(http.StatusPartialContent) + if _, e = rs.Seek(ra.start, 0); e != nil { + return e + } + + _, e = io.CopyN(w, rs, ra.length) + return e + } + // process multiple ranges + for _, ra := range ranges { + if ra.start > totalSize { + http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable) + return nil + } + } + sendSize := rangesMIMESize(ranges, mimeType, totalSize) + pr, pw := io.Pipe() + mw := multipart.NewWriter(pw) + w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary()) + sendContent := pr + defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish. + go func() { + for _, ra := range ranges { + part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize)) + if e != nil { + pw.CloseWithError(e) + return + } + if _, e = rs.Seek(ra.start, 0); e != nil { + pw.CloseWithError(e) + return + } + if _, e = io.CopyN(part, rs, ra.length); e != nil { + pw.CloseWithError(e) + return + } + } + mw.Close() + pw.Close() + }() + if w.Header().Get("Content-Encoding") == "" { + w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) + } + w.WriteHeader(http.StatusPartialContent) + _, e = io.CopyN(w, sendContent, sendSize) + return e +} diff --git a/weed/server/volume_server_handlers_sync.go b/weed/server/volume_server_handlers_sync.go new file mode 100644 index 000000000..8a2e30743 --- /dev/null +++ b/weed/server/volume_server_handlers_sync.go @@ -0,0 +1,87 @@ +package weed_server + +import ( + "fmt" + "net/http" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + syncStat := v.GetVolumeSyncStatus() + if syncStat.Error != "" { + writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Get Volume %d status error: %s", v.Id, syncStat.Error)) + glog.V(2).Infoln("getVolumeSyncStatusHandler volume =", r.FormValue("volume"), ", error =", err) + } else { + writeJsonQuiet(w, r, http.StatusOK, syncStat) + } +} + +func (vs *VolumeServer) getVolumeIndexContentHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + content, err := v.IndexFileContent() + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + w.Write(content) +} + +func (vs *VolumeServer) getVolumeDataContentHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("Not Found volume: %v", err)) + return + } + if int(v.SuperBlock.CompactRevision) != util.ParseInt(r.FormValue("revision"), 0) { + writeJsonError(w, r, http.StatusExpectationFailed, fmt.Errorf("Requested Volume Revision is %s, but current revision is %d", r.FormValue("revision"), v.SuperBlock.CompactRevision)) + return + } + offset := uint32(util.ParseUint64(r.FormValue("offset"), 0)) + size := uint32(util.ParseUint64(r.FormValue("size"), 0)) + content, block, err := storage.ReadNeedleBlob(v.DataFile(), int64(offset)*storage.NeedlePaddingSize, size) + defer storage.ReleaseBytes(block.Bytes) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + + id := util.ParseUint64(r.FormValue("id"), 0) + n := new(storage.Needle) + n.ParseNeedleHeader(content) + if id != n.Id { + writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)) + return + } + + w.Write(content) +} + +func (vs *VolumeServer) getVolume(volumeParameterName string, r *http.Request) (*storage.Volume, error) { + volumeIdString := r.FormValue(volumeParameterName) + if volumeIdString == "" { + err := fmt.Errorf("Empty Volume Id: Need to pass in %s=the_volume_id.", volumeParameterName) + return nil, err + } + vid, err := storage.NewVolumeId(volumeIdString) + if err != nil { + err = fmt.Errorf("Volume Id %s is not a valid unsigned integer", volumeIdString) + return nil, err + } + v := vs.store.GetVolume(vid) + if v == nil { + return nil, fmt.Errorf("Not Found Volume Id %s: %d", volumeIdString, vid) + } + return v, nil +} diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go new file mode 100644 index 000000000..7923c95c0 --- /dev/null +++ b/weed/server/volume_server_handlers_ui.go @@ -0,0 +1,38 @@ +package weed_server + +import ( + "net/http" + "path/filepath" + "time" + + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/util" + ui "github.com/chrislusf/seaweedfs/weed/server/volume_server_ui" +) + +func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { + infos := make(map[string]interface{}) + infos["Up Time"] = time.Now().Sub(startTime).String() + var ds []*stats.DiskStatus + for _, loc := range vs.store.Locations { + if dir, e := filepath.Abs(loc.Directory); e == nil { + ds = append(ds, stats.NewDiskStatus(dir)) + } + } + args := struct { + Version string + Master string + Volumes interface{} + DiskStatuses interface{} + Stats interface{} + Counters *stats.ServerStats + }{ + util.VERSION, + vs.masterNode, + vs.store.Status(), + ds, + infos, + serverStats, + } + ui.StatusTpl.Execute(w, args) +} diff --git a/weed/server/volume_server_handlers_vacuum.go b/weed/server/volume_server_handlers_vacuum.go new file mode 100644 index 000000000..ef348d35c --- /dev/null +++ b/weed/server/volume_server_handlers_vacuum.go @@ -0,0 +1,35 @@ +package weed_server + +import ( + "net/http" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { + err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) + if err == nil { + writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"error": "", "result": ret}) + } else { + writeJsonQuiet(w, r, http.StatusInternalServerError, map[string]interface{}{"error": err.Error(), "result": false}) + } + glog.V(2).Infoln("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) +} +func (vs *VolumeServer) vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { + err := vs.store.CompactVolume(r.FormValue("volume")) + if err == nil { + writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) + } else { + writeJsonError(w, r, http.StatusInternalServerError, err) + } + glog.V(2).Infoln("compacted volume =", r.FormValue("volume"), ", error =", err) +} +func (vs *VolumeServer) vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { + err := vs.store.CommitCompactVolume(r.FormValue("volume")) + if err == nil { + writeJsonQuiet(w, r, http.StatusOK, map[string]string{"error": ""}) + } else { + writeJsonError(w, r, http.StatusInternalServerError, err) + } + glog.V(2).Infoln("commit compact volume =", r.FormValue("volume"), ", error =", err) +} diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go new file mode 100644 index 000000000..e7ca2f8e1 --- /dev/null +++ b/weed/server/volume_server_handlers_write.go @@ -0,0 +1,165 @@ +package weed_server + +import ( + "errors" + "fmt" + "net/http" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/topology" +) + +func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { + if e := r.ParseForm(); e != nil { + glog.V(0).Infoln("form parse error:", e) + writeJsonError(w, r, http.StatusBadRequest, e) + return + } + vid, _, _, _, _ := parseURLPath(r.URL.Path) + volumeId, ve := storage.NewVolumeId(vid) + if ve != nil { + glog.V(0).Infoln("NewVolumeId error:", ve) + writeJsonError(w, r, http.StatusBadRequest, ve) + return + } + needle, ne := storage.NewNeedle(r, vs.FixJpgOrientation) + if ne != nil { + writeJsonError(w, r, http.StatusBadRequest, ne) + return + } + + ret := operation.UploadResult{} + size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(), + vs.store, volumeId, needle, r) + httpStatus := http.StatusCreated + if errorStatus != "" { + httpStatus = http.StatusInternalServerError + ret.Error = errorStatus + } + if needle.HasName() { + ret.Name = string(needle.Name) + } + ret.Size = size + writeJsonQuiet(w, r, httpStatus, ret) +} + +func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + vid, fid, _, _, _ := parseURLPath(r.URL.Path) + volumeId, _ := storage.NewVolumeId(vid) + n.ParsePath(fid) + + glog.V(2).Infoln("deleting", n) + + cookie := n.Cookie + + _, ok := vs.store.ReadVolumeNeedle(volumeId, n) + if ok != nil { + m := make(map[string]uint32) + m["size"] = 0 + writeJsonQuiet(w, r, http.StatusNotFound, m) + return + } + defer n.ReleaseMemory() + + if n.Cookie != cookie { + glog.V(0).Infoln("delete", r.URL.Path, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + writeJsonError(w, r, http.StatusBadRequest, errors.New("File Random Cookie does not match.")) + return + } + + count := int64(n.Size) + + if n.IsChunkedManifest() { + chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped()) + if e != nil { + writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Load chunks manifest error: %v", e)) + return + } + // make sure all chunks had deleted before delete manifest + if e := chunkManifest.DeleteChunks(vs.GetMasterNode()); e != nil { + writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) + return + } + count = chunkManifest.Size + } + + _, err := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r) + + if err == nil { + m := make(map[string]int64) + m["size"] = count + writeJsonQuiet(w, r, http.StatusAccepted, m) + } else { + writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Deletion Failed: %v", err)) + } + +} + +//Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas. +func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + var ret []operation.DeleteResult + for _, fid := range r.Form["fid"] { + vid, id_cookie, err := operation.ParseFileId(fid) + if err != nil { + ret = append(ret, operation.DeleteResult{ + Fid: fid, + Status: http.StatusBadRequest, + Error: err.Error()}) + continue + } + n := new(storage.Needle) + volumeId, _ := storage.NewVolumeId(vid) + n.ParsePath(id_cookie) + glog.V(4).Infoln("batch deleting", n) + cookie := n.Cookie + if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil { + ret = append(ret, operation.DeleteResult{ + Fid: fid, + Status: http.StatusNotFound, + Error: err.Error(), + }) + continue + } + + if n.IsChunkedManifest() { + ret = append(ret, operation.DeleteResult{ + Fid: fid, + Status: http.StatusNotAcceptable, + Error: "ChunkManifest: not allowed in batch delete mode.", + }) + n.ReleaseMemory() + continue + } + + if n.Cookie != cookie { + ret = append(ret, operation.DeleteResult{ + Fid: fid, + Status: http.StatusBadRequest, + Error: "File Random Cookie does not match.", + }) + glog.V(0).Infoln("deleting", fid, "with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + n.ReleaseMemory() + return + } + if size, err := vs.store.Delete(volumeId, n); err != nil { + ret = append(ret, operation.DeleteResult{ + Fid: fid, + Status: http.StatusInternalServerError, + Error: err.Error()}, + ) + } else { + ret = append(ret, operation.DeleteResult{ + Fid: fid, + Status: http.StatusAccepted, + Size: int(size)}, + ) + } + n.ReleaseMemory() + } + + writeJsonQuiet(w, r, http.StatusAccepted, ret) +} diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go new file mode 100644 index 000000000..c3db6e92a --- /dev/null +++ b/weed/server/volume_server_ui/templates.go @@ -0,0 +1,135 @@ +package master_ui + +import ( + "html/template" + "strconv" + "strings" +) + +func join(data []int64) string { + var ret []string + for _, d := range data { + ret = append(ret, strconv.Itoa(int(d))) + } + return strings.Join(ret, ",") +} + +var funcMap = template.FuncMap{ + "join": join, +} + +var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html> +<html> + <head> + <title>SeaweedFS {{ .Version }}</title> + <link rel="icon" href="http://7viirv.com1.z0.glb.clouddn.com/seaweed_favicon.png" sizes="32x32" /> + <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.1/css/bootstrap.min.css"> + <script type="text/javascript" src="https://code.jquery.com/jquery-2.1.3.min.js"></script> + <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/jquery-sparklines/2.1.2/jquery.sparkline.min.js"></script> + <script type="text/javascript"> + $(function() { + var periods = ['second', 'minute', 'hour', 'day']; + for (i = 0; i < periods.length; i++) { + var period = periods[i]; + $('.inlinesparkline-'+period).sparkline('html', { + type: 'line', + barColor: 'red', + tooltipSuffix:' request per '+period, + }); + } + }); + </script> + <style> + #jqstooltip{ + height: 28px !important; + width: 150px !important; + } + </style> + </head> + <body> + <div class="container"> + <div class="page-header"> + <h1> + <img src="http://7viirv.com1.z0.glb.clouddn.com/seaweed50x50.png"></img> + SeaweedFS <small>{{ .Version }}</small> + </h1> + </div> + + <div class="row"> + <div class="col-sm-6"> + <h2>Disk Stats</h2> + <table class="table table-condensed table-striped"> + {{ range .DiskStatuses }} + <tr> + <th>{{ .Dir }}</th> + <td>{{ .Free }} Bytes Free</td> + </tr> + {{ end }} + </table> + </div> + + <div class="col-sm-6"> + <h2>System Stats</h2> + <table class="table table-condensed table-striped"> + <tr> + <th>Master</th> + <td><a href="http://{{.Master}}/ui/index.html">{{.Master}}</a></td> + </tr> + <tr> + <th>Weekly # ReadRequests</th> + <td><span class="inlinesparkline-day">{{ .Counters.ReadRequests.WeekCounter.ToList | join }}</span></td> + </tr> + <tr> + <th>Daily # ReadRequests</th> + <td><span class="inlinesparkline-hour">{{ .Counters.ReadRequests.DayCounter.ToList | join }}</span></td> + </tr> + <tr> + <th>Hourly # ReadRequests</th> + <td><span class="inlinesparkline-minute">{{ .Counters.ReadRequests.HourCounter.ToList | join }}</span></td> + </tr> + <tr> + <th>Last Minute # ReadRequests</th> + <td><span class="inlinesparkline-second">{{ .Counters.ReadRequests.MinuteCounter.ToList | join }}</span></td> + </tr> + {{ range $key, $val := .Stats }} + <tr> + <th>{{ $key }}</th> + <td>{{ $val }}</td> + </tr> + {{ end }} + </table> + </div> + </div> + + <div class="row"> + <h2>Volumes</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Id</th> + <th>Collection</th> + <th>Size</th> + <th>Files</th> + <th>Trash</th> + <th>TTL</th> + </tr> + </thead> + <tbody> + {{ range .Volumes }} + <tr> + <td><code>{{ .Id }}</code></td> + <td>{{ .Collection }}</td> + <td>{{ .Size }} Bytes</td> + <td>{{ .FileCount }}</td> + <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td> + <td>{{ .Ttl }}</td> + </tr> + {{ end }} + </tbody> + </table> + </div> + + </div> + </body> +</html> +`)) |
