diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/common.go | 5 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server.go | 26 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_remote.go | 8 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_proxy.go | 6 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_read.go | 12 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_read_dir.go | 5 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_tagging.go | 5 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 14 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 22 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_cipher.go | 6 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_merge.go | 13 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_upload.go | 21 | ||||
| -rw-r--r-- | weed/server/volume_grpc_remote.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_write.go | 3 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 2 |
15 files changed, 77 insertions, 73 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index 516f8bf1c..3aeee7752 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -129,6 +129,7 @@ func debug(params ...interface{}) { } func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) { + ctx := r.Context() m := make(map[string]interface{}) if r.Method != http.MethodPost { writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) @@ -163,7 +164,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope Ttl: r.FormValue("ttl"), DiskType: r.FormValue("disk"), } - assignResult, ae := operation.Assign(masterFn, grpcDialOption, ar) + assignResult, ae := operation.Assign(ctx, masterFn, grpcDialOption, ar) if ae != nil { writeJsonError(w, r, http.StatusInternalServerError, ae) return @@ -189,7 +190,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope writeJsonError(w, r, http.StatusInternalServerError, err) return } - uploadResult, err := uploader.UploadData(pu.Data, uploadOption) + uploadResult, err := uploader.UploadData(ctx, pu.Data, uploadOption) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index b1440c94f..7fef61451 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -121,7 +121,7 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return resp, nil } -func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) { +func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) { fid, err := needle.ParseFileIdFromString(fileId) if err != nil { return nil, err @@ -142,12 +142,12 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr resp = &filer_pb.CreateEntryResponse{} - chunks, garbage, err2 := fs.cleanupChunks(util.Join(req.Directory, req.Entry.Name), nil, req.Entry) + chunks, garbage, err2 := fs.cleanupChunks(ctx, util.Join(req.Directory, req.Entry.Name), nil, req.Entry) if err2 != nil { return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) } - so, err := fs.detectStorageOption(string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "") + so, err := fs.detectStorageOption(ctx, string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "") if err != nil { return nil, err } @@ -177,7 +177,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } - chunks, garbage, err2 := fs.cleanupChunks(fullpath, entry, req.Entry) + chunks, garbage, err2 := fs.cleanupChunks(ctx, fullpath, entry, req.Entry) if err2 != nil { return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2) } @@ -201,11 +201,11 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } -func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { +func (fs *FilerServer) cleanupChunks(ctx context.Context, fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { // remove old chunks if not included in the new ones if existingEntry != nil { - garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks()) + garbage, err = filer.MinusChunks(ctx, fs.lookupFileId, existingEntry.GetChunks(), newEntry.GetChunks()) if err != nil { return newEntry.GetChunks(), nil, fmt.Errorf("MinusChunks: %v", err) } @@ -214,11 +214,11 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry // files with manifest chunks are usually large and append only, skip calculating covered chunks manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.GetChunks()) - chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks) + chunks, coveredChunks := filer.CompactFileChunks(ctx, fs.lookupFileId, nonManifestChunks) garbage = append(garbage, coveredChunks...) if newEntry.Attributes != nil { - so, _ := fs.detectStorageOption(fullpath, + so, _ := fs.detectStorageOption(ctx, fullpath, "", "", newEntry.Attributes.TtlSec, @@ -227,7 +227,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry "", "", ) // ignore readonly error for capacity needed to manifestize - chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks) + chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), chunks) if err != nil { // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", err) @@ -271,12 +271,12 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo } entry.Chunks = append(entry.GetChunks(), req.Chunks...) - so, err := fs.detectStorageOption(string(fullpath), "", "", entry.TtlSec, "", "", "", "") + so, err := fs.detectStorageOption(ctx, string(fullpath), "", "", entry.TtlSec, "", "", "", "") if err != nil { glog.Warningf("detectStorageOption: %v", err) return &filer_pb.AppendToEntryResponse{}, err } - entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.GetChunks()) + entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), entry.GetChunks()) if err != nil { // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", err) @@ -305,7 +305,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol req.DiskType = fs.option.DiskType } - so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode) + so, err := fs.detectStorageOption(ctx, req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode) if err != nil { glog.V(3).Infof("AssignVolume: %v", err) return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil @@ -313,7 +313,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol assignRequest, altRequest := so.ToAssignRequests(int(req.Count)) - assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) + assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) if err != nil { glog.V(3).Infof("AssignVolume: %v", err) return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 991fff425..081d49ba0 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -64,7 +64,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req } // detect storage option - so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "") + so, err := fs.detectStorageOption(ctx, req.Directory, "", "", 0, "", "", "", "") if err != nil { return resp, err } @@ -97,7 +97,7 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req } // assign one volume server - assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) + assignResult, err := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) if err != nil { fetchAndWriteErr = err return @@ -184,10 +184,10 @@ func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req // this skips meta data log events if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil { - fs.filer.DeleteUncommittedChunks(chunks) + fs.filer.DeleteUncommittedChunks(ctx, chunks) return nil, err } - fs.filer.DeleteChunks(entry.FullPath, garbage) + fs.filer.DeleteChunks(ctx, entry.FullPath, garbage) fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil) diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go index ca445ef9a..fd22ccd7f 100644 --- a/weed/server/filer_server_handlers_proxy.go +++ b/weed/server/filer_server_handlers_proxy.go @@ -3,6 +3,7 @@ package weed_server import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" @@ -31,8 +32,8 @@ func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrit } func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) { - - urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId) + ctx := r.Context() + urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId) if err != nil { glog.Errorf("locate %s: %v", fileId, err) w.WriteHeader(http.StatusInternalServerError) @@ -53,6 +54,7 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques proxyReq.Header.Set("Host", r.Host) proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) + util.ReqWithRequestId(proxyReq, ctx) for header, values := range r.Header { for _, value := range values { diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 12371a8f6..fda767c2e 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -2,7 +2,6 @@ package weed_server import ( "bytes" - "context" "encoding/base64" "encoding/hex" "errors" @@ -89,14 +88,14 @@ func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Ent } func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { - + ctx := r.Context() path := r.URL.Path isForDirectory := strings.HasSuffix(path, "/") if isForDirectory && len(path) > 1 { path = path[:len(path)-1] } - entry, err := fs.filer.FindEntry(context.Background(), util.FullPath(path)) + entry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) if err != nil { if path == "/" { fs.listDirectoryHandler(w, r) @@ -147,6 +146,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if query.Get("metadata") == "true" { if query.Get("resolveManifest") == "true" { if entry.Chunks, _, err = filer.ResolveChunkManifest( + ctx, fs.filer.MasterClient.GetLookupFileIdFunction(), entry.GetChunks(), 0, math.MaxInt64); err != nil { err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error()) @@ -242,7 +242,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if shouldResize { data := mem.Allocate(int(totalSize)) defer mem.Free(data) - err := filer.ReadAll(data, fs.filer.MasterClient, entry.GetChunks()) + err := filer.ReadAll(ctx, data, fs.filer.MasterClient, entry.GetChunks()) if err != nil { glog.Errorf("failed to read %s: %v", path, err) w.WriteHeader(http.StatusInternalServerError) @@ -268,7 +268,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) chunks := entry.GetChunks() if entry.IsInRemoteOnly() { dir, name := entry.FullPath.DirAndName() - if resp, err := fs.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{ + if resp, err := fs.CacheRemoteObjectToLocalCluster(ctx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{ Directory: dir, Name: name, }); err != nil { @@ -280,7 +280,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs) + streamFn, err := filer.PrepareStreamContentWithThrottler(ctx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs) if err != nil { stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc() glog.Errorf("failed to prepare stream content %s: %v", r.URL, err) diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 56f0f9cb4..1961a2f83 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -1,7 +1,6 @@ package weed_server import ( - "context" "errors" "net/http" "strconv" @@ -18,7 +17,7 @@ import ( // sub directories are listed on the first page, when "lastFileName" // is empty. func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) { - + ctx := r.Context() if fs.option.ExposeDirectoryData == false { writeJsonError(w, r, http.StatusForbidden, errors.New("ui is disabled")) return @@ -40,7 +39,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque namePattern := r.FormValue("namePattern") namePatternExclude := r.FormValue("namePatternExclude") - entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude) + entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(ctx, util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude) if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go index 80ea09d53..5f554de1d 100644 --- a/weed/server/filer_server_handlers_tagging.go +++ b/weed/server/filer_server_handlers_tagging.go @@ -1,7 +1,6 @@ package weed_server import ( - "context" "net/http" "strings" @@ -14,7 +13,7 @@ import ( // curl -X PUT -H "Seaweed-Name1: value1" http://localhost:8888/path/to/a/file?tagging func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() + ctx := r.Context() path := r.URL.Path if strings.HasSuffix(path, "/") { @@ -57,7 +56,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) // curl -X DELETE http://localhost:8888/path/to/a/file?tagging func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() + ctx := r.Context() path := r.URL.Path if strings.HasSuffix(path, "/") { diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 82880c2ac..b71c8fefd 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -34,7 +34,7 @@ type FilerPostResult struct { Error string `json:"error,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(ctx context.Context, so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssign).Inc() start := time.Now() @@ -44,7 +44,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u ar, altRequest := so.ToAssignRequests(1) - assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest) + assignResult, ae := operation.Assign(ctx, fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) err = ae @@ -70,7 +70,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u } func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) { - ctx := context.Background() + ctx := r.Context() destination := r.RequestURI if finalDestination := r.Header.Get(s3_constants.SeaweedStorageDestinationHeader); finalDestination != "" { @@ -78,7 +78,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte } query := r.URL.Query() - so, err := fs.detectStorageOption0(destination, + so, err := fs.detectStorageOption0(ctx, destination, query.Get("collection"), query.Get("replication"), query.Get("ttl"), @@ -240,7 +240,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) { +func (fs *FilerServer) detectStorageOption(ctx context.Context, requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) { rule := fs.filer.FilerConf.MatchStorageRule(requestURI) @@ -280,14 +280,14 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication }, nil } -func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) { +func (fs *FilerServer) detectStorageOption0(ctx context.Context, requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode, saveInside string) (*operation.StorageOption, error) { ttl, err := needle.ReadTTL(qTtl) if err != nil { glog.Errorf("fail to parse ttl %s: %v", qTtl, err) } - so, err := fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode) + so, err := fs.detectStorageOption(ctx, requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode) if so != nil { if fsync == "false" { so.Fsync = false diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index b0af7be4b..b16374bea 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -99,7 +99,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite return } - fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so) + fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, part1, chunkSize, fileName, contentType, contentLength, so) if err != nil { return nil, nil, err } @@ -107,12 +107,12 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite md5bytes = md5Hash.Sum(nil) headerMd5 := r.Header.Get("Content-Md5") if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) { - fs.filer.DeleteUncommittedChunks(fileChunks) + fs.filer.DeleteUncommittedChunks(ctx, fileChunks) return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.") } filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent) if replyerr != nil { - fs.filer.DeleteUncommittedChunks(fileChunks) + fs.filer.DeleteUncommittedChunks(ctx, fileChunks) } return @@ -130,7 +130,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter return nil, nil, err } - fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so) + fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, r.Body, chunkSize, fileName, contentType, contentLength, so) if err != nil { return nil, nil, err @@ -139,12 +139,12 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter md5bytes = md5Hash.Sum(nil) headerMd5 := r.Header.Get("Content-Md5") if headerMd5 != "" && !(util.Base64Encode(md5bytes) == headerMd5 || fmt.Sprintf("%x", md5bytes) == headerMd5) { - fs.filer.DeleteUncommittedChunks(fileChunks) + fs.filer.DeleteUncommittedChunks(ctx, fileChunks) return nil, nil, errors.New("The Content-Md5 you specified did not match what we received.") } filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent) if replyerr != nil { - fs.filer.DeleteUncommittedChunks(fileChunks) + fs.filer.DeleteUncommittedChunks(ctx, fileChunks) } return @@ -299,14 +299,14 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } // maybe concatenate small chunks into one whole chunk - mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks) + mergedChunks, replyerr = fs.maybeMergeChunks(ctx, so, newChunks) if replyerr != nil { glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr) mergedChunks = newChunks } // maybe compact entry chunks - mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks) + mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(ctx, so), mergedChunks) if replyerr != nil { glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) return @@ -348,7 +348,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa return filerResult, replyerr } -func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType { +func (fs *FilerServer) saveAsChunk(ctx context.Context, so *operation.StorageOption) filer.SaveDataAsChunkFunctionType { return func(reader io.Reader, name string, offset int64, tsNs int64) (*filer_pb.FileChunk, error) { var fileId string @@ -356,7 +356,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs err := util.Retry("saveAsChunk", func() error { // assign one file id for one chunk - assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so) + assignedFileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so) if assignErr != nil { return assignErr } @@ -380,7 +380,7 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs } var uploadErr error - uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption) + uploadResult, uploadErr, _ = uploader.Upload(ctx, reader, uploadOption) if uploadErr != nil { return uploadErr } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 9c1628749..fb052b5fa 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -19,7 +19,7 @@ import ( // handling single chunk POST or PUT upload func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) { - fileId, urlLocation, auth, err := fs.assignNewFileInfo(so) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so) if err != nil || fileId == "" || urlLocation == "" { return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter) @@ -59,7 +59,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr) } - uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption) + uploadResult, uploadError := uploader.UploadData(ctx, uncompressedData, uploadOption) if uploadError != nil { return nil, fmt.Errorf("upload to volume server: %v", uploadError) } @@ -97,7 +97,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht } if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, so.MaxFileNameLength); dbErr != nil { - fs.filer.DeleteUncommittedChunks(entry.GetChunks()) + fs.filer.DeleteUncommittedChunks(ctx, entry.GetChunks()) err = dbErr filerResult.Error = dbErr.Error() return diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go index 2110f485a..c22aa14c2 100644 --- a/weed/server/filer_server_handlers_write_merge.go +++ b/weed/server/filer_server_handlers_write_merge.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -12,7 +13,7 @@ import ( const MergeChunkMinCount int = 1000 -func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) { +func (fs *FilerServer) maybeMergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) { // Only merge small chunks more than half of the file var chunkSize = fs.option.MaxMB * 1024 * 1024 var smallChunk, sumChunk int @@ -33,16 +34,16 @@ func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks return inputChunks, nil } - return fs.mergeChunks(so, inputChunks, minOffset) + return fs.mergeChunks(ctx, so, inputChunks, minOffset) } -func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) { - chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks) +func (fs *FilerServer) mergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) { + chunkedFileReader := filer.NewChunkStreamReaderFromFiler(ctx, fs.filer.MasterClient, inputChunks) _, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent) if mergeErr != nil { return nil, mergeErr } - mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so) + mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(ctx, chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so) if mergeErr != nil { return } @@ -54,7 +55,7 @@ func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*f } } - garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks) + garbage, err := filer.MinusChunks(ctx, fs.lookupFileId, inputChunks, mergedChunks) if err != nil { glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", mergedChunks, inputChunks) diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index e34fe27e6..495fae05a 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -2,6 +2,7 @@ package weed_server import ( "bytes" + "context" "crypto/md5" "fmt" "hash" @@ -27,7 +28,7 @@ var bufPool = sync.Pool{ }, } -func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { +func (fs *FilerServer) uploadRequestToChunks(ctx context.Context, w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { query := r.URL.Query() isAppend := isAppend(r) @@ -45,10 +46,10 @@ func (fs *FilerServer) uploadRequestToChunks(w http.ResponseWriter, r *http.Requ chunkOffset = offsetInt } - return fs.uploadReaderToChunks(reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so) + return fs.uploadReaderToChunks(ctx, reader, chunkOffset, chunkSize, fileName, contentType, isAppend, so) } -func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { +func (fs *FilerServer) uploadReaderToChunks(ctx context.Context, reader io.Reader, startOffset int64, chunkSize int32, fileName, contentType string, isAppend bool, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { md5Hash = md5.New() chunkOffset = startOffset @@ -117,7 +118,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, wg.Done() }() - chunks, toChunkErr := fs.dataToChunk(fileName, contentType, buf.Bytes(), offset, so) + chunks, toChunkErr := fs.dataToChunk(ctx, fileName, contentType, buf.Bytes(), offset, so) if toChunkErr != nil { uploadErrLock.Lock() if uploadErr == nil { @@ -152,7 +153,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, for _, chunk := range fileChunks { glog.V(4).Infof("purging failed uploaded %s chunk %s [%d,%d)", fileName, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) } - fs.filer.DeleteUncommittedChunks(fileChunks) + fs.filer.DeleteUncommittedChunks(ctx, fileChunks) return nil, md5Hash, 0, uploadErr, nil } slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) int { @@ -161,7 +162,7 @@ func (fs *FilerServer) uploadReaderToChunks(reader io.Reader, startOffset int64, return fileChunks, md5Hash, chunkOffset, nil, smallContent } -func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { +func (fs *FilerServer) doUpload(ctx context.Context, urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUpload).Inc() start := time.Now() @@ -184,14 +185,14 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil return nil, err, []byte{} } - uploadResult, err, data := uploader.Upload(limitedReader, uploadOption) + uploadResult, err, data := uploader.Upload(ctx, limitedReader, uploadOption) if uploadResult != nil && uploadResult.RetryCount > 0 { stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount)) } return uploadResult, err, data } -func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) { +func (fs *FilerServer) dataToChunk(ctx context.Context, fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) { dataReader := util.NewBytesReader(data) // retry to assign a different file id @@ -203,14 +204,14 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch err := util.Retry("filerDataToChunk", func() error { // assign one file id for one chunk - fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so) + fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(ctx, so) if uploadErr != nil { glog.V(4).Infof("retry later due to assign error: %v", uploadErr) stats.FilerHandlerCounter.WithLabelValues(stats.ChunkAssignRetry).Inc() return uploadErr } // upload the chunk to the volume server - uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth) + uploadResult, uploadErr, _ = fs.doUpload(ctx, urlLocation, dataReader, fileName, contentType, nil, auth) if uploadErr != nil { glog.V(4).Infof("retry later due to upload error: %v", uploadErr) stats.FilerHandlerCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc() diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index 4452e019b..0b5fa1cfc 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -77,7 +77,7 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser return } - if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil { + if _, replicaWriteErr := uploader.UploadData(ctx, data, uploadOption); replicaWriteErr != nil && err == nil { err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr) } }(replica.Url) diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 7f0fcc871..30f335f5d 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -16,6 +16,7 @@ import ( ) func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() if e := r.ParseForm(); e != nil { glog.V(0).Infoln("form parse error:", e) writeJsonError(w, r, http.StatusBadRequest, e) @@ -45,7 +46,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } ret := operation.UploadResult{} - isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5) + isUnchanged, writeError := topology.ReplicatedWrite(ctx, vs.GetMaster, vs.grpcDialOption, vs.store, volumeId, reqNeedle, r, contentMd5) if writeError != nil { writeJsonError(w, r, http.StatusInternalServerError, writeError) return diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 2b7493ee6..47fa055e7 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -556,7 +556,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { return 0, io.EOF } if f.visibleIntervals == nil { - f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize) + f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(context.Background(), filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize) f.reader = nil } if f.reader == nil { |
