aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2018-07-21 20:14:38 -0700
committerGitHub <noreply@github.com>2018-07-21 20:14:38 -0700
commit3423c1da18487e4dc3d77a024f9c0d5d3b7599cf (patch)
treecc72caa73fadbdb81659c1f13bb87f33c502fbc1 /weed/server
parentc98df05ed0fc78e8585c6dd7d2ae317c7c42d9c3 (diff)
parent49375d603177e4134d0cb4128324a2dd70521290 (diff)
downloadseaweedfs-3423c1da18487e4dc3d77a024f9c0d5d3b7599cf.tar.xz
seaweedfs-3423c1da18487e4dc3d77a024f9c0d5d3b7599cf.zip
Merge pull request #693 from chrislusf/add_s3
Add "weed s3" to support S3 API
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go12
-rw-r--r--weed/server/filer_server.go4
-rw-r--r--weed/server/filer_server_handlers_read.go4
-rw-r--r--weed/server/filer_server_handlers_write.go70
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go15
-rw-r--r--weed/server/filer_server_handlers_write_monopart.go139
-rw-r--r--weed/server/filer_server_handlers_write_multipart.go39
-rw-r--r--weed/server/raft_server_handlers.go2
-rw-r--r--weed/server/volume_grpc_client.go2
-rw-r--r--weed/server/volume_server.go2
-rw-r--r--weed/server/volume_server_handlers_sync.go2
-rw-r--r--weed/server/volume_server_handlers_write.go4
12 files changed, 67 insertions, 228 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 1ec5439a5..830d8ebe1 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"strconv"
)
@@ -162,7 +163,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
- err = fs.filer.DeleteEntryMetaAndData(filer2.FullPath(filepath.Join(req.Directory, req.Name)), req.IsDeleteData)
+ err = fs.filer.DeleteEntryMetaAndData(filer2.FullPath(filepath.Join(req.Directory, req.Name)), req.IsRecursive, req.IsDeleteData)
return &filer_pb.DeleteEntryResponse{}, err
}
@@ -211,3 +212,12 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
PublicUrl: assignResult.PublicUrl,
}, err
}
+
+func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
+
+ for _, master := range fs.option.Masters {
+ _, err = util.Get(fmt.Sprintf("http://%s/col/delete?collection=%s", master, req.Collection))
+ }
+
+ return &filer_pb.DeleteCollectionResponse{}, err
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 601790f8a..61ca972cc 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "net/http"
"github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
@@ -9,8 +8,9 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "net/http"
)
type FilerOption struct {
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index dbd91c5e0..77374147a 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -10,10 +10,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/util"
- "strconv"
- "mime/multipart"
"mime"
+ "mime/multipart"
"path"
+ "strconv"
)
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index ba7c17b79..52be6735c 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -15,6 +15,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "os"
+)
+
+var (
+ OS_UID = uint32(os.Getuid())
+ OS_GID = uint32(os.Getgid())
)
type FilerPostResult struct {
@@ -27,9 +33,20 @@ type FilerPostResult struct {
func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) {
var entry *filer2.Entry
- if entry, err = fs.filer.FindEntry(filer2.FullPath(path)); err != nil {
+ entry, err = fs.filer.FindEntry(filer2.FullPath(path))
+ if err == filer2.ErrNotFound {
+ return "", "", nil
+ }
+
+ if err != nil {
glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+
+ if len(entry.Chunks) == 0 {
+ glog.V(1).Infof("empty entry: %s", path)
+ w.WriteHeader(http.StatusNoContent)
} else {
fileId = entry.Chunks[0].FileId
urlLocation, err = operation.LookupFileId(fs.filer.GetMaster(), fileId)
@@ -59,9 +76,10 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
DataCenter: "",
}
}
+
assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest)
if ae != nil {
- glog.V(0).Infoln("failing to assign a file id", ae.Error())
+ glog.Errorf("failing to assign a file id: %v", ae)
writeJsonError(w, r, http.StatusInternalServerError, ae)
err = ae
return
@@ -91,21 +109,16 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- var fileId, urlLocation string
- var err error
-
- if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") {
- fileId, urlLocation, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter)
- if err != nil {
- return
- }
- } else {
- fileId, urlLocation, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter)
- if err != nil {
- return
- }
+ fileId, urlLocation, err := fs.queryFileInfoByPath(w, r, r.URL.Path)
+ if err == nil && fileId == "" {
+ fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
+ }
+ if err != nil || fileId == "" || urlLocation == "" {
+ return
}
+ glog.V(0).Infof("request header %+v, urlLocation: %v", r.Header, urlLocation)
+
u, _ := url.Parse(urlLocation)
// This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off
@@ -118,6 +131,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
glog.V(4).Infoln("post to", u)
+ // send request to volume server
request := &http.Request{
Method: r.Method,
URL: u,
@@ -131,7 +145,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
resp, do_err := util.Do(request)
if do_err != nil {
- glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
+ glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method)
writeJsonError(w, r, http.StatusInternalServerError, do_err)
return
}
@@ -155,6 +169,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
return
}
+
+ // find correct final path
path := r.URL.Path
if strings.HasSuffix(path, "/") {
if ret.Name != "" {
@@ -168,16 +184,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
}
- // also delete the old fid unless PUT operation
- if r.Method != "PUT" {
- if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil {
- oldFid := entry.Chunks[0].FileId
- operation.DeleteFile(fs.filer.GetMaster(), oldFid, fs.jwt(oldFid))
- } else if err != nil && err != filer2.ErrNotFound {
- glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
- }
- }
-
+ // update metadata in filer store
glog.V(4).Infoln("saving", path, "=>", fileId)
entry := &filer2.Entry{
FullPath: filer2.FullPath(path),
@@ -185,13 +192,15 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
Mtime: time.Now(),
Crtime: time.Now(),
Mode: 0660,
+ Uid: OS_UID,
+ Gid: OS_GID,
Replication: replication,
Collection: collection,
TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),
},
Chunks: []*filer_pb.FileChunk{{
FileId: fileId,
- Size: uint64(r.ContentLength),
+ Size: uint64(ret.Size),
Mtime: time.Now().UnixNano(),
}},
}
@@ -202,6 +211,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
+ // send back post result
reply := FilerPostResult{
Name: ret.Name,
Size: ret.Size,
@@ -215,12 +225,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
// curl -X DELETE http://localhost:8888/path/to
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
- err := fs.filer.DeleteEntryMetaAndData(filer2.FullPath(r.URL.Path), true)
+ err := fs.filer.DeleteEntryMetaAndData(filer2.FullPath(r.URL.Path), false, true)
if err != nil {
- glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
+ glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
- writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
+ w.WriteHeader(http.StatusNoContent)
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index f87e7d65a..4b1745aaa 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -7,6 +7,7 @@ import (
"net/http"
"path"
"strconv"
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/filer2"
@@ -143,15 +144,9 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
}
path := r.URL.Path
- // also delete the old fid unless PUT operation
- if r.Method != "PUT" {
- if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil {
- for _, chunk := range entry.Chunks {
- oldFid := chunk.FileId
- operation.DeleteFile(fs.filer.GetMaster(), oldFid, fs.jwt(oldFid))
- }
- } else if err != nil {
- glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
+ if strings.HasSuffix(path, "/") {
+ if fileName != "" {
+ path += fileName
}
}
@@ -162,6 +157,8 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
Mtime: time.Now(),
Crtime: time.Now(),
Mode: 0660,
+ Uid: OS_UID,
+ Gid: OS_GID,
Replication: replication,
Collection: collection,
TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),
diff --git a/weed/server/filer_server_handlers_write_monopart.go b/weed/server/filer_server_handlers_write_monopart.go
deleted file mode 100644
index 777d5bc43..000000000
--- a/weed/server/filer_server_handlers_write_monopart.go
+++ /dev/null
@@ -1,139 +0,0 @@
-package weed_server
-
-import (
- "bytes"
- "crypto/md5"
- "encoding/base64"
- "fmt"
- "io"
- "io/ioutil"
- "mime/multipart"
- "net/http"
- "net/textproto"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
-)
-
-var quoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"")
-
-func escapeQuotes(s string) string {
- return quoteEscaper.Replace(s)
-}
-
-func createFormFile(writer *multipart.Writer, fieldname, filename, mime string) (io.Writer, error) {
- h := make(textproto.MIMEHeader)
- h.Set("Content-Disposition",
- fmt.Sprintf(`form-data; name="%s"; filename="%s"`,
- escapeQuotes(fieldname), escapeQuotes(filename)))
- if len(mime) == 0 {
- mime = "application/octet-stream"
- }
- h.Set("Content-Type", mime)
- return writer.CreatePart(h)
-}
-
-func makeFormData(filename, mimeType string, content io.Reader) (formData io.Reader, contentType string, err error) {
- buf := new(bytes.Buffer)
- writer := multipart.NewWriter(buf)
- defer writer.Close()
-
- part, err := createFormFile(writer, "file", filename, mimeType)
- if err != nil {
- glog.V(0).Infoln(err)
- return
- }
- _, err = io.Copy(part, content)
- if err != nil {
- glog.V(0).Infoln(err)
- return
- }
-
- formData = buf
- contentType = writer.FormDataContentType()
-
- return
-}
-
-func checkContentMD5(w http.ResponseWriter, r *http.Request) (err error) {
- if contentMD5 := r.Header.Get("Content-MD5"); contentMD5 != "" {
- buf, _ := ioutil.ReadAll(r.Body)
- //checkMD5
- sum := md5.Sum(buf)
- fileDataMD5 := base64.StdEncoding.EncodeToString(sum[0:len(sum)])
- if strings.ToLower(fileDataMD5) != strings.ToLower(contentMD5) {
- glog.V(0).Infof("fileDataMD5 [%s] is not equal to Content-MD5 [%s]", fileDataMD5, contentMD5)
- err = fmt.Errorf("MD5 check failed")
- writeJsonError(w, r, http.StatusNotAcceptable, err)
- return
- }
- //reconstruct http request body for following new request to volume server
- r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
- }
- return
-}
-
-func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
- /*
- Amazon S3 ref link:[http://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html]
- There is a long way to provide a completely compatibility against all Amazon S3 API, I just made
- a simple data stream adapter between S3 PUT API and seaweedfs's volume storage Write API
- 1. The request url format should be http://$host:$port/$bucketName/$objectName
- 2. bucketName will be mapped to seaweedfs's collection name
- 3. You could customize and make your enhancement.
- */
- lastPos := strings.LastIndex(r.URL.Path, "/")
- if lastPos == -1 || lastPos == 0 || lastPos == len(r.URL.Path)-1 {
- glog.V(0).Infof("URL Path [%s] is invalid, could not retrieve file name", r.URL.Path)
- err = fmt.Errorf("URL Path is invalid")
- writeJsonError(w, r, http.StatusInternalServerError, err)
- return
- }
-
- if err = checkContentMD5(w, r); err != nil {
- return
- }
-
- fileName := r.URL.Path[lastPos+1:]
- if err = multipartHttpBodyBuilder(w, r, fileName); err != nil {
- return
- }
-
- secondPos := strings.Index(r.URL.Path[1:], "/") + 1
- collection = r.URL.Path[1:secondPos]
- path := r.URL.Path
-
- if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path); err == nil && fileId == "" {
- fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
- }
- return
-}
-
-func multipartHttpBodyBuilder(w http.ResponseWriter, r *http.Request, fileName string) (err error) {
- body, contentType, te := makeFormData(fileName, r.Header.Get("Content-Type"), r.Body)
- if te != nil {
- glog.V(0).Infoln("S3 protocol to raw seaweed protocol failed", te.Error())
- writeJsonError(w, r, http.StatusInternalServerError, te)
- err = te
- return
- }
-
- if body != nil {
- switch v := body.(type) {
- case *bytes.Buffer:
- r.ContentLength = int64(v.Len())
- case *bytes.Reader:
- r.ContentLength = int64(v.Len())
- case *strings.Reader:
- r.ContentLength = int64(v.Len())
- }
- }
-
- r.Header.Set("Content-Type", contentType)
- rc, ok := body.(io.ReadCloser)
- if !ok && body != nil {
- rc = ioutil.NopCloser(body)
- }
- r.Body = rc
- return
-}
diff --git a/weed/server/filer_server_handlers_write_multipart.go b/weed/server/filer_server_handlers_write_multipart.go
deleted file mode 100644
index 056317750..000000000
--- a/weed/server/filer_server_handlers_write_multipart.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package weed_server
-
-import (
- "bytes"
- "io/ioutil"
- "net/http"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
-)
-
-func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
- //Default handle way for http multipart
- if r.Method == "PUT" {
- buf, _ := ioutil.ReadAll(r.Body)
- r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
- fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r)
- if pe != nil {
- glog.V(0).Infoln("failing to parse post body", pe.Error())
- writeJsonError(w, r, http.StatusInternalServerError, pe)
- err = pe
- return
- }
- //reconstruct http request body for following new request to volume server
- r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
-
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if fileName != "" {
- path += fileName
- }
- }
- fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path)
- } else {
- fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
- }
- return
-}
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index c91ab0407..627fe354e 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -1,8 +1,8 @@
package weed_server
import (
- "net/http"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "net/http"
)
func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
diff --git a/weed/server/volume_grpc_client.go b/weed/server/volume_grpc_client.go
index b3c755239..de6fa23c7 100644
--- a/weed/server/volume_grpc_client.go
+++ b/weed/server/volume_grpc_client.go
@@ -7,8 +7,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "golang.org/x/net/context"
"github.com/chrislusf/seaweedfs/weed/util"
+ "golang.org/x/net/context"
)
func (vs *VolumeServer) GetMaster() string {
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 9294f9bf6..037fca2c2 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,10 +1,10 @@
package weed_server
import (
- "net/http"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "net/http"
)
type VolumeServer struct {
diff --git a/weed/server/volume_server_handlers_sync.go b/weed/server/volume_server_handlers_sync.go
index 38adfe870..c6e32bb9b 100644
--- a/weed/server/volume_server_handlers_sync.go
+++ b/weed/server/volume_server_handlers_sync.go
@@ -6,8 +6,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) {
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 55ef2a613..d32958339 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -9,8 +9,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/topology"
- "time"
"strconv"
+ "time"
)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
@@ -55,7 +55,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
volumeId, _ := storage.NewVolumeId(vid)
n.ParsePath(fid)
- glog.V(2).Infoln("deleting", n)
+ glog.V(2).Infof("volume %s deleting %s", vid, n)
cookie := n.Cookie