path: root/weed/server/filer_server_handlers_write_merge.go
package weed_server

import (
	"context"
	"io"
	"math"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/stats"
)

// MergeChunkMinCount is the minimum number of small chunks a file must
// accumulate before maybeMergeChunks attempts a merge.
const MergeChunkMinCount int = 1000

func (fs *FilerServer) maybeMergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
	// Only merge when small chunks (those under half the maximum chunk
	// size) number at least MergeChunkMinCount and make up at least half
	// of the file's non-manifest chunks.
	var chunkSize = fs.option.MaxMB * 1024 * 1024
	var smallChunk, sumChunk int
	var minOffset int64 = math.MaxInt64
	for _, chunk := range inputChunks {
		if chunk.IsChunkManifest {
			continue
		}
		if chunk.Size < uint64(chunkSize/2) {
			smallChunk++
			if chunk.Offset < minOffset {
				minOffset = chunk.Offset
			}
		}
		sumChunk++
	}
	if smallChunk < MergeChunkMinCount || smallChunk < sumChunk/2 {
		return inputChunks, nil
	}

	return fs.mergeChunks(ctx, so, inputChunks, minOffset)
}
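
// Illustrative sketch, not part of the upstream file: the merge heuristic
// above restated as a standalone predicate. With, say, MaxMB set to 4,
// chunkSize is 4 MiB and any chunk under 2 MiB counts as small; a merge
// triggers only once at least MergeChunkMinCount (1000) small chunks exist
// and they make up at least half of all non-manifest chunks. The function
// name and the maxMB parameter are hypothetical.
func shouldMergeChunks(chunks []*filer_pb.FileChunk, maxMB int) bool {
	chunkSize := maxMB * 1024 * 1024
	var smallChunk, sumChunk int
	for _, chunk := range chunks {
		if chunk.IsChunkManifest {
			continue // manifest chunks are never rewritten
		}
		if chunk.Size < uint64(chunkSize/2) {
			smallChunk++
		}
		sumChunk++
	}
	return smallChunk >= MergeChunkMinCount && smallChunk >= sumChunk/2
}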

func (fs *FilerServer) mergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
	// Stream the existing content starting at chunkOffset and re-upload it
	// as full-size chunks. The reader starts at offset 0, so SeekCurrent
	// behaves the same as SeekStart here.
	chunkedFileReader := filer.NewChunkStreamReaderFromFiler(ctx, fs.filer.MasterClient, inputChunks)
	_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
	if mergeErr != nil {
		return nil, mergeErr
	}
	// Only the rewritten chunk list and the error are needed here; the
	// other return values are discarded.
	mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(ctx, chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
	if mergeErr != nil {
		return
	}

	stats.FilerHandlerCounter.WithLabelValues(stats.ChunkMerge).Inc()
	// Keep the chunks that were not rewritten: those starting before the
	// merge offset, plus manifest chunks, which the merge skips.
	for _, chunk := range inputChunks {
		if chunk.Offset < chunkOffset || chunk.IsChunkManifest {
			mergedChunks = append(mergedChunks, chunk)
		}
	}

	// Anything in inputChunks that is absent from mergedChunks is now garbage.
	garbage, err := filer.MinusChunks(ctx, fs.lookupFileId, inputChunks, mergedChunks)
	if err != nil {
		glog.ErrorfCtx(ctx, "Failed to resolve old entry chunks before deleting them. new: %v, old: %v",
			mergedChunks, inputChunks)
		return mergedChunks, err
	}
	fs.filer.DeleteChunksNotRecursive(garbage)
	return
}
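
// Illustrative sketch, not part of the upstream file: one way a write path
// could fold merging into finalizing an entry's chunk list. The wrapper
// name finalizeChunks is hypothetical. Since merging is only an
// optimization, this sketch falls back to the unmerged chunks on error
// instead of failing the write.
func (fs *FilerServer) finalizeChunks(ctx context.Context, so *operation.StorageOption, chunks []*filer_pb.FileChunk) []*filer_pb.FileChunk {
	merged, err := fs.maybeMergeChunks(ctx, so, chunks)
	if err != nil {
		glog.ErrorfCtx(ctx, "maybeMergeChunks: %v", err)
		return chunks
	}
	return merged
}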