aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-01-20 13:40:32 -0800
committerChris Lu <chris.lu@gmail.com>2021-01-20 13:40:32 -0800
commitf98817cfe655d0285ed35f6d30068aa15938bd3d (patch)
treef1f342966625cf738242fc50c03797d27e1ca81a
parent1b8e3da295d1a5e8a09c0a65af3003583bca33ab (diff)
downloadseaweedfs-f98817cfe655d0285ed35f6d30068aa15938bd3d.tar.xz
seaweedfs-f98817cfe655d0285ed35f6d30068aa15938bd3d.zip
filer: support appending to a file
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go87
1 files changed, 60 insertions, 27 deletions
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 2bd1104d3..237f121fe 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -121,14 +121,11 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
return
}
-func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
+func isAppend(r *http.Request) bool {
+ return r.URL.Query().Get("op") == "append"
+}
- // maybe compact chunks
- fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks)
- if replyerr != nil {
- glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
- return
- }
+func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
// detect file mode
modeStr := r.URL.Query().Get("mode")
@@ -149,25 +146,61 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
}
- glog.V(4).Infoln("saving", path)
- entry := &filer.Entry{
- FullPath: util.FullPath(path),
- Attr: filer.Attr{
- Mtime: time.Now(),
- Crtime: time.Now(),
- Mode: os.FileMode(mode),
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: so.Replication,
- Collection: so.Collection,
- TtlSec: so.TtlSeconds,
- Mime: contentType,
- Md5: md5bytes,
- FileSize: uint64(chunkOffset),
- },
- Chunks: fileChunks,
- Content: content,
+ var entry *filer.Entry
+ var mergedChunks []*filer_pb.FileChunk
+ // when it is an append
+ if isAppend(r) {
+ existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if findErr != nil && findErr != filer_pb.ErrNotFound {
+ glog.V(0).Infof("failing to find %s: %v", path, findErr)
+ }
+ entry = existingEntry
+ }
+ if entry != nil {
+ entry.Mtime = time.Now()
+ entry.Md5 = nil
+ // adjust chunk offsets
+ for _, chunk := range fileChunks {
+ chunk.Offset += int64(entry.FileSize)
+ }
+ mergedChunks = append(entry.Chunks, fileChunks...)
+ entry.FileSize += uint64(chunkOffset)
+
+ // TODO
+ if len(entry.Content) > 0 {
+ replyerr = fmt.Errorf("append to small file is not supported yet")
+ return
+ }
+
+ } else {
+ glog.V(4).Infoln("saving", path)
+ mergedChunks = fileChunks
+ entry = &filer.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.FileMode(mode),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ Replication: so.Replication,
+ Collection: so.Collection,
+ TtlSec: so.TtlSeconds,
+ Mime: contentType,
+ Md5: md5bytes,
+ FileSize: uint64(chunkOffset),
+ },
+ Content: content,
+ }
+ }
+
+ // maybe compact entry chunks
+ mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+ return
}
+ entry.Chunks = mergedChunks
filerResult = &FilerPostResult{
Name: fileName,
@@ -187,7 +220,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
- fs.filer.DeleteChunks(entry.Chunks)
+ fs.filer.DeleteChunks(fileChunks)
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
@@ -211,7 +244,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
if err != nil {
return nil, nil, 0, err, nil
}
- if chunkOffset == 0 {
+ if chunkOffset == 0 && !isAppend(r) {
if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
smallContent = data
chunkOffset += int64(len(data))