diff options
Diffstat (limited to 'weed/server/filer_server_handlers_write.go')
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 102 |
1 files changed, 41 insertions, 61 deletions
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index e27d9eb26..0344d84dc 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -7,7 +7,6 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -30,31 +29,13 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(so *filer.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }() - ar := &operation.VolumeAssignRequest{ - Count: 1, - Replication: so.Replication, - Collection: so.Collection, - Ttl: so.TtlString(), - DataCenter: so.DataCenter, - Rack: so.Rack, - } - var altRequest *operation.VolumeAssignRequest - if so.DataCenter != "" || so.Rack != "" { - altRequest = &operation.VolumeAssignRequest{ - Count: 1, - Replication: so.Replication, - Collection: so.Collection, - Ttl: so.TtlString(), - DataCenter: "", - Rack: "", - } - } + ar, altRequest := so.ToAssignRequests(1) assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) if ae != nil { @@ -76,32 +57,13 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { ctx := context.Background() query := r.URL.Query() - collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication")) - dataCenter := query.Get("dataCenter") - if dataCenter == "" { - dataCenter = fs.option.DataCenter - } - rack := query.Get("rack") - if dataCenter == "" { - rack = fs.option.Rack - } - ttlString := r.URL.Query().Get("ttl") - - // read ttl in seconds - ttl, err := needle.ReadTTL(ttlString) - ttlSeconds := int32(0) - if err == nil { - ttlSeconds = int32(ttl.Minutes()) * 60 - } - - so := &filer.StorageOption{ - Replication: replication, - Collection: collection, - DataCenter: dataCenter, - Rack: rack, - TtlSeconds: ttlSeconds, - Fsync: fsync, - } + so := fs.detectStorageOption0(r.RequestURI, + query.Get("collection"), + query.Get("replication"), + query.Get("ttl"), + query.Get("dataCenter"), + query.Get("rack"), + ) fs.autoChunk(ctx, w, r, so) @@ -141,21 +103,12 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) { - // default - collection = fs.option.Collection - replication = fs.option.DefaultReplication - - // get default collection settings - if qCollection != "" { - collection = qCollection - } - if qReplication != "" { - replication = qReplication - } +func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) (*operation.StorageOption) { + collection := util.Nvl(qCollection, fs.option.Collection) + replication := util.Nvl(qReplication, fs.option.DefaultReplication) // required by buckets folder - bucketDefaultReplication := "" + bucketDefaultReplication, fsync := "", false if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") { bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:] t := strings.Index(bucketAndObjectKey, "/") @@ -171,5 +124,32 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st replication = bucketDefaultReplication } - return + rule := fs.filer.FilerConf.MatchStorageRule(requestURI) + + if ttlSeconds == 0 { + ttl, err := needle.ReadTTL(rule.GetTtl()) + if err != nil { + glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err) + } + ttlSeconds = int32(ttl.Minutes())*60 + } + + return &operation.StorageOption{ + Replication: util.Nvl(replication, rule.Replication), + Collection: util.Nvl(collection, rule.Collection), + DataCenter: util.Nvl(dataCenter, fs.option.DataCenter), + Rack: util.Nvl(rack, fs.option.Rack), + TtlSeconds: ttlSeconds, + Fsync: fsync || rule.Fsync, + } +} + +func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) (*operation.StorageOption) { + + ttl, err := needle.ReadTTL(qTtl) + if err != nil { + glog.Errorf("fail to parse ttl %s: %v", qTtl, err) + } + + return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack) } |
