Diffstat (limited to 'weed/server/filer_server_handlers_write_merge.go')
 weed/server/filer_server_handlers_write_merge.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 56 insertions(+), 2 deletions(-)
diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go
index 672a82672..b3ccd4cc1 100644
--- a/weed/server/filer_server_handlers_write_merge.go
+++ b/weed/server/filer_server_handlers_write_merge.go
@@ -1,11 +1,65 @@
 package weed_server
 
 import (
+	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/stats"
+	"io"
+	"math"
 )
 
+const MergeChunkMinCount int = 1000
+
 func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
-	//TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks
-	return inputChunks, nil
+	// Only merge when small chunks make up more than half of the file
+	var chunkSize = fs.option.MaxMB * 1024 * 1024
+	var smallChunk, sumChunk int
+	var minOffset int64 = math.MaxInt64
+	for _, chunk := range inputChunks {
+		if chunk.IsChunkManifest {
+			continue
+		}
+		if chunk.Size < uint64(chunkSize/2) {
+			smallChunk++
+			if chunk.Offset < minOffset {
+				minOffset = chunk.Offset
+			}
+		}
+		sumChunk++
+	}
+	if smallChunk < MergeChunkMinCount || smallChunk < sumChunk/2 {
+		return inputChunks, nil
+	}
+
+	return fs.mergeChunks(so, inputChunks, minOffset)
+}
+
+func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
+	chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
+	_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
+	if mergeErr != nil {
+		return nil, mergeErr
+	}
+	mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
+	if mergeErr != nil {
+		return
+	}
+
+	stats.FilerHandlerCounter.WithLabelValues(stats.ChunkMerge).Inc()
+	for _, chunk := range inputChunks {
+		if chunk.Offset < chunkOffset || chunk.IsChunkManifest {
+			mergedChunks = append(mergedChunks, chunk)
+		}
+	}
+
+	garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
+	if err != nil {
+		glog.Errorf("Failed to resolve old entry chunks when deleting old entry chunks. new: %s, old: %s",
+			mergedChunks, inputChunks)
+		return
+	}
+	fs.filer.DeleteChunksNotRecursive(garbage)
+	return
 }
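
Read on its own, the trigger condition in maybeMergeChunks is easy to miss in the hunk above. Below is a minimal, self-contained sketch of that heuristic, assuming a simplified chunk struct in place of filer_pb.FileChunk; the shouldMerge helper and its test data are hypothetical illustrations, not part of the SeaweedFS API.

package main

import (
	"fmt"
	"math"
)

// chunk is a simplified stand-in for filer_pb.FileChunk, keeping only
// the fields the merge heuristic in the patch actually reads.
type chunk struct {
	Offset          int64
	Size            uint64
	IsChunkManifest bool
}

// mergeChunkMinCount mirrors MergeChunkMinCount from the patch.
const mergeChunkMinCount = 1000

// shouldMerge replays the decision in maybeMergeChunks: merge only when
// at least mergeChunkMinCount data chunks are "small" (under half the
// target chunk size) and the small chunks are at least half of all data
// chunks. It also returns the offset of the first small chunk, which is
// where the re-upload in mergeChunks would begin.
func shouldMerge(chunks []chunk, chunkSizeBytes int) (bool, int64) {
	var small, total int
	minOffset := int64(math.MaxInt64)
	for _, c := range chunks {
		if c.IsChunkManifest {
			continue // manifest chunks are never counted or merged
		}
		if c.Size < uint64(chunkSizeBytes/2) {
			small++
			if c.Offset < minOffset {
				minOffset = c.Offset
			}
		}
		total++
	}
	if small < mergeChunkMinCount || small < total/2 {
		return false, 0
	}
	return true, minOffset
}

func main() {
	const maxChunk = 32 << 20 // 32 MiB, i.e. MaxMB = 32
	// 1200 tiny 4 KiB chunks followed by 300 full-size ones:
	// both thresholds are met, so a merge would be triggered.
	var chunks []chunk
	for i := 0; i < 1500; i++ {
		size := uint64(4 << 10)
		if i >= 1200 {
			size = maxChunk
		}
		chunks = append(chunks, chunk{Offset: int64(i) * maxChunk, Size: size})
	}
	merge, from := shouldMerge(chunks, maxChunk)
	fmt.Println(merge, from) // true 0
}

One design detail visible in the patch itself: chunks that sit entirely before minOffset, and any chunk manifests, are carried over verbatim after the re-upload. Only the tail of the file starting at the first small chunk is re-read through NewChunkStreamReaderFromFiler and re-written as full-size chunks, after which MinusChunks computes the superseded chunks to hand to DeleteChunksNotRecursive.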
