aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go5
-rw-r--r--weed/server/filer_server_handlers_write_merge.go58
-rw-r--r--weed/server/filer_server_handlers_write_upload.go8
-rw-r--r--weed/stats/metrics_names.go14
4 files changed, 74 insertions, 11 deletions
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index cce64f74d..ba0260f04 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -99,7 +99,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return
}
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
}
@@ -130,7 +130,8 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
return nil, nil, err
}
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
+
if err != nil {
return nil, nil, err
}
diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go
index 672a82672..b3ccd4cc1 100644
--- a/weed/server/filer_server_handlers_write_merge.go
+++ b/weed/server/filer_server_handlers_write_merge.go
@@ -1,11 +1,65 @@
package weed_server
import (
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+ "io"
+ "math"
)
+const MergeChunkMinCount int = 1000
+
func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
- //TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks
- return inputChunks, nil
+ // Only merge when small chunks make up more than half of the file's chunks
+ var chunkSize = fs.option.MaxMB * 1024 * 1024
+ var smallChunk, sumChunk int
+ var minOffset int64 = math.MaxInt64
+ for _, chunk := range inputChunks {
+ if chunk.IsChunkManifest {
+ continue
+ }
+ if chunk.Size < uint64(chunkSize/2) {
+ smallChunk++
+ if chunk.Offset < minOffset {
+ minOffset = chunk.Offset
+ }
+ }
+ sumChunk++
+ }
+ if smallChunk < MergeChunkMinCount || smallChunk < sumChunk/2 {
+ return inputChunks, nil
+ }
+
+ return fs.mergeChunks(so, inputChunks, minOffset)
+}
+
+func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
+ chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
+ _, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
+ if mergeErr != nil {
+ return nil, mergeErr
+ }
+ mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
+ if mergeErr != nil {
+ return
+ }
+
+ stats.FilerHandlerCounter.WithLabelValues(stats.ChunkMerge).Inc()
+ for _, chunk := range inputChunks {
+ if chunk.Offset < chunkOffset || chunk.IsChunkManifest {
+ mergedChunks = append(mergedChunks, chunk)
+ }
+ }
+
+ garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
+ if err != nil {
+ glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
+ mergedChunks, inputChunks)
+ return
+ }
+ fs.filer.DeleteChunksNotRecursive(garbage)
+ return
}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index d0d1575cf..a08d5efe0 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -27,7 +27,7 @@ var bufPool = sync.Pool{
},
}
-func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
query := r.URL.Query()
isAppend := isAppend(r)
@@ -45,7 +45,13 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
chunkOffset = offsetInt
}
+ return fs.uploadReaderToChunks(reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so)
+}
+
+func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+
md5Hash = md5.New()
+ chunkOffset = startOffset
var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
var wg sync.WaitGroup
diff --git a/weed/stats/metrics_names.go b/weed/stats/metrics_names.go
index 15f0ad24f..13f491513 100644
--- a/weed/stats/metrics_names.go
+++ b/weed/stats/metrics_names.go
@@ -27,12 +27,14 @@ const (
Failed = "failed"
// filer handler
- DirList = "dirList"
- ContentSaveToFiler = "contentSaveToFiler"
- AutoChunk = "autoChunk"
- ChunkProxy = "chunkProxy"
- ChunkAssign = "chunkAssign"
- ChunkUpload = "chunkUpload"
+ DirList = "dirList"
+ ContentSaveToFiler = "contentSaveToFiler"
+ AutoChunk = "autoChunk"
+ ChunkProxy = "chunkProxy"
+ ChunkAssign = "chunkAssign"
+ ChunkUpload = "chunkUpload"
+ ChunkMerge = "chunkMerge"
+
ChunkDoUploadRetry = "chunkDoUploadRetry"
ChunkUploadRetry = "chunkUploadRetry"
ChunkAssignRetry = "chunkAssignRetry"