diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2018-07-21 20:14:38 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-07-21 20:14:38 -0700 |
| commit | 3423c1da18487e4dc3d77a024f9c0d5d3b7599cf (patch) | |
| tree | cc72caa73fadbdb81659c1f13bb87f33c502fbc1 /weed/server | |
| parent | c98df05ed0fc78e8585c6dd7d2ae317c7c42d9c3 (diff) | |
| parent | 49375d603177e4134d0cb4128324a2dd70521290 (diff) | |
| download | seaweedfs-3423c1da18487e4dc3d77a024f9c0d5d3b7599cf.tar.xz seaweedfs-3423c1da18487e4dc3d77a024f9c0d5d3b7599cf.zip | |
Merge pull request #693 from chrislusf/add_s3
Add "weed s3" to support S3 API
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 12 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_read.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 70 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 15 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_monopart.go | 139 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_multipart.go | 39 | ||||
| -rw-r--r-- | weed/server/raft_server_handlers.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_sync.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_write.go | 4 |
12 files changed, 67 insertions, 228 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 1ec5439a5..830d8ebe1 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "strconv" ) @@ -162,7 +163,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) { - err = fs.filer.DeleteEntryMetaAndData(filer2.FullPath(filepath.Join(req.Directory, req.Name)), req.IsDeleteData) + err = fs.filer.DeleteEntryMetaAndData(filer2.FullPath(filepath.Join(req.Directory, req.Name)), req.IsRecursive, req.IsDeleteData) return &filer_pb.DeleteEntryResponse{}, err } @@ -211,3 +212,12 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol PublicUrl: assignResult.PublicUrl, }, err } + +func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) { + + for _, master := range fs.option.Masters { + _, err = util.Get(fmt.Sprintf("http://%s/col/delete?collection=%s", master, req.Collection)) + } + + return &filer_pb.DeleteCollectionResponse{}, err +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 601790f8a..61ca972cc 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,7 +1,6 @@ package weed_server import ( - "net/http" "github.com/chrislusf/seaweedfs/weed/filer2" _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" @@ -9,8 +8,9 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer2/redis" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" + "net/http" ) type FilerOption struct { diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index dbd91c5e0..77374147a 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -10,10 +10,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/util" - "strconv" - "mime/multipart" "mime" + "mime/multipart" "path" + "strconv" ) func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index ba7c17b79..52be6735c 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -15,6 +15,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "os" +) + +var ( + OS_UID = uint32(os.Getuid()) + OS_GID = uint32(os.Getgid()) ) type FilerPostResult struct { @@ -27,9 +33,20 @@ type FilerPostResult struct { func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) { var entry *filer2.Entry - if entry, err = fs.filer.FindEntry(filer2.FullPath(path)); err != nil { + entry, err = fs.filer.FindEntry(filer2.FullPath(path)) + if err == filer2.ErrNotFound { + return "", "", nil + } + + if err != nil { glog.V(0).Infoln("failing to find path in filer store", path, err.Error()) writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + + if len(entry.Chunks) == 0 { + glog.V(1).Infof("empty entry: %s", path) + w.WriteHeader(http.StatusNoContent) } else { fileId = entry.Chunks[0].FileId urlLocation, err = operation.LookupFileId(fs.filer.GetMaster(), fileId) @@ -59,9 +76,10 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, DataCenter: "", } } + assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest) if ae != nil { - glog.V(0).Infoln("failing to assign a file id", ae.Error()) + glog.Errorf("failing to assign a file id: %v", ae) writeJsonError(w, r, http.StatusInternalServerError, ae) err = ae return @@ -91,21 +109,16 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - var fileId, urlLocation string - var err error - - if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") { - fileId, urlLocation, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter) - if err != nil { - return - } - } else { - fileId, urlLocation, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter) - if err != nil { - return - } + fileId, urlLocation, err := fs.queryFileInfoByPath(w, r, r.URL.Path) + if err == nil && fileId == "" { + fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter) + } + if err != nil || fileId == "" || urlLocation == "" { + return } + glog.V(0).Infof("request header %+v, urlLocation: %v", r.Header, urlLocation) + u, _ := url.Parse(urlLocation) // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off @@ -118,6 +131,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } glog.V(4).Infoln("post to", u) + // send request to volume server request := &http.Request{ Method: r.Method, URL: u, @@ -131,7 +145,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } resp, do_err := util.Do(request) if do_err != nil { - glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error()) + glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method) writeJsonError(w, r, http.StatusInternalServerError, do_err) return } @@ -155,6 +169,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error)) return } + + // find correct final path path := r.URL.Path if strings.HasSuffix(path, "/") { if ret.Name != "" { @@ -168,16 +184,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } } - // also delete the old fid unless PUT operation - if r.Method != "PUT" { - if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil { - oldFid := entry.Chunks[0].FileId - operation.DeleteFile(fs.filer.GetMaster(), oldFid, fs.jwt(oldFid)) - } else if err != nil && err != filer2.ErrNotFound { - glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) - } - } - + // update metadata in filer store glog.V(4).Infoln("saving", path, "=>", fileId) entry := &filer2.Entry{ FullPath: filer2.FullPath(path), @@ -185,13 +192,15 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { Mtime: time.Now(), Crtime: time.Now(), Mode: 0660, + Uid: OS_UID, + Gid: OS_GID, Replication: replication, Collection: collection, TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)), }, Chunks: []*filer_pb.FileChunk{{ FileId: fileId, - Size: uint64(r.ContentLength), + Size: uint64(ret.Size), Mtime: time.Now().UnixNano(), }}, } @@ -202,6 +211,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } + // send back post result reply := FilerPostResult{ Name: ret.Name, Size: ret.Size, @@ -215,12 +225,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { // curl -X DELETE http://localhost:8888/path/to func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { - err := fs.filer.DeleteEntryMetaAndData(filer2.FullPath(r.URL.Path), true) + err := fs.filer.DeleteEntryMetaAndData(filer2.FullPath(r.URL.Path), false, true) if err != nil { - glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error()) + glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error()) writeJsonError(w, r, http.StatusInternalServerError, err) return } - writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""}) + w.WriteHeader(http.StatusNoContent) } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index f87e7d65a..4b1745aaa 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -7,6 +7,7 @@ import ( "net/http" "path" "strconv" + "strings" "time" "github.com/chrislusf/seaweedfs/weed/filer2" @@ -143,15 +144,9 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte } path := r.URL.Path - // also delete the old fid unless PUT operation - if r.Method != "PUT" { - if entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil { - for _, chunk := range entry.Chunks { - oldFid := chunk.FileId - operation.DeleteFile(fs.filer.GetMaster(), oldFid, fs.jwt(oldFid)) - } - } else if err != nil { - glog.V(0).Infof("error %v occur when finding %s in filer store", err, path) + if strings.HasSuffix(path, "/") { + if fileName != "" { + path += fileName } } @@ -162,6 +157,8 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte Mtime: time.Now(), Crtime: time.Now(), Mode: 0660, + Uid: OS_UID, + Gid: OS_GID, Replication: replication, Collection: collection, TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)), diff --git a/weed/server/filer_server_handlers_write_monopart.go b/weed/server/filer_server_handlers_write_monopart.go deleted file mode 100644 index 777d5bc43..000000000 --- a/weed/server/filer_server_handlers_write_monopart.go +++ /dev/null @@ -1,139 +0,0 @@ -package weed_server - -import ( - "bytes" - "crypto/md5" - "encoding/base64" - "fmt" - "io" - "io/ioutil" - "mime/multipart" - "net/http" - "net/textproto" - "strings" - - "github.com/chrislusf/seaweedfs/weed/glog" -) - -var quoteEscaper = strings.NewReplacer("\\", "\\\\", `"`, "\\\"") - -func escapeQuotes(s string) string { - return quoteEscaper.Replace(s) -} - -func createFormFile(writer *multipart.Writer, fieldname, filename, mime string) (io.Writer, error) { - h := make(textproto.MIMEHeader) - h.Set("Content-Disposition", - fmt.Sprintf(`form-data; name="%s"; filename="%s"`, - escapeQuotes(fieldname), escapeQuotes(filename))) - if len(mime) == 0 { - mime = "application/octet-stream" - } - h.Set("Content-Type", mime) - return writer.CreatePart(h) -} - -func makeFormData(filename, mimeType string, content io.Reader) (formData io.Reader, contentType string, err error) { - buf := new(bytes.Buffer) - writer := multipart.NewWriter(buf) - defer writer.Close() - - part, err := createFormFile(writer, "file", filename, mimeType) - if err != nil { - glog.V(0).Infoln(err) - return - } - _, err = io.Copy(part, content) - if err != nil { - glog.V(0).Infoln(err) - return - } - - formData = buf - contentType = writer.FormDataContentType() - - return -} - -func checkContentMD5(w http.ResponseWriter, r *http.Request) (err error) { - if contentMD5 := r.Header.Get("Content-MD5"); contentMD5 != "" { - buf, _ := ioutil.ReadAll(r.Body) - //checkMD5 - sum := md5.Sum(buf) - fileDataMD5 := base64.StdEncoding.EncodeToString(sum[0:len(sum)]) - if strings.ToLower(fileDataMD5) != strings.ToLower(contentMD5) { - glog.V(0).Infof("fileDataMD5 [%s] is not equal to Content-MD5 [%s]", fileDataMD5, contentMD5) - err = fmt.Errorf("MD5 check failed") - writeJsonError(w, r, http.StatusNotAcceptable, err) - return - } - //reconstruct http request body for following new request to volume server - r.Body = ioutil.NopCloser(bytes.NewBuffer(buf)) - } - return -} - -func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) { - /* - Amazon S3 ref link:[http://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html] - There is a long way to provide a completely compatibility against all Amazon S3 API, I just made - a simple data stream adapter between S3 PUT API and seaweedfs's volume storage Write API - 1. The request url format should be http://$host:$port/$bucketName/$objectName - 2. bucketName will be mapped to seaweedfs's collection name - 3. You could customize and make your enhancement. - */ - lastPos := strings.LastIndex(r.URL.Path, "/") - if lastPos == -1 || lastPos == 0 || lastPos == len(r.URL.Path)-1 { - glog.V(0).Infof("URL Path [%s] is invalid, could not retrieve file name", r.URL.Path) - err = fmt.Errorf("URL Path is invalid") - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - - if err = checkContentMD5(w, r); err != nil { - return - } - - fileName := r.URL.Path[lastPos+1:] - if err = multipartHttpBodyBuilder(w, r, fileName); err != nil { - return - } - - secondPos := strings.Index(r.URL.Path[1:], "/") + 1 - collection = r.URL.Path[1:secondPos] - path := r.URL.Path - - if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path); err == nil && fileId == "" { - fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter) - } - return -} - -func multipartHttpBodyBuilder(w http.ResponseWriter, r *http.Request, fileName string) (err error) { - body, contentType, te := makeFormData(fileName, r.Header.Get("Content-Type"), r.Body) - if te != nil { - glog.V(0).Infoln("S3 protocol to raw seaweed protocol failed", te.Error()) - writeJsonError(w, r, http.StatusInternalServerError, te) - err = te - return - } - - if body != nil { - switch v := body.(type) { - case *bytes.Buffer: - r.ContentLength = int64(v.Len()) - case *bytes.Reader: - r.ContentLength = int64(v.Len()) - case *strings.Reader: - r.ContentLength = int64(v.Len()) - } - } - - r.Header.Set("Content-Type", contentType) - rc, ok := body.(io.ReadCloser) - if !ok && body != nil { - rc = ioutil.NopCloser(body) - } - r.Body = rc - return -} diff --git a/weed/server/filer_server_handlers_write_multipart.go b/weed/server/filer_server_handlers_write_multipart.go deleted file mode 100644 index 056317750..000000000 --- a/weed/server/filer_server_handlers_write_multipart.go +++ /dev/null @@ -1,39 +0,0 @@ -package weed_server - -import ( - "bytes" - "io/ioutil" - "net/http" - "strings" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" -) - -func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) { - //Default handle way for http multipart - if r.Method == "PUT" { - buf, _ := ioutil.ReadAll(r.Body) - r.Body = ioutil.NopCloser(bytes.NewBuffer(buf)) - fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r) - if pe != nil { - glog.V(0).Infoln("failing to parse post body", pe.Error()) - writeJsonError(w, r, http.StatusInternalServerError, pe) - err = pe - return - } - //reconstruct http request body for following new request to volume server - r.Body = ioutil.NopCloser(bytes.NewBuffer(buf)) - - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if fileName != "" { - path += fileName - } - } - fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path) - } else { - fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter) - } - return -} diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index c91ab0407..627fe354e 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -1,8 +1,8 @@ package weed_server import ( - "net/http" "github.com/chrislusf/seaweedfs/weed/operation" + "net/http" ) func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { diff --git a/weed/server/volume_grpc_client.go b/weed/server/volume_grpc_client.go index b3c755239..de6fa23c7 100644 --- a/weed/server/volume_grpc_client.go +++ b/weed/server/volume_grpc_client.go @@ -7,8 +7,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" - "golang.org/x/net/context" "github.com/chrislusf/seaweedfs/weed/util" + "golang.org/x/net/context" ) func (vs *VolumeServer) GetMaster() string { diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 9294f9bf6..037fca2c2 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -1,10 +1,10 @@ package weed_server import ( - "net/http" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + "net/http" ) type VolumeServer struct { diff --git a/weed/server/volume_server_handlers_sync.go b/weed/server/volume_server_handlers_sync.go index 38adfe870..c6e32bb9b 100644 --- a/weed/server/volume_server_handlers_sync.go +++ b/weed/server/volume_server_handlers_sync.go @@ -6,8 +6,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" ) func (vs *VolumeServer) getVolumeSyncStatusHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 55ef2a613..d32958339 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -9,8 +9,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/topology" - "time" "strconv" + "time" ) func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { @@ -55,7 +55,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { volumeId, _ := storage.NewVolumeId(vid) n.ParsePath(fid) - glog.V(2).Infoln("deleting", n) + glog.V(2).Infof("volume %s deleting %s", vid, n) cookie := n.Cookie |
