diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-11-15 16:58:48 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-11-15 16:58:48 -0800 |
| commit | 95c0de285d907cbd826ba6ce97f7c4994c16ffd5 (patch) | |
| tree | 2ef04241b575b8169bf6b1baa2dd15cda4046be7 /weed/server | |
| parent | 500bcab9535835404d29b8f68a03bf2ecf5b2991 (diff) | |
| download | seaweedfs-95c0de285d907cbd826ba6ce97f7c4994c16ffd5.tar.xz seaweedfs-95c0de285d907cbd826ba6ce97f7c4994c16ffd5.zip | |
refactoring
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/common.go | 1 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server.go | 71 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 102 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 12 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_cipher.go | 2 |
5 files changed, 65 insertions, 123 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index 62ddf6e7f..58079032e 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -127,6 +127,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st ar := &operation.VolumeAssignRequest{ Count: count, DataCenter: r.FormValue("dataCenter"), + Rack: r.FormValue("rack"), Replication: r.FormValue("replication"), Collection: r.FormValue("collection"), Ttl: r.FormValue("ttl"), diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index e3f4bb161..7b04e4fab 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -156,7 +156,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr resp = &filer_pb.CreateEntryResponse{} - chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry) + chunks, garbage, err2 := fs.cleanupChunks(util.Join(req.Directory, req.Entry.Name), nil, req.Entry) if err2 != nil { return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) } @@ -190,7 +190,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } - chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry) + chunks, garbage, err2 := fs.cleanupChunks(fullpath, entry, req.Entry) if err2 != nil { return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2) } @@ -240,7 +240,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } -func (fs *FilerServer) cleanupChunks(existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { +func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { // remove old chunks if not included in the new ones if existingEntry != nil { @@ -257,14 +257,13 @@ func (fs *FilerServer) cleanupChunks(existingEntry *filer.Entry, newEntry *filer garbage = append(garbage, coveredChunks...) if newEntry.Attributes != nil { - so := &filer.StorageOption{ - Replication: newEntry.Attributes.Replication, - Collection: newEntry.Attributes.Collection, - DataCenter: "", - Rack: "", - TtlSeconds: newEntry.Attributes.TtlSec, - Fsync: false, - } + so := fs.detectStorageOption(fullpath, + newEntry.Attributes.Collection, + newEntry.Attributes.Replication, + newEntry.Attributes.TtlSec, + "", + "", + ) chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks) if err != nil { // not good, but should be ok @@ -283,7 +282,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo fullpath := util.NewFullPath(req.Directory, req.EntryName) var offset int64 = 0 - entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath)) + entry, err := fs.filer.FindEntry(ctx, fullpath) if err == filer_pb.ErrNotFound { entry = &filer.Entry{ FullPath: fullpath, @@ -305,14 +304,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo } entry.Chunks = append(entry.Chunks, req.Chunks...) - so := &filer.StorageOption{ - Replication: entry.Replication, - Collection: entry.Collection, - DataCenter: "", - Rack: "", - TtlSeconds: entry.TtlSec, - Fsync: false, - } + so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, "", "") entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks) if err != nil { // not good, but should be ok @@ -338,41 +330,10 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { - ttlStr := "" - if req.TtlSec > 0 { - ttlStr = strconv.Itoa(int(req.TtlSec)) - } - collection, replication, _ := fs.detectCollection(req.Path, req.Collection, req.Replication) - - var altRequest *operation.VolumeAssignRequest + so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DataCenter, req.Rack) - dataCenter := req.DataCenter - if dataCenter == "" { - dataCenter = fs.option.DataCenter - } - rack := req.Rack - if rack == "" { - rack = fs.option.Rack - } + assignRequest, altRequest := so.ToAssignRequests(int(req.Count)) - assignRequest := &operation.VolumeAssignRequest{ - Count: uint64(req.Count), - Replication: replication, - Collection: collection, - Ttl: ttlStr, - DataCenter: dataCenter, - Rack: rack, - } - if dataCenter != "" || rack != "" { - altRequest = &operation.VolumeAssignRequest{ - Count: uint64(req.Count), - Replication: replication, - Collection: collection, - Ttl: ttlStr, - DataCenter: "", - Rack: "", - } - } assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) if err != nil { glog.V(3).Infof("AssignVolume: %v", err) @@ -389,8 +350,8 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol Url: assignResult.Url, PublicUrl: assignResult.PublicUrl, Auth: string(assignResult.Auth), - Collection: collection, - Replication: replication, + Collection: so.Collection, + Replication: so.Replication, }, nil } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index e27d9eb26..0344d84dc 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -7,7 +7,6 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -30,31 +29,13 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(so *filer.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }() - ar := &operation.VolumeAssignRequest{ - Count: 1, - Replication: so.Replication, - Collection: so.Collection, - Ttl: so.TtlString(), - DataCenter: so.DataCenter, - Rack: so.Rack, - } - var altRequest *operation.VolumeAssignRequest - if so.DataCenter != "" || so.Rack != "" { - altRequest = &operation.VolumeAssignRequest{ - Count: 1, - Replication: so.Replication, - Collection: so.Collection, - Ttl: so.TtlString(), - DataCenter: "", - Rack: "", - } - } + ar, altRequest := so.ToAssignRequests(1) assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) if ae != nil { @@ -76,32 +57,13 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { ctx := context.Background() query := r.URL.Query() - collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication")) - dataCenter := query.Get("dataCenter") - if dataCenter == "" { - dataCenter = fs.option.DataCenter - } - rack := query.Get("rack") - if dataCenter == "" { - rack = fs.option.Rack - } - ttlString := r.URL.Query().Get("ttl") - - // read ttl in seconds - ttl, err := needle.ReadTTL(ttlString) - ttlSeconds := int32(0) - if err == nil { - ttlSeconds = int32(ttl.Minutes()) * 60 - } - - so := &filer.StorageOption{ - Replication: replication, - Collection: collection, - DataCenter: dataCenter, - Rack: rack, - TtlSeconds: ttlSeconds, - Fsync: fsync, - } + so := fs.detectStorageOption0(r.RequestURI, + query.Get("collection"), + query.Get("replication"), + query.Get("ttl"), + query.Get("dataCenter"), + query.Get("rack"), + ) fs.autoChunk(ctx, w, r, so) @@ -141,21 +103,12 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) { - // default - collection = fs.option.Collection - replication = fs.option.DefaultReplication - - // get default collection settings - if qCollection != "" { - collection = qCollection - } - if qReplication != "" { - replication = qReplication - } +func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) (*operation.StorageOption) { + collection := util.Nvl(qCollection, fs.option.Collection) + replication := util.Nvl(qReplication, fs.option.DefaultReplication) // required by buckets folder - bucketDefaultReplication := "" + bucketDefaultReplication, fsync := "", false if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") { bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:] t := strings.Index(bucketAndObjectKey, "/") @@ -171,5 +124,32 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st replication = bucketDefaultReplication } - return + rule := fs.filer.FilerConf.MatchStorageRule(requestURI) + + if ttlSeconds == 0 { + ttl, err := needle.ReadTTL(rule.GetTtl()) + if err != nil { + glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err) + } + ttlSeconds = int32(ttl.Minutes())*60 + } + + return &operation.StorageOption{ + Replication: util.Nvl(replication, rule.Replication), + Collection: util.Nvl(collection, rule.Collection), + DataCenter: util.Nvl(dataCenter, fs.option.DataCenter), + Rack: util.Nvl(rack, fs.option.Rack), + TtlSeconds: ttlSeconds, + Fsync: fsync || rule.Fsync, + } +} + +func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) (*operation.StorageOption) { + + ttl, err := needle.ReadTTL(qTtl) + if err != nil { + glog.Errorf("fail to parse ttl %s: %v", qTtl, err) + } + + return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack) } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 178145e69..fd2db884f 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -25,7 +25,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, so *filer.StorageOption) { +func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) { // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line query := r.URL.Query() @@ -66,7 +66,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * } } -func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *filer.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { +func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { @@ -104,7 +104,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite return } -func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *filer.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { +func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { fileName := "" contentType := "" @@ -126,7 +126,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter return } -func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *filer.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { +func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { // detect file mode modeStr := r.URL.Query().Get("mode") @@ -199,7 +199,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa return filerResult, replyerr } -func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, so *filer.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error) { +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error) { var fileChunks []*filer_pb.FileChunk md5Hash := md5.New() @@ -255,7 +255,7 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht return uploadResult, err } -func (fs *FilerServer) saveAsChunk(so *filer.StorageOption) filer.SaveDataAsChunkFunctionType { +func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType { return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) { // assign one file id for one chunk diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index fa6b0a44f..3cc0d0c41 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -16,7 +16,7 @@ import ( ) // handling single chunk POST or PUT upload -func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *filer.StorageOption) (filerResult *FilerPostResult, err error) { +func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) { fileId, urlLocation, auth, err := fs.assignNewFileInfo(so) |
