aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server_handlers_write_autochunk.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_server_handlers_write_autochunk.go')
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go109
1 files changed, 71 insertions, 38 deletions
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index eee39152b..237f121fe 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -96,12 +96,6 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return nil, nil, err
}
- fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks)
- if replyerr != nil {
- glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
- return
- }
-
md5bytes = md5Hash.Sum(nil)
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
@@ -111,25 +105,26 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
fileName := ""
- contentType := ""
+ contentType := r.Header.Get("Content-Type")
+ if contentType == "application/octet-stream" {
+ contentType = ""
+ }
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so)
if err != nil {
return nil, nil, err
}
- fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks)
- if replyerr != nil {
- glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
- return
- }
-
md5bytes = md5Hash.Sum(nil)
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
return
}
+func isAppend(r *http.Request) bool {
+ return r.URL.Query().Get("op") == "append"
+}
+
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
// detect file mode
@@ -151,26 +146,62 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
}
- glog.V(4).Infoln("saving", path)
- entry := &filer.Entry{
- FullPath: util.FullPath(path),
- Attr: filer.Attr{
- Mtime: time.Now(),
- Crtime: time.Now(),
- Mode: os.FileMode(mode),
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: so.Replication,
- Collection: so.Collection,
- TtlSec: so.TtlSeconds,
- Mime: contentType,
- Md5: md5bytes,
- FileSize: uint64(chunkOffset),
- },
- Chunks: fileChunks,
- Content: content,
+ var entry *filer.Entry
+ var mergedChunks []*filer_pb.FileChunk
+ // when it is an append
+ if isAppend(r) {
+ existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if findErr != nil && findErr != filer_pb.ErrNotFound {
+ glog.V(0).Infof("failing to find %s: %v", path, findErr)
+ }
+ entry = existingEntry
+ }
+ if entry != nil {
+ entry.Mtime = time.Now()
+ entry.Md5 = nil
+ // adjust chunk offsets
+ for _, chunk := range fileChunks {
+ chunk.Offset += int64(entry.FileSize)
+ }
+ mergedChunks = append(entry.Chunks, fileChunks...)
+ entry.FileSize += uint64(chunkOffset)
+
+ // TODO
+ if len(entry.Content) > 0 {
+ replyerr = fmt.Errorf("append to small file is not supported yet")
+ return
+ }
+
+ } else {
+ glog.V(4).Infoln("saving", path)
+ mergedChunks = fileChunks
+ entry = &filer.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.FileMode(mode),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ Replication: so.Replication,
+ Collection: so.Collection,
+ TtlSec: so.TtlSeconds,
+ Mime: contentType,
+ Md5: md5bytes,
+ FileSize: uint64(chunkOffset),
+ },
+ Content: content,
+ }
}
+ // maybe compact entry chunks
+ mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+ return
+ }
+ entry.Chunks = mergedChunks
+
filerResult = &FilerPostResult{
Name: fileName,
Size: chunkOffset,
@@ -189,7 +220,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
- fs.filer.DeleteChunks(entry.Chunks)
+ fs.filer.DeleteChunks(fileChunks)
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
@@ -204,7 +235,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
chunkOffset := int64(0)
- var smallContent, content []byte
+ var smallContent []byte
for {
limitedReader := io.LimitReader(partReader, int64(chunkSize))
@@ -213,6 +244,13 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
if err != nil {
return nil, nil, 0, err, nil
}
+ if chunkOffset == 0 && !isAppend(r) {
+ if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
+ smallContent = data
+ chunkOffset += int64(len(data))
+ break
+ }
+ }
dataReader := util.NewBytesReader(data)
// retry to assign a different file id
@@ -239,8 +277,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
return nil, nil, 0, uploadErr, nil
}
- content = data
-
// if last chunk exhausted the reader exactly at the border
if uploadResult.Size == 0 {
break
@@ -260,9 +296,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
}
}
- if chunkOffset < fs.option.CacheToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && chunkOffset < 4*1024 {
- smallContent = content
- }
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}