diff options
Diffstat (limited to 'weed/server/filer_server_handlers_write_merge.go')
| -rw-r--r-- | weed/server/filer_server_handlers_write_merge.go | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go index 2110f485a..c22aa14c2 100644 --- a/weed/server/filer_server_handlers_write_merge.go +++ b/weed/server/filer_server_handlers_write_merge.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -12,7 +13,7 @@ import ( const MergeChunkMinCount int = 1000 -func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) { +func (fs *FilerServer) maybeMergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) { // Only merge small chunks more than half of the file var chunkSize = fs.option.MaxMB * 1024 * 1024 var smallChunk, sumChunk int @@ -33,16 +34,16 @@ func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks return inputChunks, nil } - return fs.mergeChunks(so, inputChunks, minOffset) + return fs.mergeChunks(ctx, so, inputChunks, minOffset) } -func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) { - chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks) +func (fs *FilerServer) mergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) { + chunkedFileReader := filer.NewChunkStreamReaderFromFiler(ctx, fs.filer.MasterClient, inputChunks) _, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent) if mergeErr != nil { return nil, mergeErr } - mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so) + mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(ctx, chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so) if mergeErr != nil { return } @@ -54,7 +55,7 @@ func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*f } } - garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks) + garbage, err := filer.MinusChunks(ctx, fs.lookupFileId, inputChunks, mergedChunks) if err != nil { glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", mergedChunks, inputChunks) |
