aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
Diffstat (limited to 'go')
-rw-r--r--go/operation/data_struts.go1
-rw-r--r--go/operation/delete_content.go5
-rw-r--r--go/operation/submit.go32
-rw-r--r--go/operation/upload_content.go10
-rw-r--r--go/security/guard.go111
-rw-r--r--go/security/jwt.go72
-rw-r--r--go/storage/store.go14
-rw-r--r--go/topology/store_replicate.go26
-rw-r--r--go/util/constants.go2
-rw-r--r--go/util/http_util.go7
-rw-r--r--go/weed/benchmark.go10
-rw-r--r--go/weed/filer.go4
-rw-r--r--go/weed/master.go11
-rw-r--r--go/weed/mount_std.go9
-rw-r--r--go/weed/server.go17
-rw-r--r--go/weed/upload.go22
-rw-r--r--go/weed/weed_server/common.go4
-rw-r--r--go/weed/weed_server/filer_server.go7
-rw-r--r--go/weed/weed_server/filer_server_handlers.go6
-rw-r--r--go/weed/weed_server/master_server.go29
-rw-r--r--go/weed/weed_server/master_server_handlers_admin.go5
-rw-r--r--go/weed/weed_server/volume_server.go48
-rw-r--r--go/weed/weed_server/volume_server_handlers.go7
23 files changed, 303 insertions, 156 deletions
diff --git a/go/operation/data_struts.go b/go/operation/data_struts.go
index 4980f9913..bfc53aa50 100644
--- a/go/operation/data_struts.go
+++ b/go/operation/data_struts.go
@@ -2,5 +2,6 @@ package operation
type JoinResult struct {
VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
+ SecretKey string `json:"secretKey,omitempty"`
Error string `json:"error,omitempty"`
}
diff --git a/go/operation/delete_content.go b/go/operation/delete_content.go
index 064cc8df9..afd1bbc34 100644
--- a/go/operation/delete_content.go
+++ b/go/operation/delete_content.go
@@ -7,6 +7,7 @@ import (
"strings"
"sync"
+ "github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/util"
)
@@ -16,12 +17,12 @@ type DeleteResult struct {
Error string `json:"error,omitempty"`
}
-func DeleteFile(master string, fileId string) error {
+func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
fileUrl, err := LookupFileId(master, fileId)
if err != nil {
return err
}
- return util.Delete(fileUrl)
+ return util.Delete(fileUrl, jwt)
}
func ParseFileId(fid string) (vid string, key_cookie string, err error) {
diff --git a/go/operation/submit.go b/go/operation/submit.go
index 3ab6d78d9..03551b1e8 100644
--- a/go/operation/submit.go
+++ b/go/operation/submit.go
@@ -10,6 +10,7 @@ import (
"strings"
"github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/security"
)
type FilePart struct {
@@ -34,7 +35,10 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, files []FilePart, replication string, collection string, ttl string, maxMB int) ([]SubmitResult, error) {
+func SubmitFiles(master string, files []FilePart,
+ replication string, collection string, ttl string, maxMB int,
+ secret security.Secret,
+) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@@ -54,7 +58,7 @@ func SubmitFiles(master string, files []FilePart, replication string, collection
file.Server = ret.PublicUrl
file.Replication = replication
file.Collection = collection
- results[index].Size, err = file.Upload(maxMB, master)
+ results[index].Size, err = file.Upload(maxMB, master, secret)
if err != nil {
results[index].Error = err.Error()
}
@@ -101,7 +105,8 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) {
+func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
+ jwt := security.GenJwt(secret, fi.Fid)
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -114,16 +119,20 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
chunks := fi.FileSize/chunkSize + 1
fids := make([]string, 0)
for i := int64(0); i < chunks; i++ {
- id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection, fi.Ttl)
+ id, count, e := upload_one_chunk(
+ fi.FileName+"-"+strconv.FormatInt(i+1, 10),
+ io.LimitReader(fi.Reader, chunkSize),
+ master, fi.Replication, fi.Collection, fi.Ttl,
+ jwt)
if e != nil {
return 0, e
}
fids = append(fids, id)
retSize += count
}
- err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids)
+ err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids, jwt)
} else {
- ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType)
+ ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
if e != nil {
return 0, e
}
@@ -132,24 +141,27 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
return
}
-func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string, ttl string) (fid string, size uint32, e error) {
+func upload_one_chunk(filename string, reader io.Reader, master,
+ replication string, collection string, ttl string, jwt security.EncodedJwt,
+) (fid string, size uint32, e error) {
ret, err := Assign(master, 1, replication, collection, ttl)
if err != nil {
return "", 0, err
}
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
- uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream")
+ uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
+ "application/octet-stream", jwt)
if uploadError != nil {
return fid, 0, uploadError
}
return fid, uploadResult.Size, nil
}
-func upload_file_id_list(fileUrl, filename string, fids []string) error {
+func upload_file_id_list(fileUrl, filename string, fids []string, jwt security.EncodedJwt) error {
var buf bytes.Buffer
buf.WriteString(strings.Join(fids, "\n"))
glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...")
- _, e := Upload(fileUrl, filename, &buf, false, "text/plain")
+ _, e := Upload(fileUrl, filename, &buf, false, "text/plain", jwt)
return e
}
diff --git a/go/operation/upload_content.go b/go/operation/upload_content.go
index 480d76dca..533be82cb 100644
--- a/go/operation/upload_content.go
+++ b/go/operation/upload_content.go
@@ -15,6 +15,7 @@ import (
"strings"
"github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/security"
)
type UploadResult struct {
@@ -35,13 +36,13 @@ func init() {
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
-func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string) (*UploadResult, error) {
+func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
return upload_content(uploadUrl, func(w io.Writer) (err error) {
_, err = io.Copy(w, reader)
return
- }, filename, isGzipped, mtype)
+ }, filename, isGzipped, mtype, jwt)
}
-func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string) (*UploadResult, error) {
+func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf)
h := make(textproto.MIMEHeader)
@@ -55,6 +56,9 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if isGzipped {
h.Set("Content-Encoding", "gzip")
}
+ if jwt != "" {
+ h.Set("Authorization", "BEARER "+string(jwt))
+ }
file_writer, cp_err := body_writer.CreatePart(h)
if cp_err != nil {
glog.V(0).Infoln("error creating form file", cp_err.Error())
diff --git a/go/security/guard.go b/go/security/guard.go
index a2beb48f4..d39985034 100644
--- a/go/security/guard.go
+++ b/go/security/guard.go
@@ -5,11 +5,8 @@ import (
"fmt"
"net"
"net/http"
- "strings"
- "time"
"github.com/chrislusf/weed-fs/go/glog"
- "github.com/dgrijalva/jwt-go"
)
var (
@@ -44,24 +41,24 @@ https://github.com/pkieltyka/jwtauth/blob/master/jwtauth.go
*/
type Guard struct {
whiteList []string
- secretKey string
+ SecretKey Secret
isActive bool
}
func NewGuard(whiteList []string, secretKey string) *Guard {
- g := &Guard{whiteList: whiteList, secretKey: secretKey}
- g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0
+ g := &Guard{whiteList: whiteList, SecretKey: Secret(secretKey)}
+ g.isActive = len(g.whiteList) != 0 || len(g.SecretKey) != 0
return g
}
-func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
+func (g *Guard) WhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
if !g.isActive {
//if no security needed, just skip all checkings
return f
}
return func(w http.ResponseWriter, r *http.Request) {
- if err := g.doCheck(w, r); err != nil {
+ if err := g.checkWhiteList(w, r); err != nil {
w.WriteHeader(http.StatusUnauthorized)
return
}
@@ -69,76 +66,62 @@ func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w ht
}
}
-func (g *Guard) NewToken() (tokenString string, err error) {
- m := make(map[string]interface{})
- m["exp"] = time.Now().Unix() + 10
- return g.Encode(m)
-}
-
-func (g *Guard) Encode(claims map[string]interface{}) (tokenString string, err error) {
+func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
if !g.isActive {
- return "", nil
+ //if no security needed, just skip all checkings
+ return f
+ }
+ return func(w http.ResponseWriter, r *http.Request) {
+ if err := g.checkJwt(w, r); err != nil {
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+ f(w, r)
}
-
- t := jwt.New(jwt.GetSigningMethod("HS256"))
- t.Claims = claims
- return t.SignedString(g.secretKey)
}
-func (g *Guard) Decode(tokenString string) (token *jwt.Token, err error) {
- return jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
- return g.secretKey, nil
- })
-}
+func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error {
+ if len(g.whiteList) == 0 {
+ return nil
+ }
-func (g *Guard) doCheck(w http.ResponseWriter, r *http.Request) error {
- if len(g.whiteList) != 0 {
- host, _, err := net.SplitHostPort(r.RemoteAddr)
- if err == nil {
- for _, ip := range g.whiteList {
- if ip == host {
- return nil
- }
+ host, _, err := net.SplitHostPort(r.RemoteAddr)
+ if err == nil {
+ for _, ip := range g.whiteList {
+ if ip == host {
+ return nil
}
}
}
- if len(g.secretKey) != 0 {
-
- // Get token from query params
- tokenStr := r.URL.Query().Get("jwt")
+ glog.V(1).Infof("Not in whitelist: %s", r.RemoteAddr)
+ return fmt.Errorf("Not in whitelis: %s", r.RemoteAddr)
+}
- // Get token from authorization header
- if tokenStr == "" {
- bearer := r.Header.Get("Authorization")
- if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
- tokenStr = bearer[7:]
- }
- }
+func (g *Guard) checkJwt(w http.ResponseWriter, r *http.Request) error {
+ if g.checkWhiteList(w, r) == nil {
+ return nil
+ }
- // Get token from cookie
- if tokenStr == "" {
- cookie, err := r.Cookie("jwt")
- if err == nil {
- tokenStr = cookie.Value
- }
- }
+ if len(g.SecretKey) == 0 {
+ return nil
+ }
- if tokenStr == "" {
- return ErrUnauthorized
- }
+ tokenStr := GetJwt(r)
- // Verify the token
- token, err := g.Decode(tokenStr)
- if err != nil {
- glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err)
- return ErrUnauthorized
- }
- if !token.Valid {
- glog.V(1).Infof("Token invliad from %s: %v", r.RemoteAddr, tokenStr)
- return ErrUnauthorized
- }
+ if tokenStr == "" {
+ return ErrUnauthorized
+ }
+ // Verify the token
+ token, err := DecodeJwt(g.SecretKey, tokenStr)
+ if err != nil {
+ glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err)
+ return ErrUnauthorized
+ }
+ if !token.Valid {
+ glog.V(1).Infof("Token invliad from %s: %v", r.RemoteAddr, tokenStr)
+ return ErrUnauthorized
}
glog.V(1).Infof("No permission from %s", r.RemoteAddr)
diff --git a/go/security/jwt.go b/go/security/jwt.go
new file mode 100644
index 000000000..fac91dd8e
--- /dev/null
+++ b/go/security/jwt.go
@@ -0,0 +1,72 @@
+package security
+
+import (
+ "net/http"
+ "strings"
+
+ "time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ jwt "github.com/dgrijalva/jwt-go"
+)
+
+type EncodedJwt string
+type Secret string
+
+func GenJwt(secret Secret, fileId string) EncodedJwt {
+ if secret == "" {
+ return ""
+ }
+
+ t := jwt.New(jwt.GetSigningMethod("HS256"))
+ t.Claims["exp"] = time.Now().Unix() + 10
+ t.Claims["sub"] = fileId
+ encoded, e := t.SignedString(secret)
+ if e != nil {
+ glog.V(0).Infof("Failed to sign claims: %v", t.Claims)
+ return ""
+ }
+ return EncodedJwt(encoded)
+}
+
+func GetJwt(r *http.Request) EncodedJwt {
+
+ // Get token from query params
+ tokenStr := r.URL.Query().Get("jwt")
+
+ // Get token from authorization header
+ if tokenStr == "" {
+ bearer := r.Header.Get("Authorization")
+ if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
+ tokenStr = bearer[7:]
+ }
+ }
+
+ // Get token from cookie
+ if tokenStr == "" {
+ cookie, err := r.Cookie("jwt")
+ if err == nil {
+ tokenStr = cookie.Value
+ }
+ }
+
+ return EncodedJwt(tokenStr)
+}
+
+func EncodeJwt(secret Secret, claims map[string]interface{}) (EncodedJwt, error) {
+ if secret == "" {
+ return "", nil
+ }
+
+ t := jwt.New(jwt.GetSigningMethod("HS256"))
+ t.Claims = claims
+ encoded, e := t.SignedString(secret)
+ return EncodedJwt(encoded), e
+}
+
+func DecodeJwt(secret Secret, tokenString EncodedJwt) (token *jwt.Token, err error) {
+ // check exp, nbf
+ return jwt.Parse(string(tokenString), func(token *jwt.Token) (interface{}, error) {
+ return secret, nil
+ })
+}
diff --git a/go/storage/store.go b/go/storage/store.go
index 2c4434b81..d280175f2 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/util"
"github.com/golang/protobuf/proto"
)
@@ -260,7 +261,7 @@ func (s *Store) SetRack(rack string) {
func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
s.masterNodes = NewMasterNodes(bootstrapMaster)
}
-func (s *Store) Join() (masterNode string, e error) {
+func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
masterNode, e = s.masterNodes.findMaster()
if e != nil {
return
@@ -314,22 +315,23 @@ func (s *Store) Join() (masterNode string, e error) {
data, err := proto.Marshal(joinMessage)
if err != nil {
- return "", err
+ return "", "", err
}
jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
if err != nil {
s.masterNodes.reset()
- return "", err
+ return "", "", err
}
var ret operation.JoinResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return masterNode, err
+ return masterNode, "", err
}
if ret.Error != "" {
- return masterNode, errors.New(ret.Error)
+ return masterNode, "", errors.New(ret.Error)
}
s.volumeSizeLimit = ret.VolumeSizeLimit
+ secretKey = security.Secret(ret.SecretKey)
s.connected = true
return
}
@@ -353,7 +355,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
}
if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
- if _, e := s.Join(); e != nil {
+ if _, _, e := s.Join(); e != nil {
glog.V(0).Infoln("error when reporting size:", e)
}
}
diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go
index 0c52f9d30..da426e587 100644
--- a/go/topology/store_replicate.go
+++ b/go/topology/store_replicate.go
@@ -7,11 +7,18 @@ import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
)
-func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {
+func ReplicatedWrite(masterNode string, s *storage.Store,
+ volumeId storage.VolumeId, needle *storage.Needle,
+ r *http.Request) (size uint32, errorStatus string) {
+
+ //check JWT
+ jwt := security.GetJwt(r)
+
ret, err := s.Write(volumeId, needle)
needToReplicate := !s.HasVolume(volumeId)
if err != nil {
@@ -27,7 +34,10 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
- _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime))
+ _, err := operation.Upload(
+ "http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10),
+ string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
+ jwt)
return err == nil
}) {
ret = 0
@@ -41,7 +51,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum
volumeId.String() + ": " + err.Error()
} else {
distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
- return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
+ return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
})
}
}
@@ -49,7 +59,13 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum
return
}
-func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) {
+func ReplicatedDelete(masterNode string, store *storage.Store,
+ volumeId storage.VolumeId, n *storage.Needle,
+ r *http.Request) (ret uint32) {
+
+ //check JWT
+ jwt := security.GetJwt(r)
+
ret, err := store.Delete(volumeId, n)
if err != nil {
glog.V(0).Infoln("delete error:", err)
@@ -63,7 +79,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" {
if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
- return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
+ return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
}) {
ret = 0
}
diff --git a/go/util/constants.go b/go/util/constants.go
index 1aa59d634..ca801574c 100644
--- a/go/util/constants.go
+++ b/go/util/constants.go
@@ -1,5 +1,5 @@
package util
const (
- VERSION = "0.68"
+ VERSION = "0.69 beta"
)
diff --git a/go/util/http_util.go b/go/util/http_util.go
index 08de56ba9..72cab76e1 100644
--- a/go/util/http_util.go
+++ b/go/util/http_util.go
@@ -7,6 +7,8 @@ import (
"net/http"
"net/url"
"strings"
+
+ "github.com/chrislusf/weed-fs/go/security"
)
var (
@@ -63,8 +65,11 @@ func Get(url string) ([]byte, error) {
return b, nil
}
-func Delete(url string) error {
+func Delete(url string, jwt security.EncodedJwt) error {
req, err := http.NewRequest("DELETE", url, nil)
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
if err != nil {
return err
}
diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go
index 04ab4307d..5a91d9d58 100644
--- a/go/weed/benchmark.go
+++ b/go/weed/benchmark.go
@@ -16,6 +16,7 @@ import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/util"
)
@@ -32,6 +33,7 @@ type BenchmarkOptions struct {
collection *string
cpuprofile *string
maxCpu *int
+ secretKey *string
vid2server map[string]string //cache for vid locations
}
@@ -56,6 +58,7 @@ func init() {
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ b.secretKey = cmdBenchmark.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
b.vid2server = make(map[string]string)
sharedBytes = make([]byte, 1024)
}
@@ -181,6 +184,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
defer wait.Done()
delayedDeleteChan := make(chan *delayedFile, 100)
var waitForDeletions sync.WaitGroup
+ secret := security.Secret(*b.secretKey)
+
for i := 0; i < 7; i++ {
waitForDeletions.Add(1)
go func() {
@@ -189,7 +194,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if df.enterTime.After(time.Now()) {
time.Sleep(df.enterTime.Sub(time.Now()))
}
- if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil {
+ if e := util.Delete("http://"+df.fp.Server+"/"+df.fp.Fid,
+ security.GenJwt(secret, df.fp.Fid)); e == nil {
s.completed++
} else {
s.failed++
@@ -204,7 +210,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
- if _, err := fp.Upload(0, *b.server); err == nil {
+ if _, err := fp.Upload(0, *b.server, secret); err == nil {
if rand.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
diff --git a/go/weed/filer.go b/go/weed/filer.go
index 4e7191e34..fd7dcdf88 100644
--- a/go/weed/filer.go
+++ b/go/weed/filer.go
@@ -22,6 +22,7 @@ type FilerOptions struct {
defaultReplicaPlacement *string
dir *string
redirectOnRead *bool
+ secretKey *string
cassandra_server *string
cassandra_keyspace *string
redis_server *string
@@ -40,6 +41,8 @@ func init() {
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server")
+ f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
+
}
var cmdFiler = &Command{
@@ -73,6 +76,7 @@ func runFiler(cmd *Command, args []string) bool {
r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
*f.defaultReplicaPlacement, *f.redirectOnRead,
+ *f.secretKey,
*f.cassandra_server, *f.cassandra_keyspace,
*f.redis_server, *f.redis_database,
)
diff --git a/go/weed/master.go b/go/weed/master.go
index af63d8c22..1e2a6f0af 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -29,8 +29,9 @@ var cmdMaster = &Command{
var (
mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
- masterIp = cmdMaster.Flag.String("ip", "", "master listening ip address, default to listen on all network interfaces")
+ masterIp = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address")
masterBindIp = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
+ mPublicUrl = cmdMaster.Flag.String("publicUrl", "", "peer accessible <ip>|<server_name>:port")
metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
@@ -41,7 +42,7 @@ var (
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
- masterSecureKey = cmdMaster.Flag.String("secure.key", "", "secret key to check permission")
+ masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
masterWhiteList []string
)
@@ -75,10 +76,10 @@ func runMaster(cmd *Command, args []string) bool {
go func() {
time.Sleep(100 * time.Millisecond)
- if *masterIp == "" {
- *masterIp = "localhost"
- }
myMasterAddress := *masterIp + ":" + strconv.Itoa(*mport)
+ if *mPublicUrl != "" {
+ myMasterAddress = *mPublicUrl
+ }
var peers []string
if *masterPeers != "" {
peers = strings.Split(*masterPeers, ",")
diff --git a/go/weed/mount_std.go b/go/weed/mount_std.go
index 808c6c563..8b5ffefcb 100644
--- a/go/weed/mount_std.go
+++ b/go/weed/mount_std.go
@@ -13,6 +13,7 @@ import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
+ "golang.org/x/net/context"
)
func runMount(cmd *Command, args []string) bool {
@@ -55,7 +56,7 @@ type File struct {
func (File) Attr() fuse.Attr {
return fuse.Attr{Mode: 0444}
}
-func (File) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
+func (File) ReadAll(ctx context.Context) ([]byte, error) {
return []byte("hello, world\n"), nil
}
@@ -68,7 +69,7 @@ func (dir Dir) Attr() fuse.Attr {
return fuse.Attr{Inode: dir.Id, Mode: os.ModeDir | 0555}
}
-func (dir Dir) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
+func (dir Dir) Lookup(ctx context.Context, name string) (fs.Node, error) {
files_result, e := filer.ListFiles(*mountOptions.filer, dir.Path, name)
if e != nil {
return nil, fuse.ENOENT
@@ -81,11 +82,11 @@ func (dir Dir) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
type WFS struct{}
-func (WFS) Root() (fs.Node, fuse.Error) {
+func (WFS) Root() (fs.Node, error) {
return Dir{}, nil
}
-func (dir *Dir) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
+func (dir *Dir) ReadDir(ctx context.Context) ([]fuse.Dirent, error) {
ret := make([]fuse.Dirent, 0)
if dirs, e := filer.ListDirectories(*mountOptions.filer, dir.Path); e == nil {
for _, d := range dirs.Directories {
diff --git a/go/weed/server.go b/go/weed/server.go
index b779033cb..a758f887f 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -47,7 +47,7 @@ var cmdServer = &Command{
}
var (
- serverIp = cmdServer.Flag.String("ip", "", "ip or server name")
+ serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name")
serverPublicUrl = cmdServer.Flag.String("publicUrl", "", "publicly accessible address")
serverBindIp = cmdServer.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
@@ -56,7 +56,7 @@ var (
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list")
- serverSecureKey = cmdServer.Flag.String("secure.key", "", "secret key to ensure authenticated access")
+ serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
@@ -86,10 +86,10 @@ func init() {
filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server")
-
}
func runServer(cmd *Command, args []string) bool {
+ filerOptions.secretKey = serverSecureKey
if *serverOptions.cpuprofile != "" {
f, err := os.Create(*serverOptions.cpuprofile)
if err != nil {
@@ -99,10 +99,6 @@ func runServer(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- if *serverIp == "" {
- *serverIp = "localhost"
- }
-
if *filerOptions.redirectOnRead {
*isStartingFiler = true
}
@@ -145,13 +141,13 @@ func runServer(cmd *Command, args []string) bool {
*filerOptions.dir = *masterMetaFolder + "/filer"
os.MkdirAll(*filerOptions.dir, 0700)
}
+ if err := util.TestFolderWritable(*filerOptions.dir); err != nil {
+ glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err)
+ }
}
if err := util.TestFolderWritable(*masterMetaFolder); err != nil {
glog.Fatalf("Check Meta Folder (-mdir=\"%s\") Writable: %s", *masterMetaFolder, err)
}
- if err := util.TestFolderWritable(*filerOptions.dir); err != nil {
- glog.Fatalf("Check Mapping Meta Folder (-filer.dir=\"%s\") Writable: %s", *filerOptions.dir, err)
- }
if *serverWhiteListOption != "" {
serverWhiteList = strings.Split(*serverWhiteListOption, ",")
@@ -162,6 +158,7 @@ func runServer(cmd *Command, args []string) bool {
r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead,
+ *filerOptions.secretKey,
"", "",
"", 0,
)
diff --git a/go/weed/upload.go b/go/weed/upload.go
index 2d67c0bd9..eff259d1f 100644
--- a/go/weed/upload.go
+++ b/go/weed/upload.go
@@ -7,6 +7,7 @@ import (
"path/filepath"
"github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/security"
)
var (
@@ -15,6 +16,7 @@ var (
uploadDir *string
uploadTtl *string
include *string
+ uploadSecretKey *string
maxMB *int
)
@@ -28,13 +30,14 @@ func init() {
uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name")
uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
+ uploadSecretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
}
var cmdUpload = &Command{
UsageLine: "upload -server=localhost:9333 file1 [file2 file3]\n upload -server=localhost:9333 -dir=one_directory -include=*.pdf",
Short: "upload one or a list of files",
Long: `upload one or a list of files, or batch upload one whole folder recursively.
-
+
If uploading a list of files:
It uses consecutive file keys for the list of files.
e.g. If the file1 uses key k, file2 can be read via k_1
@@ -42,18 +45,19 @@ var cmdUpload = &Command{
If uploading a whole folder recursively:
All files under the folder and subfolders will be uploaded, each with its own file key.
Optional parameter "-include" allows you to specify the file name patterns.
-
+
If any file has a ".gz" extension, the content are considered gzipped already, and will be stored as is.
This can save volume server's gzipped processing and allow customizable gzip compression level.
The file name will strip out ".gz" and stored. For example, "jquery.js.gz" will be stored as "jquery.js".
-
- If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly.
- The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned.
+
+ If "maxMB" is set to a positive number, files larger than it would be split into chunks and uploaded separatedly.
+ The list of file ids of those chunks would be stored in an additional chunk, and this additional chunk's file id would be returned.
`,
}
func runUpload(cmd *Command, args []string) bool {
+ secret := security.Secret(*uploadSecretKey)
if len(cmdUpload.Flag.Args()) == 0 {
if *uploadDir == "" {
return false
@@ -70,7 +74,9 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
+ results, e := operation.SubmitFiles(*server, parts,
+ *uploadReplication, *uploadCollection,
+ *uploadTtl, *maxMB, secret)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -87,7 +93,9 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
fmt.Println(e.Error())
}
- results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
+ results, _ := operation.SubmitFiles(*server, parts,
+ *uploadReplication, *uploadCollection,
+ *uploadTtl, *maxMB, secret)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
}
diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go
index a2d93c246..095652a6b 100644
--- a/go/weed/weed_server/common.go
+++ b/go/weed/weed_server/common.go
@@ -12,6 +12,7 @@ import (
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/security"
"github.com/chrislusf/weed-fs/go/stats"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
@@ -75,6 +76,7 @@ func debug(params ...interface{}) {
}
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
+ jwt := security.GetJwt(r)
m := make(map[string]interface{})
if r.Method != "POST" {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@@ -102,7 +104,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("upload file to store", url)
- uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType)
+ uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
diff --git a/go/weed/weed_server/filer_server.go b/go/weed/weed_server/filer_server.go
index b43e1965b..1309e4486 100644
--- a/go/weed/weed_server/filer_server.go
+++ b/go/weed/weed_server/filer_server.go
@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/weed-fs/go/filer/flat_namespace"
"github.com/chrislusf/weed-fs/go/filer/redis_store"
"github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/security"
)
type FilerServer struct {
@@ -18,11 +19,13 @@ type FilerServer struct {
collection string
defaultReplication string
redirectOnRead bool
+ secret security.Secret
filer filer.Filer
}
func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
replication string, redirectOnRead bool,
+ secret string,
cassandra_server string, cassandra_keyspace string,
redis_server string, redis_database int,
) (fs *FilerServer, err error) {
@@ -56,3 +59,7 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle
return fs, nil
}
+
+func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
+ return security.GenJwt(fs.secret, fileId)
+}
diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go
index ac894771a..6278e5dad 100644
--- a/go/weed/weed_server/filer_server_handlers.go
+++ b/go/weed/weed_server/filer_server_handlers.go
@@ -170,7 +170,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if ret.Name != "" {
path += ret.Name
} else {
- operation.DeleteFile(fs.master, assignResult.Fid) //clean up
+ operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
writeJsonError(w, r, http.StatusInternalServerError,
errors.New("Can not to write to folder "+path+" without a file name"))
@@ -179,7 +179,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
- operation.DeleteFile(fs.master, assignResult.Fid) //clean up
+ operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, http.StatusInternalServerError, db_err)
return
@@ -199,7 +199,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} else {
fid, err = fs.filer.DeleteFile(r.URL.Path)
if err == nil {
- err = operation.DeleteFile(fs.master, fid)
+ err = operation.DeleteFile(fs.master, fid, fs.jwt(fid))
}
}
if err == nil {
diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go
index 056b1fe7b..dc79c733a 100644
--- a/go/weed/weed_server/master_server.go
+++ b/go/weed/weed_server/master_server.go
@@ -23,6 +23,7 @@ type MasterServer struct {
pulseSeconds int
defaultReplicaPlacement string
garbageThreshold string
+ guard *security.Guard
Topo *topology.Topology
vg *topology.VolumeGrowth
@@ -57,22 +58,22 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
- guard := security.NewGuard(whiteList, secureKey)
+ ms.guard = security.NewGuard(whiteList, secureKey)
- r.HandleFunc("/dir/assign", ms.proxyToLeader(guard.Secure(ms.dirAssignHandler)))
- r.HandleFunc("/dir/lookup", ms.proxyToLeader(guard.Secure(ms.dirLookupHandler)))
- r.HandleFunc("/dir/join", ms.proxyToLeader(guard.Secure(ms.dirJoinHandler)))
- r.HandleFunc("/dir/status", ms.proxyToLeader(guard.Secure(ms.dirStatusHandler)))
- r.HandleFunc("/col/delete", ms.proxyToLeader(guard.Secure(ms.collectionDeleteHandler)))
- r.HandleFunc("/vol/lookup", ms.proxyToLeader(guard.Secure(ms.volumeLookupHandler)))
- r.HandleFunc("/vol/grow", ms.proxyToLeader(guard.Secure(ms.volumeGrowHandler)))
- r.HandleFunc("/vol/status", ms.proxyToLeader(guard.Secure(ms.volumeStatusHandler)))
- r.HandleFunc("/vol/vacuum", ms.proxyToLeader(guard.Secure(ms.volumeVacuumHandler)))
- r.HandleFunc("/submit", guard.Secure(ms.submitFromMasterServerHandler))
- r.HandleFunc("/delete", guard.Secure(ms.deleteFromMasterServerHandler))
+ r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
+ r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
+ r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler)))
+ r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
+ r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
+ r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler)))
+ r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
+ r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
+ r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
+ r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
+ r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler))
r.HandleFunc("/{fileId}", ms.redirectHandler)
- r.HandleFunc("/stats/counter", guard.Secure(statsCounterHandler))
- r.HandleFunc("/stats/memory", guard.Secure(statsMemoryHandler))
+ r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
+ r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
ms.Topo.StartRefreshWritableVolumes(garbageThreshold)
diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go
index 33e45afd2..9d9880a6a 100644
--- a/go/weed/weed_server/master_server_handlers_admin.go
+++ b/go/weed/weed_server/master_server_handlers_admin.go
@@ -58,7 +58,10 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
}
ms.Topo.ProcessJoinMessage(joinMessage)
- writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
+ writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{
+ VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
+ SecretKey: string(ms.guard.SecretKey),
+ })
}
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index f3ad2974d..177514920 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -3,6 +3,7 @@ package weed_server
import (
"math/rand"
"net/http"
+ "sync"
"time"
"github.com/chrislusf/weed-fs/go/glog"
@@ -12,6 +13,7 @@ import (
type VolumeServer struct {
masterNode string
+ mnLock sync.RWMutex
pulseSeconds int
dataCenter string
rack string
@@ -29,40 +31,42 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
whiteList []string,
fixJpgOrientation bool) *VolumeServer {
vs := &VolumeServer{
- masterNode: masterNode,
pulseSeconds: pulseSeconds,
dataCenter: dataCenter,
rack: rack,
FixJpgOrientation: fixJpgOrientation,
}
+ vs.SetMasterNode(masterNode)
vs.store = storage.NewStore(port, adminPort, ip, publicUrl, folders, maxCounts)
vs.guard = security.NewGuard(whiteList, "")
- adminMux.HandleFunc("/status", vs.guard.Secure(vs.statusHandler))
- adminMux.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler))
- adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler))
- adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler))
- adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler))
- adminMux.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler))
- adminMux.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler))
- adminMux.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler))
- adminMux.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler))
- adminMux.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler))
+ adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
+ adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler))
+ adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler))
+ adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler))
+ adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler))
+ adminMux.HandleFunc("/admin/freeze_volume", vs.guard.WhiteList(vs.freezeVolumeHandler))
+ adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
+ adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
+ adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
+ adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
publicMux.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler))
publicMux.HandleFunc("/", vs.storeHandler)
go func() {
connected := true
- vs.store.SetBootstrapMaster(vs.masterNode)
+
+ vs.store.SetBootstrapMaster(vs.GetMasterNode())
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
for {
- master, err := vs.store.Join()
+ master, secretKey, err := vs.store.Join()
if err == nil {
if !connected {
connected = true
- vs.masterNode = master
+ vs.SetMasterNode(master)
+ vs.guard.SecretKey = secretKey
glog.V(0).Infoln("Volume Server Connected with master at", master)
}
} else {
@@ -82,8 +86,24 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
return vs
}
+func (vs *VolumeServer) GetMasterNode() string {
+ vs.mnLock.RLock()
+ defer vs.mnLock.RUnlock()
+ return vs.masterNode
+}
+
+func (vs *VolumeServer) SetMasterNode(masterNode string) {
+ vs.mnLock.Lock()
+ defer vs.mnLock.Unlock()
+ vs.masterNode = masterNode
+}
+
func (vs *VolumeServer) Shutdown() {
glog.V(0).Infoln("Shutting down volume server...")
vs.store.Close()
glog.V(0).Infoln("Shut down successfully!")
}
+
+func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt {
+ return security.GenJwt(vs.guard.SecretKey, fileId)
+}
diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go
index c2c9e8523..d3fdf0cb2 100644
--- a/go/weed/weed_server/volume_server_handlers.go
+++ b/go/weed/weed_server/volume_server_handlers.go
@@ -58,7 +58,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
glog.V(4).Infoln("volume", volumeId, "reading", n)
if !vs.store.HasVolume(volumeId) {
- lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String())
+ lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
http.Redirect(w, r, "http://"+lookupResult.Locations[0].Url+r.URL.Path, http.StatusMovedPermanently)
@@ -253,7 +253,8 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
ret := operation.UploadResult{}
- size, errorStatus := topology.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r)
+ size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(),
+ vs.store, volumeId, needle, r)
httpStatus := http.StatusCreated
if errorStatus != "" {
httpStatus = http.StatusInternalServerError
@@ -290,7 +291,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
n.Size = 0
- ret := topology.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r)
+ ret := topology.ReplicatedDelete(vs.GetMasterNode(), vs.store, volumeId, n, r)
if ret != 0 {
m := make(map[string]uint32)