aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--unmaintained/repeated_vacuum/repeated_vacuum.go6
-rw-r--r--weed/command/benchmark.go4
-rw-r--r--weed/command/filer_cat.go4
-rw-r--r--weed/filer/filechunk_group.go3
-rw-r--r--weed/filer/filechunk_manifest.go25
-rw-r--r--weed/filer/filechunks.go19
-rw-r--r--weed/filer/filechunks2_test.go3
-rw-r--r--weed/filer/filechunks_test.go15
-rw-r--r--weed/filer/filer.go2
-rw-r--r--weed/filer/filer_delete_entry.go10
-rw-r--r--weed/filer/filer_deletion.go17
-rw-r--r--weed/filer/filer_notify_append.go4
-rw-r--r--weed/filer/filer_notify_read.go2
-rw-r--r--weed/filer/reader_at.go4
-rw-r--r--weed/filer/reader_cache.go5
-rw-r--r--weed/filer/stream.go41
-rw-r--r--weed/mount/weedfs.go2
-rw-r--r--weed/mount/weedfs_file_sync.go2
-rw-r--r--weed/mq/logstore/log_to_parquet.go4
-rw-r--r--weed/mq/logstore/read_log_from_disk.go2
-rw-r--r--weed/mq/logstore/read_parquet_to_log.go2
-rw-r--r--weed/operation/assign_file_id.go6
-rw-r--r--weed/operation/assign_file_id_test.go2
-rw-r--r--weed/operation/needle_parse_test.go5
-rw-r--r--weed/operation/submit.go12
-rw-r--r--weed/operation/upload_content.go29
-rw-r--r--weed/pb/grpc_client_server.go9
-rw-r--r--weed/replication/repl_util/replication_util.go5
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go2
-rw-r--r--weed/replication/sink/b2sink/b2_sink.go2
-rw-r--r--weed/replication/sink/filersink/filer_sink.go8
-rw-r--r--weed/replication/sink/gcssink/gcs_sink.go2
-rw-r--r--weed/replication/sink/localsink/local_sink.go3
-rw-r--r--weed/replication/source/filer_source.go6
-rw-r--r--weed/server/common.go5
-rw-r--r--weed/server/filer_grpc_server.go26
-rw-r--r--weed/server/filer_grpc_server_remote.go8
-rw-r--r--weed/server/filer_server_handlers_proxy.go6
-rw-r--r--weed/server/filer_server_handlers_read.go12
-rw-r--r--weed/server/filer_server_handlers_read_dir.go5
-rw-r--r--weed/server/filer_server_handlers_tagging.go5
-rw-r--r--weed/server/filer_server_handlers_write.go14
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go22
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go6
-rw-r--r--weed/server/filer_server_handlers_write_merge.go13
-rw-r--r--weed/server/filer_server_handlers_write_upload.go21
-rw-r--r--weed/server/volume_grpc_remote.go2
-rw-r--r--weed/server/volume_server_handlers_write.go3
-rw-r--r--weed/server/webdav_server.go2
-rw-r--r--weed/shell/command_fs_merge_volumes.go2
-rw-r--r--weed/shell/command_fs_verify.go2
-rw-r--r--weed/shell/command_volume_fsck.go2
-rw-r--r--weed/topology/store_replicate.go5
-rw-r--r--weed/util/http/http_global_client_util.go20
-rw-r--r--weed/util/request_id.go9
-rw-r--r--weed/wdclient/masterclient.go9
-rw-r--r--weed/wdclient/vid_map.go5
57 files changed, 253 insertions, 218 deletions
diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go
index 1f89bd902..0a796a92f 100644
--- a/unmaintained/repeated_vacuum/repeated_vacuum.go
+++ b/unmaintained/repeated_vacuum/repeated_vacuum.go
@@ -1,13 +1,13 @@
package main
import (
+ "context"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"log"
"math/rand"
"time"
- "context"
"google.golang.org/grpc"
@@ -56,7 +56,7 @@ func main() {
}
func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, string) {
- assignResult, err := operation.Assign(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{
+ assignResult, err := operation.Assign(context.Background(), func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*master) }, grpcDialOption, &operation.VolumeAssignRequest{
Count: 1,
Replication: *replication,
})
@@ -84,7 +84,7 @@ func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, st
log.Fatalf("upload: %v", err)
}
- _, err = uploader.UploadData(data, uploadOption)
+ _, err = uploader.UploadData(context.Background(), data, uploadOption)
if err != nil {
log.Fatalf("upload: %v", err)
}
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 08db2ef3d..715ae3d67 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -241,7 +241,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
Replication: *b.replication,
DiskType: *b.diskType,
}
- if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
+ if assignResult, err := operation.Assign(context.Background(), b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
fp.Server, fp.Fid, fp.Pref.Collection = assignResult.Url, assignResult.Fid, *b.collection
if !isSecure && assignResult.Auth != "" {
isSecure = true
@@ -288,7 +288,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
start := time.Now()
var bytesRead int
var err error
- urls, err := b.masterClient.LookupFileId(fid)
+ urls, err := b.masterClient.LookupFileId(context.Background(), fid)
if err != nil {
s.failed++
println("!!!! ", fid, " location not found!!!!!")
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go
index 2c0f84ddc..136440109 100644
--- a/weed/command/filer_cat.go
+++ b/weed/command/filer_cat.go
@@ -28,9 +28,9 @@ type FilerCatOptions struct {
}
func (fco *FilerCatOptions) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
- return func(fileId string) (targetUrls []string, err error) {
+ return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
vid := filer.VolumeId(fileId)
- resp, err := fco.filerClient.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ resp, err := fco.filerClient.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go
index 752238de9..2e61826ca 100644
--- a/weed/filer/filechunk_group.go
+++ b/weed/filer/filechunk_group.go
@@ -1,6 +1,7 @@
package filer
import (
+ "context"
"io"
"sync"
@@ -89,7 +90,7 @@ func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
continue
}
- resolvedChunks, err := ResolveOneChunkManifest(group.lookupFn, chunk)
+ resolvedChunks, err := ResolveOneChunkManifest(context.Background(), group.lookupFn, chunk)
if err != nil {
return err
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 36096d2c1..00e9a73a1 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -2,6 +2,7 @@ package filer
import (
"bytes"
+ "context"
"fmt"
"io"
"math"
@@ -48,7 +49,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
return
}
-func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveChunkManifest(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
// TODO maybe parallel this
for _, chunk := range chunks {
@@ -61,14 +62,14 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
continue
}
- resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
+ resolvedChunks, err := ResolveOneChunkManifest(ctx, lookupFileIdFn, chunk)
if err != nil {
return dataChunks, nil, err
}
manifestChunks = append(manifestChunks, chunk)
// recursive
- subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
+ subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(ctx, lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
if subErr != nil {
return dataChunks, nil, subErr
}
@@ -78,7 +79,7 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
return
}
-func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveOneChunkManifest(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
if !chunk.IsChunkManifest {
return
}
@@ -87,7 +88,7 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer)
bytesBuffer.Reset()
defer bytesBufferPool.Put(bytesBuffer)
- err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
+ err := fetchWholeChunk(ctx, bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
if err != nil {
return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
}
@@ -102,13 +103,13 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
}
// TODO fetch from cache for weed mount?
-func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
- urlStrings, err := lookupFileIdFn(fileId)
+func fetchWholeChunk(ctx context.Context, bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
+ urlStrings, err := lookupFileIdFn(ctx, fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return err
}
- err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
+ err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
if err != nil {
return err
}
@@ -116,15 +117,15 @@ func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFi
}
func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
- urlStrings, err := lookupFileIdFn(fileId)
+ urlStrings, err := lookupFileIdFn(context.Background(), fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return 0, err
}
- return util_http.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
+ return util_http.RetriedFetchChunkData(context.Background(), buffer, urlStrings, cipherKey, isGzipped, false, offset)
}
-func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
+func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
var shouldRetry bool
var totalWritten int
@@ -135,7 +136,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt stri
retriedCnt++
var localProcessed int
var writeErr error
- shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
if totalWritten > localProcessed {
toBeSkipped := totalWritten - localProcessed
if len(data) <= toBeSkipped {
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 7c8bb2fe1..7252169d8 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -2,6 +2,7 @@ package filer
import (
"bytes"
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"math"
@@ -61,9 +62,9 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5Digests, nil)), len(chunks))
}
-func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
+func CompactFileChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
- visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ visibles, _ := NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, chunks, 0, math.MaxInt64)
compacted, garbage = SeparateGarbageChunks(visibles, chunks)
@@ -98,13 +99,13 @@ func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, st
return
}
-func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
+func MinusChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
- aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as, 0, math.MaxInt64)
+ aData, aMeta, aErr := ResolveChunkManifest(ctx, lookupFileIdFn, as, 0, math.MaxInt64)
if aErr != nil {
return nil, aErr
}
- bData, bMeta, bErr := ResolveChunkManifest(lookupFileIdFn, bs, 0, math.MaxInt64)
+ bData, bMeta, bErr := ResolveChunkManifest(ctx, lookupFileIdFn, bs, 0, math.MaxInt64)
if bErr != nil {
return nil, bErr
}
@@ -180,9 +181,9 @@ func (cv *ChunkView) IsFullChunk() bool {
return cv.ViewSize == cv.ChunkSize
}
-func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
+func ViewFromChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (chunkViews *IntervalList[*ChunkView]) {
- visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks, offset, offset+size)
+ visibles, _ := NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, chunks, offset, offset+size)
return ViewFromVisibleIntervals(visibles, offset, size)
@@ -264,9 +265,9 @@ func MergeIntoChunkViews(chunkViews *IntervalList[*ChunkView], start int64, stop
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest
-func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) {
+func NonOverlappingVisibleIntervals(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval], err error) {
- chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
+ chunks, _, err = ResolveChunkManifest(ctx, lookupFileIdFn, chunks, startOffset, stopOffset)
if err != nil {
return
}
diff --git a/weed/filer/filechunks2_test.go b/weed/filer/filechunks2_test.go
index dfa971f86..b735b8a27 100644
--- a/weed/filer/filechunks2_test.go
+++ b/weed/filer/filechunks2_test.go
@@ -1,6 +1,7 @@
package filer
import (
+ "context"
"github.com/stretchr/testify/assert"
"log"
"slices"
@@ -65,7 +66,7 @@ func TestCompactFileChunksRealCase(t *testing.T) {
printChunks("before", chunks)
- compacted, garbage := CompactFileChunks(nil, chunks)
+ compacted, garbage := CompactFileChunks(context.Background(), nil, chunks)
printChunks("compacted", compacted)
printChunks("garbage", garbage)
diff --git a/weed/filer/filechunks_test.go b/weed/filer/filechunks_test.go
index 7554b0080..4af2af3f6 100644
--- a/weed/filer/filechunks_test.go
+++ b/weed/filer/filechunks_test.go
@@ -1,6 +1,7 @@
package filer
import (
+ "context"
"fmt"
"log"
"math"
@@ -21,7 +22,7 @@ func TestCompactFileChunks(t *testing.T) {
{Offset: 110, Size: 200, FileId: "jkl", ModifiedTsNs: 300},
}
- compacted, garbage := CompactFileChunks(nil, chunks)
+ compacted, garbage := CompactFileChunks(context.Background(), nil, chunks)
if len(compacted) != 3 {
t.Fatalf("unexpected compacted: %d", len(compacted))
@@ -54,7 +55,7 @@ func TestCompactFileChunks2(t *testing.T) {
})
}
- compacted, garbage := CompactFileChunks(nil, chunks)
+ compacted, garbage := CompactFileChunks(context.Background(), nil, chunks)
if len(compacted) != 4 {
t.Fatalf("unexpected compacted: %d", len(compacted))
@@ -90,7 +91,7 @@ func TestRandomFileChunksCompact(t *testing.T) {
}
}
- visibles, _ := NonOverlappingVisibleIntervals(nil, chunks, 0, math.MaxInt64)
+ visibles, _ := NonOverlappingVisibleIntervals(context.Background(), nil, chunks, 0, math.MaxInt64)
for visible := visibles.Front(); visible != nil; visible = visible.Next {
v := visible.Value
@@ -228,7 +229,7 @@ func TestIntervalMerging(t *testing.T) {
for i, testcase := range testcases {
log.Printf("++++++++++ merged test case %d ++++++++++++++++++++", i)
- intervals, _ := NonOverlappingVisibleIntervals(nil, testcase.Chunks, 0, math.MaxInt64)
+ intervals, _ := NonOverlappingVisibleIntervals(context.Background(), nil, testcase.Chunks, 0, math.MaxInt64)
x := -1
for visible := intervals.Front(); visible != nil; visible = visible.Next {
x++
@@ -426,7 +427,7 @@ func TestChunksReading(t *testing.T) {
// continue
}
log.Printf("++++++++++ read test case %d ++++++++++++++++++++", i)
- chunks := ViewFromChunks(nil, testcase.Chunks, testcase.Offset, testcase.Size)
+ chunks := ViewFromChunks(context.Background(), nil, testcase.Chunks, testcase.Offset, testcase.Size)
x := -1
for c := chunks.Front(); c != nil; c = c.Next {
x++
@@ -473,7 +474,7 @@ func BenchmarkCompactFileChunks(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- CompactFileChunks(nil, chunks)
+ CompactFileChunks(context.Background(), nil, chunks)
}
}
@@ -562,7 +563,7 @@ func TestCompactFileChunks3(t *testing.T) {
{Offset: 300, Size: 100, FileId: "def", ModifiedTsNs: 200},
}
- compacted, _ := CompactFileChunks(nil, chunks)
+ compacted, _ := CompactFileChunks(context.Background(), nil, chunks)
if len(compacted) != 4 {
t.Fatalf("unexpected compacted: %d", len(compacted))
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index acde49d54..829b4c4b4 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -235,7 +235,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
- f.deleteChunksIfNotNew(oldEntry, entry)
+ f.deleteChunksIfNotNew(ctx, oldEntry, entry)
glog.V(4).Infof("CreateEntry %s: created", entry.FullPath)
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 0ae421981..17ce48143 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -36,7 +36,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
// A case not handled:
// what if the chunk is in a different collection?
if shouldDeleteChunks {
- f.maybeDeleteHardLinks(hardLinkIds)
+ f.maybeDeleteHardLinks(ctx, hardLinkIds)
}
return nil
})
@@ -53,7 +53,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
}
if shouldDeleteChunks && !isDeleteCollection {
- f.DeleteChunks(p, entry.GetChunks())
+ f.DeleteChunks(ctx, p, entry.GetChunks())
}
if isDeleteCollection {
@@ -117,7 +117,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
}
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
- f.DeleteChunks(entry.FullPath, chunksToDelete)
+ f.DeleteChunks(ctx, entry.FullPath, chunksToDelete)
return nil
}
@@ -150,9 +150,9 @@ func (f *Filer) DoDeleteCollection(collectionName string) (err error) {
}
-func (f *Filer) maybeDeleteHardLinks(hardLinkIds []HardLinkId) {
+func (f *Filer) maybeDeleteHardLinks(ctx context.Context, hardLinkIds []HardLinkId) {
for _, hardLinkId := range hardLinkIds {
- if err := f.Store.DeleteHardLink(context.Background(), hardLinkId); err != nil {
+ if err := f.Store.DeleteHardLink(ctx, hardLinkId); err != nil {
glog.Errorf("delete hard link id %d : %v", hardLinkId, err)
}
}
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 362c7c51b..cc140b3ce 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -1,6 +1,7 @@
package filer
import (
+ "context"
"strings"
"time"
@@ -72,25 +73,25 @@ func (f *Filer) loopProcessingDeletion() {
}
}
-func (f *Filer) DeleteUncommittedChunks(chunks []*filer_pb.FileChunk) {
- f.doDeleteChunks(chunks)
+func (f *Filer) DeleteUncommittedChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
+ f.doDeleteChunks(ctx, chunks)
}
-func (f *Filer) DeleteChunks(fullpath util.FullPath, chunks []*filer_pb.FileChunk) {
+func (f *Filer) DeleteChunks(ctx context.Context, fullpath util.FullPath, chunks []*filer_pb.FileChunk) {
rule := f.FilerConf.MatchStorageRule(string(fullpath))
if rule.DisableChunkDeletion {
return
}
- f.doDeleteChunks(chunks)
+ f.doDeleteChunks(ctx, chunks)
}
-func (f *Filer) doDeleteChunks(chunks []*filer_pb.FileChunk) {
+func (f *Filer) doDeleteChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
if !chunk.IsChunkManifest {
f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
continue
}
- dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
+ dataChunks, manifestResolveErr := ResolveOneChunkManifest(ctx, f.MasterClient.LookupFileId, chunk)
if manifestResolveErr != nil {
glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
}
@@ -107,7 +108,7 @@ func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) {
}
}
-func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
+func (f *Filer) deleteChunksIfNotNew(ctx context.Context, oldEntry, newEntry *Entry) {
var oldChunks, newChunks []*filer_pb.FileChunk
if oldEntry != nil {
oldChunks = oldEntry.GetChunks()
@@ -116,7 +117,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
newChunks = newEntry.GetChunks()
}
- toDelete, err := MinusChunks(f.MasterClient.GetLookupFileIdFunction(), oldChunks, newChunks)
+ toDelete, err := MinusChunks(ctx, f.MasterClient.GetLookupFileIdFunction(), oldChunks, newChunks)
if err != nil {
glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", newChunks, oldChunks)
return
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index 3c9a3496c..699a4b70e 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -58,7 +58,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
WritableVolumeCount: rule.VolumeGrowthCount,
}
- assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest)
+ assignResult, err := operation.Assign(context.Background(), f.GetMaster, f.GrpcDialOption, assignRequest)
if err != nil {
return nil, nil, fmt.Errorf("AssignVolume: %v", err)
}
@@ -83,7 +83,7 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
}
- uploadResult, err := uploader.UploadData(data, uploadOption)
+ uploadResult, err := uploader.UploadData(context.Background(), data, uploadOption)
if err != nil {
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
}
diff --git a/weed/filer/filer_notify_read.go b/weed/filer/filer_notify_read.go
index ac2c763e6..e67b283dd 100644
--- a/weed/filer/filer_notify_read.go
+++ b/weed/filer/filer_notify_read.go
@@ -323,7 +323,7 @@ type LogFileIterator struct {
func newLogFileIterator(masterClient *wdclient.MasterClient, fileEntry *Entry, startTsNs, stopTsNs int64) *LogFileIterator {
return &LogFileIterator{
- r: NewChunkStreamReaderFromFiler(masterClient, fileEntry.Chunks),
+ r: NewChunkStreamReaderFromFiler(context.Background(), masterClient, fileEntry.Chunks),
sizeBuf: make([]byte, 4),
startTsNs: startTsNs,
stopTsNs: stopTsNs,
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index b87fa0411..fecdc5174 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -29,7 +29,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
vidCache := make(map[string]*filer_pb.Locations)
var vicCacheLock sync.RWMutex
- return func(fileId string) (targetUrls []string, err error) {
+ return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
vid := VolumeId(fileId)
vicCacheLock.RLock()
locations, found := vidCache[vid]
@@ -38,7 +38,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
if !found {
util.Retry("lookup volume "+vid, func() error {
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 2ef81a931..08c59a34d 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -1,6 +1,7 @@
package filer
import (
+ "context"
"fmt"
"sync"
"sync/atomic"
@@ -169,7 +170,7 @@ func (s *SingleChunkCacher) startCaching() {
s.cacheStartedCh <- struct{}{} // means this has been started
- urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
+ urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId)
if err != nil {
s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
return
@@ -177,7 +178,7 @@ func (s *SingleChunkCacher) startCaching() {
s.data = mem.Allocate(s.chunkSize)
- _, s.err = util_http.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
+ _, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
if s.err != nil {
mem.Free(s.data)
s.data = nil
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 2f55e3e44..bca9c7f6e 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -2,6 +2,7 @@ package filer
import (
"bytes"
+ "context"
"fmt"
"io"
"math"
@@ -71,7 +72,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
type DoStreamContent func(writer io.Writer) error
func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
- return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0)
+ return PrepareStreamContentWithThrottler(context.Background(), masterClient, jwtFunc, chunks, offset, size, 0)
}
type VolumeServerJwtFunction func(fileId string) string
@@ -80,9 +81,9 @@ func noJwtFunc(string) string {
return ""
}
-func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
+func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
- chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
+ chunkViews := ViewFromChunks(ctx, masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
@@ -91,7 +92,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
var urlStrings []string
var err error
for _, backoff := range getLookupFileIdBackoffSchedule {
- urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
+ urlStrings, err = masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId)
if err == nil && len(urlStrings) > 0 {
break
}
@@ -127,7 +128,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
urlStrings := fileId2Url[chunkView.FileId]
start := time.Now()
jwt := jwtFunc(chunkView.FileId)
- err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+ err := retriedStreamFetchChunkData(ctx, writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
offset += int64(chunkView.ViewSize)
remaining -= int64(chunkView.ViewSize)
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
@@ -177,25 +178,25 @@ func writeZero(w io.Writer, size int64) (err error) {
return
}
-func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
+func ReadAll(ctx context.Context, buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
- lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
- return masterClient.LookupFileId(fileId)
+ lookupFileIdFn := func(ctx context.Context, fileId string) (targetUrls []string, err error) {
+ return masterClient.LookupFileId(ctx, fileId)
}
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
+ chunkViews := ViewFromChunks(ctx, lookupFileIdFn, chunks, 0, int64(len(buffer)))
idx := 0
for x := chunkViews.Front(); x != nil; x = x.Next {
chunkView := x.Value
- urlStrings, err := lookupFileIdFn(chunkView.FileId)
+ urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
}
- n, err := util_http.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
+ n, err := util_http.RetriedFetchChunkData(ctx, buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
if err != nil {
return err
}
@@ -220,9 +221,9 @@ type ChunkStreamReader struct {
var _ = io.ReadSeeker(&ChunkStreamReader{})
var _ = io.ReaderAt(&ChunkStreamReader{})
-func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+func doNewChunkStreamReader(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ chunkViews := ViewFromChunks(ctx, lookupFileIdFn, chunks, 0, math.MaxInt64)
var totalSize int64
for x := chunkViews.Front(); x != nil; x = x.Next {
@@ -238,20 +239,20 @@ func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, ch
}
}
-func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+func NewChunkStreamReaderFromFiler(ctx context.Context, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
- lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
- return masterClient.LookupFileId(fileId)
+ lookupFileIdFn := func(ctx context.Context, fileId string) (targetUrl []string, err error) {
+ return masterClient.LookupFileId(ctx, fileId)
}
- return doNewChunkStreamReader(lookupFileIdFn, chunks)
+ return doNewChunkStreamReader(ctx, lookupFileIdFn, chunks)
}
func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
lookupFileIdFn := LookupFn(filerClient)
- return doNewChunkStreamReader(lookupFileIdFn, chunks)
+ return doNewChunkStreamReader(context.Background(), lookupFileIdFn, chunks)
}
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
@@ -343,7 +344,7 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
}
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
- urlStrings, err := c.lookupFileId(chunkView.FileId)
+ urlStrings, err := c.lookupFileId(context.Background(), chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
@@ -351,7 +352,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util_http.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 77ffb7e77..97e9ca30c 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -212,7 +212,7 @@ func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.St
func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
if wfs.option.VolumeServerAccess == "filerProxy" {
- return func(fileId string) (targetUrls []string, err error) {
+ return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
}
}
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index 541270ff5..eda5ad8da 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -148,7 +148,7 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.GetChunks())
- chunks, _ := filer.CompactFileChunks(wfs.LookupFn(), nonManifestChunks)
+ chunks, _ := filer.CompactFileChunks(context.Background(), wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(wfs.saveDataAsChunk(fileFullPath), chunks)
if manifestErr != nil {
// not good, but should be ok
diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
index b2ee2ae00..2a646e4ee 100644
--- a/weed/mq/logstore/log_to_parquet.go
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -378,7 +378,7 @@ func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry
return err
}
-func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
+func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(ctx context.Context, fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
if len(entry.Content) > 0 {
// skip .offset files
return
@@ -392,7 +392,7 @@ func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetU
fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name)
return
}
- urlStrings, err = lookupFileIdFn(chunk.FileId)
+ urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId)
if err != nil {
err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
return
diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go
index 2c8a7c1de..12fca1706 100644
--- a/weed/mq/logstore/read_log_from_disk.go
+++ b/weed/mq/logstore/read_log_from_disk.go
@@ -75,7 +75,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top
glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
return
}
- urlStrings, err = lookupFileIdFn(chunk.FileId)
+ urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId)
if err != nil {
err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
return
diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go
index 6c69b9f12..3438af61a 100644
--- a/weed/mq/logstore/read_parquet_to_log.go
+++ b/weed/mq/logstore/read_parquet_to_log.go
@@ -55,7 +55,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
// create readerAt for the parquet file
fileSize := filer.FileSize(entry)
- visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
+ visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize))
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 13418da1a..eb54c674b 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -139,7 +139,7 @@ func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, pri
return
}
-func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
+func Assign(ctx context.Context, masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
@@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
continue
}
- lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(false, masterFn(ctx), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: request.Count,
Replication: request.Replication,
@@ -165,7 +165,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
DataNode: request.DataNode,
WritableVolumeCount: request.WritableVolumeCount,
}
- resp, grpcErr := masterClient.Assign(context.Background(), req)
+ resp, grpcErr := masterClient.Assign(ctx, req)
if grpcErr != nil {
return grpcErr
}
diff --git a/weed/operation/assign_file_id_test.go b/weed/operation/assign_file_id_test.go
index ac0f4eee6..b2ec7d92a 100644
--- a/weed/operation/assign_file_id_test.go
+++ b/weed/operation/assign_file_id_test.go
@@ -60,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) {
func BenchmarkUnaryAssign(b *testing.B) {
for i := 0; i < b.N; i++ {
- Assign(func(_ context.Context) pb.ServerAddress {
+ Assign(context.Background(), func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), &VolumeAssignRequest{
Count: 1,
diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go
index 7526a6e79..339d4507e 100644
--- a/weed/operation/needle_parse_test.go
+++ b/weed/operation/needle_parse_test.go
@@ -2,6 +2,7 @@ package operation
import (
"bytes"
+ "context"
"fmt"
"io"
"net/http"
@@ -58,7 +59,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil,
Jwt: "",
}
- uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption)
+ uploadResult, err, data := uploader.Upload(context.Background(), bytes.NewReader([]byte(textContent)), uploadOption)
if len(data) != len(textContent) {
t.Errorf("data actual %d expected %d", len(data), len(textContent))
}
@@ -86,7 +87,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil,
Jwt: "",
}
- uploader.Upload(bytes.NewReader(gzippedData), uploadOption)
+ uploader.Upload(context.Background(), bytes.NewReader(gzippedData), uploadOption)
}
/*
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 9470afced..1efa42b2f 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -62,7 +62,7 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*
Ttl: pref.Ttl,
DiskType: pref.DiskType,
}
- ret, err := Assign(masterFn, grpcDialOption, ar)
+ ret, err := Assign(context.Background(), masterFn, grpcDialOption, ar)
if err != nil {
for index := range files {
results[index].Error = err.Error()
@@ -155,7 +155,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
Ttl: fi.Pref.Ttl,
DiskType: fi.Pref.DiskType,
}
- ret, err = Assign(masterFn, grpcDialOption, ar)
+ ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
if err != nil {
return
}
@@ -169,7 +169,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
Ttl: fi.Pref.Ttl,
DiskType: fi.Pref.DiskType,
}
- ret, err = Assign(masterFn, grpcDialOption, ar)
+ ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
@@ -223,7 +223,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
return 0, e
}
- ret, e, _ := uploader.Upload(fi.Reader, uploadOption)
+ ret, e, _ := uploader.Upload(context.Background(), fi.Reader, uploadOption)
if e != nil {
return 0, e
}
@@ -267,7 +267,7 @@ func uploadOneChunk(filename string, reader io.Reader, masterFn GetMasterFn,
return 0, uploaderError
}
- uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption)
+ uploadResult, uploadError, _ := uploader.Upload(context.Background(), reader, uploadOption)
if uploadError != nil {
return 0, uploadError
}
@@ -299,6 +299,6 @@ func uploadChunkedFileManifest(fileUrl string, manifest *ChunkManifest, jwt secu
return e
}
- _, e = uploader.UploadData(buf, uploadOption)
+ _, e = uploader.UploadData(context.Background(), buf, uploadOption)
return e
}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 0cf6bf7cf..6d910674b 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -134,7 +134,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
uploadOption.Jwt = auth
var uploadErr error
- uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption)
+ uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption)
return uploadErr
}
if uploadOption.RetryForever {
@@ -151,18 +151,18 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
}
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
- uploadResult, err = uploader.retriedUploadData(data, option)
+func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+ uploadResult, err = uploader.retriedUploadData(ctx, data, option)
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
- uploadResult, err, data = uploader.doUpload(reader, option)
+func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+ uploadResult, err, data = uploader.doUpload(ctx, reader, option)
return
}
-func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
bytesReader, ok := reader.(*util.BytesReader)
if ok {
data = bytesReader.Bytes
@@ -173,16 +173,16 @@ func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uplo
return
}
}
- uploadResult, uploadErr := uploader.retriedUploadData(data, option)
+ uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option)
return uploadResult, uploadErr, data
}
-func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
for i := 0; i < 3; i++ {
if i > 0 {
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
}
- uploadResult, err = uploader.doUploadData(data, option)
+ uploadResult, err = uploader.doUploadData(ctx, data, option)
if err == nil {
uploadResult.RetryCount = i
return
@@ -192,7 +192,7 @@ func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (
return
}
-func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
contentIsGzipped := option.IsInputCompressed
shouldGzipNow := false
if !option.IsInputCompressed {
@@ -248,7 +248,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
}
// upload data
- uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
+ uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
_, err = w.Write(encryptedData)
return
}, len(encryptedData), &UploadOption{
@@ -272,7 +272,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
}
} else {
// upload data
- uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
+ uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
_, err = w.Write(data)
return
}, len(data), &UploadOption{
@@ -298,7 +298,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
return uploadResult, err
}
-func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
+func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
var body_writer *multipart.Writer
var reqReader *bytes.Reader
var buf *bytebufferpool.ByteBuffer
@@ -358,6 +358,9 @@ func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) er
if option.Jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
}
+
+ util.ReqWithRequestId(req, ctx)
+
// print("+")
resp, post_err := uploader.httpClient.Do(req)
defer util_http.CloseResponse(resp)
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index ba0d7d0cc..f179cc3a7 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -127,8 +127,8 @@ func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor {
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
- md, _ := metadata.FromIncomingContext(ctx)
- idList := md.Get(util.RequestIDKey)
+ incomingMd, _ := metadata.FromIncomingContext(ctx)
+ idList := incomingMd.Get(util.RequestIDKey)
var reqID string
if len(idList) > 0 {
reqID = idList[0]
@@ -137,6 +137,11 @@ func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor {
reqID = uuid.New().String()
}
+ ctx = metadata.NewOutgoingContext(ctx,
+ metadata.New(map[string]string{
+ util.RequestIDKey: reqID,
+ }))
+
ctx = util.WithRequestID(ctx, reqID)
grpc.SetTrailer(ctx, metadata.Pairs(util.RequestIDKey, reqID))
diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go
index 4a77fd04a..57c206e3e 100644
--- a/weed/replication/repl_util/replication_util.go
+++ b/weed/replication/repl_util/replication_util.go
@@ -1,6 +1,7 @@
package repl_util
import (
+ "context"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -12,7 +13,7 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS
for x := chunkViews.Front(); x != nil; x = x.Next {
chunk := x.Value
- fileUrls, err := filerSource.LookupFileId(chunk.FileId)
+ fileUrls, err := filerSource.LookupFileId(context.Background(), chunk.FileId)
if err != nil {
return err
}
@@ -21,7 +22,7 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS
var shouldRetry bool
for _, fileUrl := range fileUrls {
- shouldRetry, err = util_http.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index fb2f9ff82..fb28355bc 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -105,7 +105,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
}
totalSize := filer.FileSize(entry)
- chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
+ chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
// Create a URL that references a to-be-created blob in your
// Azure Storage account's container.
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index 28a10b195..90f77f441 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -99,7 +99,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int
}
totalSize := filer.FileSize(entry)
- chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
+ chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
bucket, err := g.client.Bucket(context.Background(), g.bucket)
if err != nil {
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 2254ca2f7..8b4b0e513 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -198,7 +198,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
glog.V(2).Infof("late updates %s", key)
} else {
// find out what changed
- deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
+ deletedChunks, newChunks, err := compareChunks(context.Background(), filer.LookupFn(fs), oldEntry, newEntry)
if err != nil {
return true, fmt.Errorf("replicate %s compare chunks error: %v", key, err)
}
@@ -242,12 +242,12 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
})
}
-func compareChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
- aData, aMeta, aErr := filer.ResolveChunkManifest(lookupFileIdFn, oldEntry.GetChunks(), 0, math.MaxInt64)
+func compareChunks(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) {
+ aData, aMeta, aErr := filer.ResolveChunkManifest(ctx, lookupFileIdFn, oldEntry.GetChunks(), 0, math.MaxInt64)
if aErr != nil {
return nil, nil, aErr
}
- bData, bMeta, bErr := filer.ResolveChunkManifest(lookupFileIdFn, newEntry.GetChunks(), 0, math.MaxInt64)
+ bData, bMeta, bErr := filer.ResolveChunkManifest(ctx, lookupFileIdFn, newEntry.GetChunks(), 0, math.MaxInt64)
if bErr != nil {
return nil, nil, bErr
}
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index db6ea4aec..6fe78b21b 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -97,7 +97,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in
}
totalSize := filer.FileSize(entry)
- chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
+ chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())
defer wc.Close()
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
index c6dddb80a..2e962d1d0 100644
--- a/weed/replication/sink/localsink/local_sink.go
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -1,6 +1,7 @@
package localsink
import (
+ "context"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -75,7 +76,7 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
glog.V(4).Infof("Create Entry key: %s", key)
totalSize := filer.FileSize(entry)
- chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
+ chunkViews := filer.ViewFromChunks(context.Background(), localsink.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
dir := filepath.Dir(key)
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 768e251a4..8a63d0c8f 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -55,7 +55,7 @@ func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, rea
return nil
}
-func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) {
+func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrls []string, err error) {
vid2Locations := make(map[string]*filer_pb.Locations)
@@ -63,7 +63,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
@@ -110,7 +110,7 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea
return util_http.DownloadFile("http://"+fs.address+"/?proxyChunkId="+fileId, "")
}
- fileUrls, err := fs.LookupFileId(fileId)
+ fileUrls, err := fs.LookupFileId(context.Background(), fileId)
if err != nil {
return "", nil, nil, err
}
diff --git a/weed/server/common.go b/weed/server/common.go
index 516f8bf1c..3aeee7752 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -129,6 +129,7 @@ func debug(params ...interface{}) {
}
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) {
+ ctx := r.Context()
m := make(map[string]interface{})
if r.Method != http.MethodPost {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@@ -163,7 +164,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
Ttl: r.FormValue("ttl"),
DiskType: r.FormValue("disk"),
}
- assignResult, ae := operation.Assign(masterFn, grpcDialOption, ar)
+ assignResult, ae := operation.Assign(ctx, masterFn, grpcDialOption, ar)
if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae)
return
@@ -189,7 +190,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
- uploadResult, err := uploader.UploadData(pu.Data, uploadOption)
+ uploadResult, err := uploader.UploadData(ctx, pu.Data, uploadOption)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index b1440c94f..7fef61451 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -121,7 +121,7 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return resp, nil
}
-func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) {
+func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
fid, err := needle.ParseFileIdFromString(fileId)
if err != nil {
return nil, err
@@ -142,12 +142,12 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
resp = &filer_pb.CreateEntryResponse{}
- chunks, garbage, err2 := fs.cleanupChunks(util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
+ chunks, garbage, err2 := fs.cleanupChunks(ctx, util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
if err2 != nil {
return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
}
- so, err := fs.detectStorageOption(string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
+ so, err := fs.detectStorageOption(ctx, string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
if err != nil {
return nil, err
}
@@ -177,7 +177,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
}
- chunks, garbage, err2 := fs.cleanupChunks(fullpath, entry, req.Entry)
+ chunks, garbage, err2 := fs.cleanupChunks(ctx, fullpath, entry, req.Entry)
if err2 != nil {
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
}
@@ -201,11 +201,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, err
}
-func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
+func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
// remove old chunks if not included in the new ones
if existingEntry != nil {
- garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks())
+ garbage, err = filer.MinusChunks(ctx, fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks())
if err != nil {
return newEntry.GetChunks(), nil, fmt.Errorf("MinusChunks: %v", err)
}
@@ -214,11 +214,11 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
// files with manifest chunks are usually large and append only, skip calculating covered chunks
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.GetChunks())
- chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks)
+ chunks, coveredChunks := filer.CompactFileChunks(ctx, fs.lookupFileId, nonManifestChunks)
garbage = append(garbage, coveredChunks...)
if newEntry.Attributes != nil {
- so, _ := fs.detectStorageOption(fullpath,
+ so, _ := fs.detectStorageOption(ctx, fullpath,
"",
"",
newEntry.Attributes.TtlSec,
@@ -227,7 +227,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
"",
"",
) // ignore readonly error for capacity needed to manifestize
- chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks)
+ chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), chunks)
if err != nil {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", err)
@@ -271,12 +271,12 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
entry.Chunks = append(entry.GetChunks(), req.Chunks...)
- so, err := fs.detectStorageOption(string(fullpath), "", "", entry.TtlSec, "", "", "", "")
+ so, err := fs.detectStorageOption(ctx, string(fullpath), "", "", entry.TtlSec, "", "", "", "")
if err != nil {
glog.Warningf("detectStorageOption: %v", err)
return &filer_pb.AppendToEntryResponse{}, err
}
- entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.GetChunks())
+ entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), entry.GetChunks())
if err != nil {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", err)
@@ -305,7 +305,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
req.DiskType = fs.option.DiskType
}
- so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
+ so, err := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
if err != nil {
glog.V(3).Infof("AssignVolume: %v", err)
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
@@ -313,7 +313,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
- assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
+ assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
glog.V(3).Infof("AssignVolume: %v", err)
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index 991fff425..081d49ba0 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -64,7 +64,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
}
// detect storage option
- so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "")
+ so, err := fs.detectStorageOption(ctx, req.Directory, "", "", 0, "", "", "", "")
if err != nil {
return resp, err
}
@@ -97,7 +97,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
}
// assign one volume server
- assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
+ assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
fetchAndWriteErr = err
return
@@ -184,10 +184,10 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
// this skips meta data log events
if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
- fs.filer.DeleteUncommittedChunks(chunks)
+ fs.filer.DeleteUncommittedChunks(ctx, chunks)
return nil, err
}
- fs.filer.DeleteChunks(entry.FullPath, garbage)
+ fs.filer.DeleteChunks(ctx, entry.FullPath, garbage)
fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go
index ca445ef9a..fd22ccd7f 100644
--- a/weed/server/filer_server_handlers_proxy.go
+++ b/weed/server/filer_server_handlers_proxy.go
@@ -3,6 +3,7 @@ package weed_server
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
@@ -31,8 +32,8 @@ func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrit
}
func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) {
-
- urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId)
+ ctx := r.Context()
+ urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
if err != nil {
glog.Errorf("locate %s: %v", fileId, err)
w.WriteHeader(http.StatusInternalServerError)
@@ -53,6 +54,7 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
proxyReq.Header.Set("Host", r.Host)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
+ util.ReqWithRequestId(proxyReq, ctx)
for header, values := range r.Header {
for _, value := range values {
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 12371a8f6..fda767c2e 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -2,7 +2,6 @@ package weed_server
import (
"bytes"
- "context"
"encoding/base64"
"encoding/hex"
"errors"
@@ -89,14 +88,14 @@ func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Ent
}
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
-
+ ctx := r.Context()
path := r.URL.Path
isForDirectory := strings.HasSuffix(path, "/")
if isForDirectory && len(path) > 1 {
path = path[:len(path)-1]
}
- entry, err := fs.filer.FindEntry(context.Background(), util.FullPath(path))
+ entry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
if err != nil {
if path == "/" {
fs.listDirectoryHandler(w, r)
@@ -147,6 +146,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if query.Get("metadata") == "true" {
if query.Get("resolveManifest") == "true" {
if entry.Chunks, _, err = filer.ResolveChunkManifest(
+ ctx,
fs.filer.MasterClient.GetLookupFileIdFunction(),
entry.GetChunks(), 0, math.MaxInt64); err != nil {
err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error())
@@ -242,7 +242,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if shouldResize {
data := mem.Allocate(int(totalSize))
defer mem.Free(data)
- err := filer.ReadAll(data, fs.filer.MasterClient, entry.GetChunks())
+ err := filer.ReadAll(ctx, data, fs.filer.MasterClient, entry.GetChunks())
if err != nil {
glog.Errorf("failed to read %s: %v", path, err)
w.WriteHeader(http.StatusInternalServerError)
@@ -268,7 +268,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
chunks := entry.GetChunks()
if entry.IsInRemoteOnly() {
dir, name := entry.FullPath.DirAndName()
- if resp, err := fs.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{
+ if resp, err := fs.CacheRemoteObjectToLocalCluster(ctx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: dir,
Name: name,
}); err != nil {
@@ -280,7 +280,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs)
+ streamFn, err := filer.PrepareStreamContentWithThrottler(ctx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs)
if err != nil {
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
glog.Errorf("failed to prepare stream content %s: %v", r.URL, err)
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index 56f0f9cb4..1961a2f83 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "context"
"errors"
"net/http"
"strconv"
@@ -18,7 +17,7 @@ import (
// sub directories are listed on the first page, when "lastFileName"
// is empty.
func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
-
+ ctx := r.Context()
if fs.option.ExposeDirectoryData == false {
writeJsonError(w, r, http.StatusForbidden, errors.New("ui is disabled"))
return
@@ -40,7 +39,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
namePattern := r.FormValue("namePattern")
namePatternExclude := r.FormValue("namePatternExclude")
- entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)
+ entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(ctx, util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go
index 80ea09d53..5f554de1d 100644
--- a/weed/server/filer_server_handlers_tagging.go
+++ b/weed/server/filer_server_handlers_tagging.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "context"
"net/http"
"strings"
@@ -14,7 +13,7 @@ import (
// curl -X PUT -H "Seaweed-Name1: value1" http://localhost:8888/path/to/a/file?tagging
func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) {
- ctx := context.Background()
+ ctx := r.Context()
path := r.URL.Path
if strings.HasSuffix(path, "/") {
@@ -57,7 +56,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request)
// curl -X DELETE http://localhost:8888/path/to/a/file?tagging
func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Request) {
- ctx := context.Background()
+ ctx := r.Context()
path := r.URL.Path
if strings.HasSuffix(path, "/") {
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 82880c2ac..b71c8fefd 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -34,7 +34,7 @@ type FilerPostResult struct {
Error string `json:"error,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssign).Inc()
start := time.Now()
@@ -44,7 +44,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
ar, altRequest := so.ToAssignRequests(1)
- assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
+ assignResult, ae := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
err = ae
@@ -70,7 +70,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
}
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
- ctx := context.Background()
+ ctx := r.Context()
destination := r.RequestURI
if finalDestination := r.Header.Get(s3_constants.SeaweedStorageDestinationHeader); finalDestination != "" {
@@ -78,7 +78,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
}
query := r.URL.Query()
- so, err := fs.detectStorageOption0(destination,
+ so, err := fs.detectStorageOption0(ctx, destination,
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
@@ -240,7 +240,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
-func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) {
+func (fs *FilerServer) detectStorageOption(ctx context.Context, requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) {
rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
@@ -280,14 +280,14 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
}, nil
}
-func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) {
+func (fs *FilerServer) detectStorageOption0(ctx context.Context, requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) {
ttl, err := needle.ReadTTL(qTtl)
if err != nil {
glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
- so, err := fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode)
+ so, err := fs.detectStorageOption(ctx, requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode)
if so != nil {
if fsync == "false" {
so.Fsync = false
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index b0af7be4b..b16374bea 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -99,7 +99,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return
}
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, part1, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
}
@@ -107,12 +107,12 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
md5bytes = md5Hash.Sum(nil)
headerMd5 := r.Header.Get("Content-Md5")
if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) {
- fs.filer.DeleteUncommittedChunks(fileChunks)
+ fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.")
}
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
if replyerr != nil {
- fs.filer.DeleteUncommittedChunks(fileChunks)
+ fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
}
return
@@ -130,7 +130,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
return nil, nil, err
}
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
@@ -139,12 +139,12 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
md5bytes = md5Hash.Sum(nil)
headerMd5 := r.Header.Get("Content-Md5")
if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) {
- fs.filer.DeleteUncommittedChunks(fileChunks)
+ fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.")
}
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
if replyerr != nil {
- fs.filer.DeleteUncommittedChunks(fileChunks)
+ fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
}
return
@@ -299,14 +299,14 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
// maybe concatenate small chunks into one whole chunk
- mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks)
+ mergedChunks, replyerr = fs.maybeMergeChunks(ctx, so, newChunks)
if replyerr != nil {
glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr)
mergedChunks = newChunks
}
// maybe compact entry chunks
- mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
+ mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), mergedChunks)
if replyerr != nil {
glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
return
@@ -348,7 +348,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
return filerResult, replyerr
}
-func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
+func (fs *FilerServer) saveAsChunk(ctx context.Context, so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) {
var fileId string
@@ -356,7 +356,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
err := util.Retry("saveAsChunk", func() error {
// assign one file id for one chunk
- assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so)
+ assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
if assignErr != nil {
return assignErr
}
@@ -380,7 +380,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
}
var uploadErr error
- uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption)
+ uploadResult, uploadErr, _ = uploader.Upload(ctx, reader, uploadOption)
if uploadErr != nil {
return uploadErr
}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 9c1628749..fb052b5fa 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -19,7 +19,7 @@ import (
// handling single chunk POST or PUT upload
func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) {
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(so)
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)
if err != nil || fileId == "" || urlLocation == "" {
return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter)
@@ -59,7 +59,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr)
}
- uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption)
+ uploadResult, uploadError := uploader.UploadData(ctx, uncompressedData, uploadOption)
if uploadError != nil {
return nil, fmt.Errorf("upload to volume server: %v", uploadError)
}
@@ -97,7 +97,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
}
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, so.MaxFileNameLength); dbErr != nil {
- fs.filer.DeleteUncommittedChunks(entry.GetChunks())
+ fs.filer.DeleteUncommittedChunks(ctx, entry.GetChunks())
err = dbErr
filerResult.Error = dbErr.Error()
return
diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go
index 2110f485a..c22aa14c2 100644
--- a/weed/server/filer_server_handlers_write_merge.go
+++ b/weed/server/filer_server_handlers_write_merge.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "context"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -12,7 +13,7 @@ import (
const MergeChunkMinCount int = 1000
-func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
+func (fs *FilerServer) maybeMergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
// Only merge small chunks more than half of the file
var chunkSize = fs.option.MaxMB * 1024 * 1024
var smallChunk, sumChunk int
@@ -33,16 +34,16 @@ func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks
return inputChunks, nil
}
- return fs.mergeChunks(so, inputChunks, minOffset)
+ return fs.mergeChunks(ctx, so, inputChunks, minOffset)
}
-func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
- chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
+func (fs *FilerServer) mergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
+ chunkedFileReader := filer.NewChunkStreamReaderFromFiler(ctx, fs.filer.MasterClient, inputChunks)
_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
if mergeErr != nil {
return nil, mergeErr
}
- mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
+ mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(ctx, chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
if mergeErr != nil {
return
}
@@ -54,7 +55,7 @@ func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*f
}
}
- garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
+ garbage, err := filer.MinusChunks(ctx, fs.lookupFileId, inputChunks, mergedChunks)
if err != nil {
glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
mergedChunks, inputChunks)
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index e34fe27e6..495fae05a 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -2,6 +2,7 @@ package weed_server
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"hash"
@@ -27,7 +28,7 @@ var bufPool = sync.Pool{
},
}
-func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+func (fs *FilerServer) uploadRequestToChunks(ctx context.Context, w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
query := r.URL.Query()
isAppend := isAppend(r)
@@ -45,10 +46,10 @@ func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Requ
chunkOffset = offsetInt
}
- return fs.uploadReaderToChunks(reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so)
+ return fs.uploadReaderToChunks(ctx, reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so)
}
-func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
md5Hash = md5.New()
chunkOffset = startOffset
@@ -117,7 +118,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64,
wg.Done()
}()
- chunks, toChunkErr := fs.dataToChunk(fileName, contentType, buf.Bytes(), offset, so)
+ chunks, toChunkErr := fs.dataToChunk(ctx, fileName, contentType, buf.Bytes(), offset, so)
if toChunkErr != nil {
uploadErrLock.Lock()
if uploadErr == nil {
@@ -152,7 +153,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64,
for _, chunk := range fileChunks {
glog.V(4).Infof("purging failed uploaded %s chunk %s [%d,%d)", fileName, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- fs.filer.DeleteUncommittedChunks(fileChunks)
+ fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, md5Hash, 0, uploadErr, nil
}
slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) int {
@@ -161,7 +162,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64,
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
-func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
+func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc()
start := time.Now()
@@ -184,14 +185,14 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
return nil, err, []byte{}
}
- uploadResult, err, data := uploader.Upload(limitedReader, uploadOption)
+ uploadResult, err, data := uploader.Upload(ctx, limitedReader, uploadOption)
if uploadResult != nil && uploadResult.RetryCount > 0 {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
}
return uploadResult, err, data
}
-func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
+func (fs *FilerServer) dataToChunk(ctx context.Context, fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
dataReader := util.NewBytesReader(data)
// retry to assign a different file id
@@ -203,14 +204,14 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
err := util.Retry("filerDataToChunk", func() error {
// assign one file id for one chunk
- fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
+ fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(ctx, so)
if uploadErr != nil {
glog.V(4).Infof("retry later due to assign error: %v", uploadErr)
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc()
return uploadErr
}
// upload the chunk to the volume server
- uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
+ uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 4452e019b..0b5fa1cfc 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -77,7 +77,7 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
return
}
- if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil {
+ if _, replicaWriteErr := uploader.UploadData(ctx, data, uploadOption); replicaWriteErr != nil && err == nil {
err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
}
}(replica.Url)
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 7f0fcc871..30f335f5d 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -16,6 +16,7 @@ import (
)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
if e := r.ParseForm(); e != nil {
glog.V(0).Infoln("form parse error:", e)
writeJsonError(w, r, http.StatusBadRequest, e)
@@ -45,7 +46,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
ret := operation.UploadResult{}
- isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5)
+ isUnchanged, writeError := topology.ReplicatedWrite(ctx, vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5)
if writeError != nil {
writeJsonError(w, r, http.StatusInternalServerError, writeError)
return
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 2b7493ee6..47fa055e7 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -556,7 +556,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
return 0, io.EOF
}
if f.visibleIntervals == nil {
- f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
+ f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(context.Background(), filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
f.reader = nil
}
if f.reader == nil {
diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go
index f31ec10f4..e5ae54285 100644
--- a/weed/shell/command_fs_merge_volumes.go
+++ b/weed/shell/command_fs_merge_volumes.go
@@ -351,7 +351,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
jwt = security.GenJwtForVolumeServer(security.SigningKey(signingKey), expiresAfterSec, toFid.String())
}
- _, err, _ = uploader.Upload(reader, &operation.UploadOption{
+ _, err, _ = uploader.Upload(context.Background(), reader, &operation.UploadOption{
UploadUrl: uploadURL,
Filename: filename,
IsInputCompressed: isCompressed,
diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go
index dcac60874..fd46bf591 100644
--- a/weed/shell/command_fs_verify.go
+++ b/weed/shell/command_fs_verify.go
@@ -286,7 +286,7 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
return nil
}
}
- dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
+ dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(context.Background(), filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
if resolveErr != nil {
return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr)
}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index dbd814309..8a76722d4 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -240,7 +240,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
if *c.verbose && entry.Entry.IsDirectory {
fmt.Fprintf(c.writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
}
- dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
+ dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(context.Background(), filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
if resolveErr != nil {
return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr)
}
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index a2be991fa..10fe35f0a 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -1,6 +1,7 @@
package topology
import (
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -23,7 +24,7 @@ import (
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
-func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
+func ReplicatedWrite(ctx context.Context, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
//check JWT
jwt := security.GetJwt(r)
@@ -121,7 +122,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
return err
}
- _, err = uploader.UploadData(n.Data, uploadOption)
+ _, err = uploader.UploadData(ctx, n.Data, uploadOption)
if err != nil {
glog.Errorf("replication-UploadData, err:%v, url:%s", err, u.String())
}
diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go
index 33d978d9e..3cc819a47 100644
--- a/weed/util/http/http_global_client_util.go
+++ b/weed/util/http/http_global_client_util.go
@@ -2,6 +2,7 @@ package http
import (
"compress/gzip"
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -214,11 +215,11 @@ func NormalizeUrl(url string) (string, error) {
return GetGlobalHttpClient().NormalizeHttpScheme(url)
}
-func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
+func ReadUrl(ctx context.Context, fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
if cipherKey != nil {
var n int
- _, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
+ _, err := readEncryptedUrl(ctx, fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
n = copy(buf, data)
})
return int64(n), err
@@ -286,13 +287,13 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
return n, err
}
-func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
- return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+func ReadUrlAsStream(ctx context.Context, fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
+ return ReadUrlAsStreamAuthenticated(ctx, fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
}
-func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
+func ReadUrlAsStreamAuthenticated(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil {
- return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+ return readEncryptedUrl(ctx, fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
}
req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
@@ -306,6 +307,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
} else {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
}
+ util.ReqWithRequestId(req, ctx)
r, err := GetGlobalHttpClient().Do(req)
if err != nil {
@@ -351,7 +353,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
}
-func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
+func readEncryptedUrl(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt)
if err != nil {
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
@@ -447,7 +449,7 @@ func (r *CountingReader) Read(p []byte) (n int, err error) {
return n, err
}
-func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
+func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
var shouldRetry bool
@@ -457,7 +459,7 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte,
if strings.Contains(urlString, "%") {
urlString = url.PathEscape(urlString)
}
- shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
+ shouldRetry, err = ReadUrlAsStream(ctx, urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
if n < len(buffer) {
x := copy(buffer[n:], data)
n += x
diff --git a/weed/util/request_id.go b/weed/util/request_id.go
index 85ec254dc..7945e7cc4 100644
--- a/weed/util/request_id.go
+++ b/weed/util/request_id.go
@@ -1,6 +1,9 @@
package util
-import "context"
+import (
+ "context"
+ "net/http"
+)
const (
RequestIdHttpHeader = "X-Request-ID"
@@ -18,3 +21,7 @@ func GetRequestID(ctx context.Context) string {
func WithRequestID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, RequestIDKey, id)
}
+
+func ReqWithRequestId(req *http.Request, ctx context.Context) {
+ req.Header.Set(RequestIdHttpHeader, GetRequestID(ctx))
+}
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index da46a440b..eccf1d14b 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -56,13 +56,14 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
return mc.LookupFileIdWithFallback
}
-func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) {
- fullUrls, err = mc.vidMap.LookupFileId(fileId)
+func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
+ fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId)
if err == nil && len(fullUrls) > 0 {
return
}
- err = pb.WithMasterClient(false, mc.GetMaster(context.Background()), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
+
+ err = pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
})
if err != nil {
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 7a2a5bb92..9d5e5d378 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -1,6 +1,7 @@
package wdclient
import (
+ "context"
"errors"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -21,7 +22,7 @@ type HasLookupFileIdFunction interface {
GetLookupFileIdFunction() LookupFileIdFunctionType
}
-type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
+type LookupFileIdFunctionType func(ctx context.Context, fileId string) (targetUrls []string, err error)
type Location struct {
Url string `json:"url,omitempty"`
@@ -99,7 +100,7 @@ func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err er
return
}
-func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
+func (vc *vidMap) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return nil, errors.New("Invalid fileId " + fileId)