aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server_handlers_write.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_server_handlers_write.go')
-rw-r--r--weed/server/filer_server_handlers_write.go118
1 files changed, 54 insertions, 64 deletions
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 07452cd77..1a4b62235 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -22,6 +22,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
)
type FilerPostResult struct {
@@ -73,16 +74,19 @@ func makeFormData(filename, mimeType string, content io.Reader) (formData io.Rea
}
func (fs *FilerServer) queryFileInfoByPath(w http.ResponseWriter, r *http.Request, path string) (fileId, urlLocation string, err error) {
- if fileId, err = fs.filer.FindFile(path); err != nil && err != filer.ErrNotFound {
+ var found bool
+ var entry *filer2.Entry
+ if found, entry, err = fs.filer.FindEntry(filer2.FullPath(path)); err != nil {
glog.V(0).Infoln("failing to find path in filer store", path, err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
- } else if fileId != "" && err == nil {
+ } else if found {
+ fileId = string(entry.Chunks[0].Fid)
urlLocation, err = operation.LookupFileId(fs.getMasterNode(), fileId)
if err != nil {
glog.V(1).Infoln("operation LookupFileId %s failed, err is %s", fileId, err.Error())
w.WriteHeader(http.StatusNotFound)
}
- } else if fileId == "" && err == filer.ErrNotFound {
+ } else {
w.WriteHeader(http.StatusNotFound)
}
return
@@ -313,7 +317,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
// also delete the old fid unless PUT operation
if r.Method != "PUT" {
- if oldFid, err := fs.filer.FindFile(path); err == nil {
+ if found, entry, err := fs.filer.FindEntry(filer2.FullPath(path)); err == nil && found {
+ oldFid := string(entry.Chunks[0].Fid)
operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
} else if err != nil && err != filer.ErrNotFound {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
@@ -321,7 +326,17 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
glog.V(4).Infoln("saving", path, "=>", fileId)
- if db_err := fs.filer.CreateFile(path, fileId); db_err != nil {
+ entry := &filer2.Entry{
+ FullPath: filer2.FullPath(path),
+ Attr: filer2.Attr{
+ Mode: 0660,
+ },
+ Chunks: []filer2.FileChunk{{
+ Fid: filer2.FileId(fileId),
+ Size: uint64(r.ContentLength),
+ }},
+ }
+ if db_err := fs.filer.CreateEntry(entry); db_err != nil {
operation.DeleteFile(fs.getMasterNode(), fileId, fs.jwt(fileId)) //clean up
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
writeJsonError(w, r, http.StatusInternalServerError, db_err)
@@ -400,13 +415,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
fileName = path.Base(fileName)
}
- chunks := (int64(contentLength) / int64(chunkSize)) + 1
- cm := operation.ChunkManifest{
- Name: fileName,
- Size: 0, // don't know yet
- Mime: "application/octet-stream",
- Chunks: make([]*operation.ChunkInfo, 0, chunks),
- }
+ var fileChunks []filer2.FileChunk
totalBytesRead := int64(0)
tmpBufferSize := int32(1024 * 1024)
@@ -438,18 +447,18 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
}
// upload the chunk to the volume server
- chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10)
+ chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10)
uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId)
if uploadErr != nil {
return nil, uploadErr
}
// Save to chunk manifest structure
- cm.Chunks = append(cm.Chunks,
- &operation.ChunkInfo{
+ fileChunks = append(fileChunks,
+ filer2.FileChunk{
+ Fid: filer2.FileId(fileId),
Offset: chunkOffset,
- Size: int64(chunkBufOffset),
- Fid: fileId,
+ Size: uint64(chunkBufOffset),
},
)
@@ -469,47 +478,30 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
}
}
- cm.Size = totalBytesRead
- manifestBuf, marshalErr := cm.Marshal()
- if marshalErr != nil {
- return nil, marshalErr
- }
-
- manifestStr := string(manifestBuf)
- glog.V(4).Infoln("Generated chunk manifest: ", manifestStr)
-
- manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection)
- if manifestAssignmentErr != nil {
- return nil, manifestAssignmentErr
- }
- glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId)
- filerResult.Fid = manifestFileId
-
- u, _ := url.Parse(manifestUrlLocation)
- q := u.Query()
- q.Set("cm", "true")
- u.RawQuery = q.Encode()
-
- manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId)
- if manifestUploadErr != nil {
- return nil, manifestUploadErr
- }
-
path := r.URL.Path
// also delete the old fid unless PUT operation
if r.Method != "PUT" {
- if oldFid, err := fs.filer.FindFile(path); err == nil {
- operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
- } else if err != nil && err != filer.ErrNotFound {
+ if found, entry, err := fs.filer.FindEntry(filer2.FullPath(path)); found && err == nil {
+ for _, chunk := range entry.Chunks {
+ oldFid := string(chunk.Fid)
+ operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
+ }
+ } else if err != nil {
glog.V(0).Infof("error %v occur when finding %s in filer store", err, path)
}
}
- glog.V(4).Infoln("saving", path, "=>", manifestFileId)
- if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil {
+ glog.V(4).Infoln("saving", path)
+ entry := &filer2.Entry{
+ FullPath: filer2.FullPath(path),
+ Attr: filer2.Attr{
+ Mode: 0660,
+ },
+ Chunks: fileChunks,
+ }
+ if db_err := fs.filer.CreateEntry(entry); db_err != nil {
replyerr = db_err
filerResult.Error = db_err.Error()
- operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
return
}
@@ -532,23 +524,21 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
}
// curl -X DELETE http://localhost:8888/path/to
-// curl -X DELETE http://localhost:8888/path/to/?recursive=true
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
- var err error
- var fid string
- if strings.HasSuffix(r.URL.Path, "/") {
- isRecursive := r.FormValue("recursive") == "true"
- err = fs.filer.DeleteDirectory(r.URL.Path, isRecursive)
- } else {
- fid, err = fs.filer.DeleteFile(r.URL.Path)
- if err == nil && fid != "" {
- err = operation.DeleteFile(fs.getMasterNode(), fid, fs.jwt(fid))
- }
- }
- if err == nil {
- writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
- } else {
+
+ entry, err := fs.filer.DeleteEntry(filer2.FullPath(r.URL.Path))
+ if err != nil {
glog.V(4).Infoln("deleting", r.URL.Path, ":", err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+
+ if entry != nil && !entry.IsDirectory() {
+ for _, chunk := range entry.Chunks {
+ oldFid := string(chunk.Fid)
+ operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
+ }
}
+
+ writeJsonQuiet(w, r, http.StatusAccepted, map[string]string{"error": ""})
}