aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Tolman <mike.tolman@fidelissecurity.com>2016-08-05 15:47:46 -0600
committerMike Tolman <mike.tolman@fidelissecurity.com>2016-08-05 15:47:46 -0600
commita89a3c86d0bfa20ead98fec1d286cdc6018c3bde (patch)
tree801566226da8e0e5880b43644aae0f3124b3c8ac
parent0d331c1e3ae0d038ae972279a63d2ff9a70e25f4 (diff)
downloadseaweedfs-a89a3c86d0bfa20ead98fec1d286cdc6018c3bde.tar.xz
seaweedfs-a89a3c86d0bfa20ead98fec1d286cdc6018c3bde.zip
Revert "Add AutoChunking to the Filer API, so that you can upload really large files through the filer API."
This reverts commit 09059bfdccdeff1a588ee1326318075adb068b0f.
-rw-r--r--weed/command/filer.go3
-rw-r--r--weed/command/server.go2
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/server/filer_server_handlers_write.go208
4 files changed, 0 insertions, 215 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 0bd508e0b..582d4e9c8 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -24,7 +24,6 @@ type FilerOptions struct {
dir *string
redirectOnRead *bool
disableDirListing *bool
- maxMB *int
secretKey *string
cassandra_server *string
cassandra_keyspace *string
@@ -43,7 +42,6 @@ func init() {
f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified")
f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
- f.maxMB = cmdFiler.Flag.Int("maxMB", 0, "split files larger than the limit")
f.cassandra_server = cmdFiler.Flag.String("cassandra.server", "", "host[:port] of the cassandra server")
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
@@ -84,7 +82,6 @@ func runFiler(cmd *Command, args []string) bool {
r := http.NewServeMux()
_, nfs_err := weed_server.NewFilerServer(r, *f.ip, *f.port, *f.master, *f.dir, *f.collection,
*f.defaultReplicaPlacement, *f.redirectOnRead, *f.disableDirListing,
- *f.maxMB,
*f.secretKey,
*f.cassandra_server, *f.cassandra_keyspace,
*f.redis_server, *f.redis_password, *f.redis_database,
diff --git a/weed/command/server.go b/weed/command/server.go
index 7a6677a65..1211c7137 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -86,7 +86,6 @@ func init() {
filerOptions.defaultReplicaPlacement = cmdServer.Flag.String("filer.defaultReplicaPlacement", "", "Default replication type if not specified during runtime.")
filerOptions.redirectOnRead = cmdServer.Flag.Bool("filer.redirectOnRead", false, "whether proxy or redirect to volume server during file GET request")
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
- filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 0, "split files larger than the limit")
filerOptions.cassandra_server = cmdServer.Flag.String("filer.cassandra.server", "", "host[:port] of the cassandra server")
filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
@@ -170,7 +169,6 @@ func runServer(cmd *Command, args []string) bool {
_, nfs_err := weed_server.NewFilerServer(r, *serverBindIp, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
*filerOptions.defaultReplicaPlacement,
*filerOptions.redirectOnRead, *filerOptions.disableDirListing,
- *filerOptions.maxMB,
*filerOptions.secretKey,
*filerOptions.cassandra_server, *filerOptions.cassandra_keyspace,
*filerOptions.redis_server, *filerOptions.redis_password, *filerOptions.redis_database,
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index c9bc0e021..b99bbd7c9 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -28,7 +28,6 @@ type FilerServer struct {
disableDirListing bool
secret security.Secret
filer filer.Filer
- maxMB int
masterNodes *storage.MasterNodes
}
@@ -44,7 +43,6 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
defaultReplication: replication,
redirectOnRead: redirectOnRead,
disableDirListing: disableDirListing,
- maxMB: maxMB,
port: ip + ":" + strconv.Itoa(port),
}
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 872d8c4b9..e2d40f532 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -20,8 +20,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
- "path"
- "strconv"
)
type FilerPostResult struct {
@@ -219,7 +217,6 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R
}
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
-
query := r.URL.Query()
replication := query.Get("replication")
if replication == "" {
@@ -230,10 +227,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
collection = fs.collection
}
- if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked {
- return
- }
-
var fileId, urlLocation string
var err error
@@ -250,17 +243,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
u, _ := url.Parse(urlLocation)
-
- // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off
- // because they need to provide FIDs instead of file paths...
- cm, _ := strconv.ParseBool(query.Get("cm"))
- if cm {
- q := u.Query()
- q.Set("cm", "true")
- u.RawQuery = q.Encode()
- }
glog.V(4).Infoln("post to", u)
-
request := &http.Request{
Method: r.Method,
URL: u,
@@ -336,197 +319,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
writeJsonQuiet(w, r, http.StatusCreated, reply)
}
-func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool {
- if r.Method != "POST" {
- glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
- return false
- }
-
- // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
- query := r.URL.Query()
-
- parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32)
- maxMB := int32(parsedMaxMB)
- if maxMB <= 0 && fs.maxMB > 0 {
- maxMB = int32(fs.maxMB)
- }
- if maxMB <= 0 {
- glog.V(4).Infoln("AutoChunking not enabled")
- return false
- }
- glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
-
- chunkSize := 1024 * 1024 * maxMB
-
- contentLength := int64(0)
- if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
- contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
- if contentLength <= int64(chunkSize) {
- glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
- return false
- }
- }
-
- if contentLength <= 0 {
- glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
- return false
- }
-
- reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection)
- if err != nil {
- writeJsonError(w, r, http.StatusInternalServerError, err)
- } else if reply != nil {
- writeJsonQuiet(w, r, http.StatusCreated, reply)
- }
- return true
-}
-
-func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) {
-
- multipartReader, multipartReaderErr := r.MultipartReader()
- if multipartReaderErr != nil {
- return nil, multipartReaderErr
- }
-
- part1, part1Err := multipartReader.NextPart()
- if part1Err != nil {
- return nil, part1Err
- }
-
- fileName := part1.FileName()
- if fileName != "" {
- fileName = path.Base(fileName)
- }
-
- chunks := (int64(contentLength) / int64(chunkSize)) + 1
- cm := operation.ChunkManifest{
- Name: fileName,
- Size: 0, // don't know yet
- Mime: "application/octet-stream",
- Chunks: make([]*operation.ChunkInfo, 0, chunks),
- }
-
- totalBytesRead := int64(0)
- tmpBufferSize := int32(1024 * 1024)
- tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize))
- chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow
- chunkBufOffset := int32(0)
- chunkOffset := int64(0)
- writtenChunks := 0
-
- filerResult = &FilerPostResult{
- Name: fileName,
- }
-
- for totalBytesRead < contentLength {
- tmpBuffer.Reset()
- bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize))
- readFully := readErr != nil && readErr == io.EOF
- tmpBuf := tmpBuffer.Bytes()
- bytesToCopy := tmpBuf[0:int(bytesRead)]
-
- copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy)
- chunkBufOffset = chunkBufOffset + int32(bytesRead)
-
- if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) {
- writtenChunks = writtenChunks + 1
- fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection)
- if assignErr != nil {
- return nil, assignErr
- }
-
- // upload the chunk to the volume server
- chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10)
- uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId)
- if uploadErr != nil {
- return nil, uploadErr
- }
-
- // Save to chunk manifest structure
- cm.Chunks = append(cm.Chunks,
- &operation.ChunkInfo{
- Offset: chunkOffset,
- Size: int64(chunkBufOffset),
- Fid: fileId,
- },
- )
-
- // reset variables for the next chunk
- chunkBufOffset = 0
- chunkOffset = totalBytesRead + int64(bytesRead)
- }
-
- totalBytesRead = totalBytesRead + int64(bytesRead)
-
- if bytesRead == 0 || readFully {
- break
- }
-
- if readErr != nil {
- return nil, readErr
- }
- }
-
- cm.Size = totalBytesRead
- manifestBuf, marshalErr := cm.Marshal()
- if marshalErr != nil {
- return nil, marshalErr
- }
-
- manifestStr := string(manifestBuf)
- glog.V(4).Infoln("Generated chunk manifest: ", manifestStr)
-
- manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection)
- if manifestAssignmentErr != nil {
- return nil, manifestAssignmentErr
- }
- glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId)
- filerResult.Fid = manifestFileId
-
- u, _ := url.Parse(manifestUrlLocation)
- q := u.Query()
- q.Set("cm", "true")
- u.RawQuery = q.Encode()
-
- manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId)
- if manifestUploadErr != nil {
- return nil, manifestUploadErr
- }
-
- path := r.URL.Path
- // also delete the old fid unless PUT operation
- if r.Method != "PUT" {
- if oldFid, err := fs.filer.FindFile(path); err == nil {
- operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid))
- }
- }
-
- glog.V(4).Infoln("saving", path, "=>", manifestFileId)
- if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil {
- replyerr = db_err
- filerResult.Error = db_err.Error()
- operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up
- glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
- return
- }
-
- return
-}
-
-func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) {
- err = nil
-
- ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
- uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId))
- if uploadResult != nil {
- glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
- }
- if uploadError != nil {
- err = uploadError
- }
- return
-}
-
// curl -X DELETE http://localhost:8888/path/to
// curl -X DELETE http://localhost:8888/path/to?recursive=true
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {