aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go5
-rw-r--r--weed/server/filer_grpc_server.go26
-rw-r--r--weed/server/filer_grpc_server_remote.go8
-rw-r--r--weed/server/filer_server_handlers_proxy.go6
-rw-r--r--weed/server/filer_server_handlers_read.go12
-rw-r--r--weed/server/filer_server_handlers_read_dir.go5
-rw-r--r--weed/server/filer_server_handlers_tagging.go5
-rw-r--r--weed/server/filer_server_handlers_write.go14
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go22
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go6
-rw-r--r--weed/server/filer_server_handlers_write_merge.go13
-rw-r--r--weed/server/filer_server_handlers_write_upload.go21
-rw-r--r--weed/server/volume_grpc_remote.go2
-rw-r--r--weed/server/volume_server_handlers_write.go3
-rw-r--r--weed/server/webdav_server.go2
15 files changed, 77 insertions, 73 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 516f8bf1c..3aeee7752 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -129,6 +129,7 @@ func debug(params ...interface{}) {
}
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) {
+ ctx := r.Context()
m := make(map[string]interface{})
if r.Method != http.MethodPost {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@@ -163,7 +164,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
Ttl: r.FormValue("ttl"),
DiskType: r.FormValue("disk"),
}
- assignResult, ae := operation.Assign(masterFn, grpcDialOption, ar)
+ assignResult, ae := operation.Assign(ctx, masterFn, grpcDialOption, ar)
if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae)
return
@@ -189,7 +190,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
- uploadResult, err := uploader.UploadData(pu.Data, uploadOption)
+ uploadResult, err := uploader.UploadData(ctx, pu.Data, uploadOption)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index b1440c94f..7fef61451 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -121,7 +121,7 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return resp, nil
}
-func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) {
+func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
fid, err := needle.ParseFileIdFromString(fileId)
if err != nil {
return nil, err
@@ -142,12 +142,12 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
resp = &filer_pb.CreateEntryResponse{}
- chunks, garbage, err2 := fs.cleanupChunks(util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
+ chunks, garbage, err2 := fs.cleanupChunks(ctx, util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
if err2 != nil {
return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
}
- so, err := fs.detectStorageOption(string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
+ so, err := fs.detectStorageOption(ctx, string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
if err != nil {
return nil, err
}
@@ -177,7 +177,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
}
- chunks, garbage, err2 := fs.cleanupChunks(fullpath, entry, req.Entry)
+ chunks, garbage, err2 := fs.cleanupChunks(ctx, fullpath, entry, req.Entry)
if err2 != nil {
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
}
@@ -201,11 +201,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, err
}
-func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
+func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
// remove old chunks if not included in the new ones
if existingEntry != nil {
- garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks())
+ garbage, err = filer.MinusChunks(ctx, fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks())
if err != nil {
return newEntry.GetChunks(), nil, fmt.Errorf("MinusChunks: %v", err)
}
@@ -214,11 +214,11 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
// files with manifest chunks are usually large and append only, skip calculating covered chunks
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.GetChunks())
- chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks)
+ chunks, coveredChunks := filer.CompactFileChunks(ctx, fs.lookupFileId, nonManifestChunks)
garbage = append(garbage, coveredChunks...)
if newEntry.Attributes != nil {
- so, _ := fs.detectStorageOption(fullpath,
+ so, _ := fs.detectStorageOption(ctx, fullpath,
"",
"",
newEntry.Attributes.TtlSec,
@@ -227,7 +227,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
"",
"",
) // ignore readonly error for capacity needed to manifestize
- chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks)
+ chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), chunks)
if err != nil {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", err)
@@ -271,12 +271,12 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
entry.Chunks = append(entry.GetChunks(), req.Chunks...)
- so, err := fs.detectStorageOption(string(fullpath), "", "", entry.TtlSec, "", "", "", "")
+ so, err := fs.detectStorageOption(ctx, string(fullpath), "", "", entry.TtlSec, "", "", "", "")
if err != nil {
glog.Warningf("detectStorageOption: %v", err)
return &filer_pb.AppendToEntryResponse{}, err
}
- entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.GetChunks())
+ entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), entry.GetChunks())
if err != nil {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", err)
@@ -305,7 +305,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
req.DiskType = fs.option.DiskType
}
- so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
+ so, err := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
if err != nil {
glog.V(3).Infof("AssignVolume: %v", err)
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
@@ -313,7 +313,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
- assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
+ assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
glog.V(3).Infof("AssignVolume: %v", err)
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index 991fff425..081d49ba0 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -64,7 +64,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
}
// detect storage option
- so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "")
+ so, err := fs.detectStorageOption(ctx, req.Directory, "", "", 0, "", "", "", "")
if err != nil {
return resp, err
}
@@ -97,7 +97,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
}
// assign one volume server
- assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
+ assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
fetchAndWriteErr = err
return
@@ -184,10 +184,10 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req
// this skips meta data log events
if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
- fs.filer.DeleteUncommittedChunks(chunks)
+ fs.filer.DeleteUncommittedChunks(ctx, chunks)
return nil, err
}
- fs.filer.DeleteChunks(entry.FullPath, garbage)
+ fs.filer.DeleteChunks(ctx, entry.FullPath, garbage)
fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go
index ca445ef9a..fd22ccd7f 100644
--- a/weed/server/filer_server_handlers_proxy.go
+++ b/weed/server/filer_server_handlers_proxy.go
@@ -3,6 +3,7 @@ package weed_server
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
@@ -31,8 +32,8 @@ func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrit
}
func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) {
-
- urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId)
+ ctx := r.Context()
+ urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
if err != nil {
glog.Errorf("locate %s: %v", fileId, err)
w.WriteHeader(http.StatusInternalServerError)
@@ -53,6 +54,7 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
proxyReq.Header.Set("Host", r.Host)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
+ util.ReqWithRequestId(proxyReq, ctx)
for header, values := range r.Header {
for _, value := range values {
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 12371a8f6..fda767c2e 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -2,7 +2,6 @@ package weed_server
import (
"bytes"
- "context"
"encoding/base64"
"encoding/hex"
"errors"
@@ -89,14 +88,14 @@ func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Ent
}
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
-
+ ctx := r.Context()
path := r.URL.Path
isForDirectory := strings.HasSuffix(path, "/")
if isForDirectory && len(path) > 1 {
path = path[:len(path)-1]
}
- entry, err := fs.filer.FindEntry(context.Background(), util.FullPath(path))
+ entry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
if err != nil {
if path == "/" {
fs.listDirectoryHandler(w, r)
@@ -147,6 +146,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if query.Get("metadata") == "true" {
if query.Get("resolveManifest") == "true" {
if entry.Chunks, _, err = filer.ResolveChunkManifest(
+ ctx,
fs.filer.MasterClient.GetLookupFileIdFunction(),
entry.GetChunks(), 0, math.MaxInt64); err != nil {
err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error())
@@ -242,7 +242,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if shouldResize {
data := mem.Allocate(int(totalSize))
defer mem.Free(data)
- err := filer.ReadAll(data, fs.filer.MasterClient, entry.GetChunks())
+ err := filer.ReadAll(ctx, data, fs.filer.MasterClient, entry.GetChunks())
if err != nil {
glog.Errorf("failed to read %s: %v", path, err)
w.WriteHeader(http.StatusInternalServerError)
@@ -268,7 +268,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
chunks := entry.GetChunks()
if entry.IsInRemoteOnly() {
dir, name := entry.FullPath.DirAndName()
- if resp, err := fs.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{
+ if resp, err := fs.CacheRemoteObjectToLocalCluster(ctx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: dir,
Name: name,
}); err != nil {
@@ -280,7 +280,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs)
+ streamFn, err := filer.PrepareStreamContentWithThrottler(ctx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs)
if err != nil {
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
glog.Errorf("failed to prepare stream content %s: %v", r.URL, err)
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index 56f0f9cb4..1961a2f83 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "context"
"errors"
"net/http"
"strconv"
@@ -18,7 +17,7 @@ import (
// sub directories are listed on the first page, when "lastFileName"
// is empty.
func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
-
+ ctx := r.Context()
if fs.option.ExposeDirectoryData == false {
writeJsonError(w, r, http.StatusForbidden, errors.New("ui is disabled"))
return
@@ -40,7 +39,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
namePattern := r.FormValue("namePattern")
namePatternExclude := r.FormValue("namePatternExclude")
- entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)
+ entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(ctx, util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go
index 80ea09d53..5f554de1d 100644
--- a/weed/server/filer_server_handlers_tagging.go
+++ b/weed/server/filer_server_handlers_tagging.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "context"
"net/http"
"strings"
@@ -14,7 +13,7 @@ import (
// curl -X PUT -H "Seaweed-Name1: value1" http://localhost:8888/path/to/a/file?tagging
func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) {
- ctx := context.Background()
+ ctx := r.Context()
path := r.URL.Path
if strings.HasSuffix(path, "/") {
@@ -57,7 +56,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request)
// curl -X DELETE http://localhost:8888/path/to/a/file?tagging
func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Request) {
- ctx := context.Background()
+ ctx := r.Context()
path := r.URL.Path
if strings.HasSuffix(path, "/") {
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 82880c2ac..b71c8fefd 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -34,7 +34,7 @@ type FilerPostResult struct {
Error string `json:"error,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssign).Inc()
start := time.Now()
@@ -44,7 +44,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
ar, altRequest := so.ToAssignRequests(1)
- assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
+ assignResult, ae := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
err = ae
@@ -70,7 +70,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
}
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
- ctx := context.Background()
+ ctx := r.Context()
destination := r.RequestURI
if finalDestination := r.Header.Get(s3_constants.SeaweedStorageDestinationHeader); finalDestination != "" {
@@ -78,7 +78,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
}
query := r.URL.Query()
- so, err := fs.detectStorageOption0(destination,
+ so, err := fs.detectStorageOption0(ctx, destination,
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
@@ -240,7 +240,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
-func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) {
+func (fs *FilerServer) detectStorageOption(ctx context.Context, requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) {
rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
@@ -280,14 +280,14 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
}, nil
}
-func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) {
+func (fs *FilerServer) detectStorageOption0(ctx context.Context, requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) {
ttl, err := needle.ReadTTL(qTtl)
if err != nil {
glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
- so, err := fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode)
+ so, err := fs.detectStorageOption(ctx, requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode)
if so != nil {
if fsync == "false" {
so.Fsync = false
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index b0af7be4b..b16374bea 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -99,7 +99,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return
}
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, part1, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
}
@@ -107,12 +107,12 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
md5bytes = md5Hash.Sum(nil)
headerMd5 := r.Header.Get("Content-Md5")
if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) {
- fs.filer.DeleteUncommittedChunks(fileChunks)
+ fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.")
}
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
if replyerr != nil {
- fs.filer.DeleteUncommittedChunks(fileChunks)
+ fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
}
return
@@ -130,7 +130,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
return nil, nil, err
}
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
@@ -139,12 +139,12 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
md5bytes = md5Hash.Sum(nil)
headerMd5 := r.Header.Get("Content-Md5")
if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) {
- fs.filer.DeleteUncommittedChunks(fileChunks)
+ fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.")
}
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
if replyerr != nil {
- fs.filer.DeleteUncommittedChunks(fileChunks)
+ fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
}
return
@@ -299,14 +299,14 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
// maybe concatenate small chunks into one whole chunk
- mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks)
+ mergedChunks, replyerr = fs.maybeMergeChunks(ctx, so, newChunks)
if replyerr != nil {
glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr)
mergedChunks = newChunks
}
// maybe compact entry chunks
- mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
+ mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), mergedChunks)
if replyerr != nil {
glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
return
@@ -348,7 +348,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
return filerResult, replyerr
}
-func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
+func (fs *FilerServer) saveAsChunk(ctx context.Context, so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) {
var fileId string
@@ -356,7 +356,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
err := util.Retry("saveAsChunk", func() error {
// assign one file id for one chunk
- assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so)
+ assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
if assignErr != nil {
return assignErr
}
@@ -380,7 +380,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
}
var uploadErr error
- uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption)
+ uploadResult, uploadErr, _ = uploader.Upload(ctx, reader, uploadOption)
if uploadErr != nil {
return uploadErr
}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 9c1628749..fb052b5fa 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -19,7 +19,7 @@ import (
// handling single chunk POST or PUT upload
func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) {
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(so)
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)
if err != nil || fileId == "" || urlLocation == "" {
return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter)
@@ -59,7 +59,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr)
}
- uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption)
+ uploadResult, uploadError := uploader.UploadData(ctx, uncompressedData, uploadOption)
if uploadError != nil {
return nil, fmt.Errorf("upload to volume server: %v", uploadError)
}
@@ -97,7 +97,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
}
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, so.MaxFileNameLength); dbErr != nil {
- fs.filer.DeleteUncommittedChunks(entry.GetChunks())
+ fs.filer.DeleteUncommittedChunks(ctx, entry.GetChunks())
err = dbErr
filerResult.Error = dbErr.Error()
return
diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go
index 2110f485a..c22aa14c2 100644
--- a/weed/server/filer_server_handlers_write_merge.go
+++ b/weed/server/filer_server_handlers_write_merge.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "context"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -12,7 +13,7 @@ import (
const MergeChunkMinCount int = 1000
-func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
+func (fs *FilerServer) maybeMergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
// Only merge small chunks more than half of the file
var chunkSize = fs.option.MaxMB * 1024 * 1024
var smallChunk, sumChunk int
@@ -33,16 +34,16 @@ func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks
return inputChunks, nil
}
- return fs.mergeChunks(so, inputChunks, minOffset)
+ return fs.mergeChunks(ctx, so, inputChunks, minOffset)
}
-func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
- chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
+func (fs *FilerServer) mergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
+ chunkedFileReader := filer.NewChunkStreamReaderFromFiler(ctx, fs.filer.MasterClient, inputChunks)
_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
if mergeErr != nil {
return nil, mergeErr
}
- mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
+ mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(ctx, chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
if mergeErr != nil {
return
}
@@ -54,7 +55,7 @@ func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*f
}
}
- garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
+ garbage, err := filer.MinusChunks(ctx, fs.lookupFileId, inputChunks, mergedChunks)
if err != nil {
glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
mergedChunks, inputChunks)
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index e34fe27e6..495fae05a 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -2,6 +2,7 @@ package weed_server
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"hash"
@@ -27,7 +28,7 @@ var bufPool = sync.Pool{
},
}
-func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+func (fs *FilerServer) uploadRequestToChunks(ctx context.Context, w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
query := r.URL.Query()
isAppend := isAppend(r)
@@ -45,10 +46,10 @@ func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Requ
chunkOffset = offsetInt
}
- return fs.uploadReaderToChunks(reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so)
+ return fs.uploadReaderToChunks(ctx, reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so)
}
-func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
md5Hash = md5.New()
chunkOffset = startOffset
@@ -117,7 +118,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64,
wg.Done()
}()
- chunks, toChunkErr := fs.dataToChunk(fileName, contentType, buf.Bytes(), offset, so)
+ chunks, toChunkErr := fs.dataToChunk(ctx, fileName, contentType, buf.Bytes(), offset, so)
if toChunkErr != nil {
uploadErrLock.Lock()
if uploadErr == nil {
@@ -152,7 +153,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64,
for _, chunk := range fileChunks {
glog.V(4).Infof("purging failed uploaded %s chunk %s [%d,%d)", fileName, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- fs.filer.DeleteUncommittedChunks(fileChunks)
+ fs.filer.DeleteUncommittedChunks(ctx, fileChunks)
return nil, md5Hash, 0, uploadErr, nil
}
slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) int {
@@ -161,7 +162,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64,
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
-func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
+func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc()
start := time.Now()
@@ -184,14 +185,14 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
return nil, err, []byte{}
}
- uploadResult, err, data := uploader.Upload(limitedReader, uploadOption)
+ uploadResult, err, data := uploader.Upload(ctx, limitedReader, uploadOption)
if uploadResult != nil && uploadResult.RetryCount > 0 {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
}
return uploadResult, err, data
}
-func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
+func (fs *FilerServer) dataToChunk(ctx context.Context, fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
dataReader := util.NewBytesReader(data)
// retry to assign a different file id
@@ -203,14 +204,14 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
err := util.Retry("filerDataToChunk", func() error {
// assign one file id for one chunk
- fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
+ fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(ctx, so)
if uploadErr != nil {
glog.V(4).Infof("retry later due to assign error: %v", uploadErr)
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc()
return uploadErr
}
// upload the chunk to the volume server
- uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
+ uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 4452e019b..0b5fa1cfc 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -77,7 +77,7 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
return
}
- if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil {
+ if _, replicaWriteErr := uploader.UploadData(ctx, data, uploadOption); replicaWriteErr != nil && err == nil {
err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
}
}(replica.Url)
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 7f0fcc871..30f335f5d 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -16,6 +16,7 @@ import (
)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
if e := r.ParseForm(); e != nil {
glog.V(0).Infoln("form parse error:", e)
writeJsonError(w, r, http.StatusBadRequest, e)
@@ -45,7 +46,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
ret := operation.UploadResult{}
- isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5)
+ isUnchanged, writeError := topology.ReplicatedWrite(ctx, vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5)
if writeError != nil {
writeJsonError(w, r, http.StatusInternalServerError, writeError)
return
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 2b7493ee6..47fa055e7 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -556,7 +556,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
return 0, io.EOF
}
if f.visibleIntervals == nil {
- f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
+ f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(context.Background(), filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
f.reader = nil
}
if f.reader == nil {