aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server_handlers_write.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_server_handlers_write.go')
-rw-r--r--weed/server/filer_server_handlers_write.go96
1 files changed, 44 insertions, 52 deletions
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 267b8752d..09a8e3626 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -29,30 +29,13 @@ type FilerPostResult struct {
Url string `json:"url,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, rack, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerRequestCounter.WithLabelValues("assign").Inc()
start := time.Now()
defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
- ar := &operation.VolumeAssignRequest{
- Count: 1,
- Replication: replication,
- Collection: collection,
- Ttl: ttlString,
- DataCenter: dataCenter,
- }
- var altRequest *operation.VolumeAssignRequest
- if dataCenter != "" || rack != "" {
- altRequest = &operation.VolumeAssignRequest{
- Count: 1,
- Replication: replication,
- Collection: collection,
- Ttl: ttlString,
- DataCenter: "",
- Rack: "",
- }
- }
+ ar, altRequest := so.ToAssignRequests(1)
assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
if ae != nil {
@@ -62,7 +45,7 @@ func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, ra
}
fileId = assignResult.Fid
urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
- if fsync {
+ if so.Fsync {
urlLocation += "?fsync=true"
}
auth = assignResult.Auth
@@ -74,25 +57,15 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
query := r.URL.Query()
- collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
- dataCenter := query.Get("dataCenter")
- if dataCenter == "" {
- dataCenter = fs.option.DataCenter
- }
- rack := query.Get("rack")
- if dataCenter == "" {
- rack = fs.option.Rack
- }
- ttlString := r.URL.Query().Get("ttl")
-
- // read ttl in seconds
- ttl, err := needle.ReadTTL(ttlString)
- ttlSeconds := int32(0)
- if err == nil {
- ttlSeconds = int32(ttl.Minutes()) * 60
- }
+ so := fs.detectStorageOption0(r.RequestURI,
+ query.Get("collection"),
+ query.Get("replication"),
+ query.Get("ttl"),
+ query.Get("dataCenter"),
+ query.Get("rack"),
+ )
- fs.autoChunk(ctx, w, r, replication, collection, dataCenter, rack, ttlSeconds, ttlString, fsync)
+ fs.autoChunk(ctx, w, r, so)
}
@@ -130,21 +103,12 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
-func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) {
- // default
- collection = fs.option.Collection
- replication = fs.option.DefaultReplication
-
- // get default collection settings
- if qCollection != "" {
- collection = qCollection
- }
- if qReplication != "" {
- replication = qReplication
- }
+func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) *operation.StorageOption {
+ collection := util.Nvl(qCollection, fs.option.Collection)
+ replication := util.Nvl(qReplication, fs.option.DefaultReplication)
// required by buckets folder
- bucketDefaultReplication := ""
+ bucketDefaultReplication, fsync := "", false
if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
t := strings.Index(bucketAndObjectKey, "/")
@@ -160,5 +124,33 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st
replication = bucketDefaultReplication
}
- return
+ rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
+
+ if ttlSeconds == 0 {
+ ttl, err := needle.ReadTTL(rule.GetTtl())
+ if err != nil {
+ glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err)
+ }
+ ttlSeconds = int32(ttl.Minutes()) * 60
+ }
+
+ return &operation.StorageOption{
+ Replication: util.Nvl(replication, rule.Replication),
+ Collection: util.Nvl(collection, rule.Collection),
+ DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
+ Rack: util.Nvl(rack, fs.option.Rack),
+ TtlSeconds: ttlSeconds,
+ Fsync: fsync || rule.Fsync,
+ VolumeGrowthCount: rule.VolumeGrowthCount,
+ }
+}
+
+func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) *operation.StorageOption {
+
+ ttl, err := needle.ReadTTL(qTtl)
+ if err != nil {
+ glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
+ }
+
+ return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack)
}