aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server_handlers_write_merge.go
diff options
context:
space:
mode:
authorAleksey Kosov <rusyak777@list.ru>2025-05-28 21:34:02 +0300
committerGitHub <noreply@github.com>2025-05-28 11:34:02 -0700
commit283d9e0079d5deb57aefe9a7b30e8b9869ba8685 (patch)
tree87b09bebed2ee4afc9c2a4f711ac8598fe2949b7 /weed/server/filer_server_handlers_write_merge.go
parent62aaaa18f3ea8b7600d28934580dc220ca95164a (diff)
downloadseaweedfs-283d9e0079d5deb57aefe9a7b30e8b9869ba8685.tar.xz
seaweedfs-283d9e0079d5deb57aefe9a7b30e8b9869ba8685.zip
Add context with request (#6824)
Diffstat (limited to 'weed/server/filer_server_handlers_write_merge.go')
-rw-r--r--weed/server/filer_server_handlers_write_merge.go13
1 files changed, 7 insertions, 6 deletions
diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go
index 2110f485a..c22aa14c2 100644
--- a/weed/server/filer_server_handlers_write_merge.go
+++ b/weed/server/filer_server_handlers_write_merge.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "context"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -12,7 +13,7 @@ import (
const MergeChunkMinCount int = 1000
-func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
+func (fs *FilerServer) maybeMergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
// Only merge small chunks more than half of the file
var chunkSize = fs.option.MaxMB * 1024 * 1024
var smallChunk, sumChunk int
@@ -33,16 +34,16 @@ func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks
return inputChunks, nil
}
- return fs.mergeChunks(so, inputChunks, minOffset)
+ return fs.mergeChunks(ctx, so, inputChunks, minOffset)
}
-func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
- chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
+func (fs *FilerServer) mergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
+ chunkedFileReader := filer.NewChunkStreamReaderFromFiler(ctx, fs.filer.MasterClient, inputChunks)
_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
if mergeErr != nil {
return nil, mergeErr
}
- mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
+ mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(ctx, chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
if mergeErr != nil {
return
}
@@ -54,7 +55,7 @@ func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*f
}
}
- garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
+ garbage, err := filer.MinusChunks(ctx, fs.lookupFileId, inputChunks, mergedChunks)
if err != nil {
glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
mergedChunks, inputChunks)