Diffstat (limited to 'weed/server')
-rw-r--r--   weed/server/common.go                                   11
-rw-r--r--   weed/server/filer_grpc_server.go                        60
-rw-r--r--   weed/server/filer_server.go                              6
-rw-r--r--   weed/server/filer_server_handlers.go                    12
-rw-r--r--   weed/server/filer_server_handlers_read.go                7
-rw-r--r--   weed/server/filer_server_handlers_tagging.go           102
-rw-r--r--   weed/server/filer_server_handlers_write.go              96
-rw-r--r--   weed/server/filer_server_handlers_write_autochunk.go    55
-rw-r--r--   weed/server/filer_server_handlers_write_cipher.go       12
-rw-r--r--   weed/server/master_grpc_server.go                        6
-rw-r--r--   weed/server/master_server.go                             2
-rw-r--r--   weed/server/volume_grpc_client_to_master.go              1
12 files changed, 234 insertions(+), 136 deletions(-)
diff --git a/weed/server/common.go b/weed/server/common.go
index 44098a4b5..58079032e 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -38,10 +38,12 @@ func init() {
 
 func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
     var bytes []byte
-    if r.FormValue("pretty") != "" {
-        bytes, err = json.MarshalIndent(obj, "", "  ")
-    } else {
-        bytes, err = json.Marshal(obj)
+    if obj != nil {
+        if r.FormValue("pretty") != "" {
+            bytes, err = json.MarshalIndent(obj, "", "  ")
+        } else {
+            bytes, err = json.Marshal(obj)
+        }
     }
     if err != nil {
         return
@@ -125,6 +127,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
     ar := &operation.VolumeAssignRequest{
         Count:       count,
         DataCenter:  r.FormValue("dataCenter"),
+        Rack:        r.FormValue("rack"),
         Replication: r.FormValue("replication"),
         Collection:  r.FormValue("collection"),
         Ttl:         r.FormValue("ttl"),
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 8f326f5c7..7b04e4fab 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -156,7 +156,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
 
     resp = &filer_pb.CreateEntryResponse{}
 
-    chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry)
+    chunks, garbage, err2 := fs.cleanupChunks(util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
     if err2 != nil {
         return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
     }
@@ -190,7 +190,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
         return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
     }
 
-    chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry)
+    chunks, garbage, err2 := fs.cleanupChunks(fullpath, entry, req.Entry)
     if err2 != nil {
         return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
     }
@@ -240,7 +240,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
     return &filer_pb.UpdateEntryResponse{}, err
 }
 
-func (fs *FilerServer) cleanupChunks(existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
+func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
 
     // remove old chunks if not included in the new ones
     if existingEntry != nil {
@@ -257,7 +257,14 @@ func (fs *FilerServer) cleanupChunks(existingEntry *filer.Entry, newEntry *filer
     garbage = append(garbage, coveredChunks...)
 
     if newEntry.Attributes != nil {
-        chunks, err = filer.MaybeManifestize(fs.saveAsChunk(newEntry.Attributes.Replication, newEntry.Attributes.Collection, "", "", needle.SecondsToTTL(newEntry.Attributes.TtlSec), false), chunks)
+        so := fs.detectStorageOption(fullpath,
+            newEntry.Attributes.Collection,
+            newEntry.Attributes.Replication,
+            newEntry.Attributes.TtlSec,
+            "",
+            "",
+        )
+        chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks)
         if err != nil {
             // not good, but should be ok
             glog.V(0).Infof("MaybeManifestize: %v", err)
@@ -275,7 +282,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
     fullpath := util.NewFullPath(req.Directory, req.EntryName)
     var offset int64 = 0
 
-    entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
+    entry, err := fs.filer.FindEntry(ctx, fullpath)
     if err == filer_pb.ErrNotFound {
         entry = &filer.Entry{
             FullPath: fullpath,
@@ -297,8 +304,8 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
     }
 
     entry.Chunks = append(entry.Chunks, req.Chunks...)
-
-    entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(entry.Replication, entry.Collection, "", "", needle.SecondsToTTL(entry.TtlSec), false), entry.Chunks)
+    so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, "", "")
+    entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks)
     if err != nil {
         // not good, but should be ok
         glog.V(0).Infof("MaybeManifestize: %v", err)
@@ -323,41 +330,10 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
 
 func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
 
-    ttlStr := ""
-    if req.TtlSec > 0 {
-        ttlStr = strconv.Itoa(int(req.TtlSec))
-    }
-    collection, replication, _ := fs.detectCollection(req.Path, req.Collection, req.Replication)
+    so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DataCenter, req.Rack)
 
-    var altRequest *operation.VolumeAssignRequest
+    assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
 
-    dataCenter := req.DataCenter
-    if dataCenter == "" {
-        dataCenter = fs.option.DataCenter
-    }
-    rack := req.Rack
-    if rack == "" {
-        rack = fs.option.Rack
-    }
-
-    assignRequest := &operation.VolumeAssignRequest{
-        Count:       uint64(req.Count),
-        Replication: replication,
-        Collection:  collection,
-        Ttl:         ttlStr,
-        DataCenter:  dataCenter,
-        Rack:        rack,
-    }
-    if dataCenter != "" || rack != "" {
-        altRequest = &operation.VolumeAssignRequest{
-            Count:       uint64(req.Count),
-            Replication: replication,
-            Collection:  collection,
-            Ttl:         ttlStr,
-            DataCenter:  "",
-            Rack:        "",
-        }
-    }
     assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
     if err != nil {
         glog.V(3).Infof("AssignVolume: %v", err)
@@ -374,8 +350,8 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
         Url:         assignResult.Url,
         PublicUrl:   assignResult.PublicUrl,
         Auth:        string(assignResult.Auth),
-        Collection:  collection,
-        Replication: replication,
+        Collection:  so.Collection,
+        Replication: so.Replication,
     }, nil
 }
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 065bb3251..b11448db4 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -89,7 +89,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
         glog.Fatal("master list is required!")
     }
 
-    fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
+    fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
         fs.listenersCond.Broadcast()
     })
     fs.filer.Cipher = option.Cipher
@@ -114,6 +114,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
     fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
     v.SetDefault("filer.options.buckets_folder", "/buckets")
     fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
+    // TODO deprecated, will be be removed after 2020-12-31
+    // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
     fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
     fs.filer.LoadConfiguration(v)
 
@@ -131,6 +133,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
 
     fs.filer.LoadBuckets()
 
+    fs.filer.LoadFilerConf()
+
     grace.OnInterrupt(func() {
         fs.filer.Shutdown()
     })
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 555036feb..451e2a2de 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -26,11 +26,19 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
         stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
     case "DELETE":
         stats.FilerRequestCounter.WithLabelValues("delete").Inc()
-        fs.DeleteHandler(w, r)
+        if _, ok := r.URL.Query()["tagging"]; ok {
+            fs.DeleteTaggingHandler(w, r)
+        } else {
+            fs.DeleteHandler(w, r)
+        }
         stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
     case "PUT":
         stats.FilerRequestCounter.WithLabelValues("put").Inc()
-        fs.PostHandler(w, r)
+        if _, ok := r.URL.Query()["tagging"]; ok {
+            fs.PutTaggingHandler(w, r)
+        } else {
+            fs.PostHandler(w, r)
+        }
         stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
     case "POST":
         stats.FilerRequestCounter.WithLabelValues("post").Inc()
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 7b08e1686..69d485e90 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -94,10 +94,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
         }
     }
 
+    // print out the header from extended properties
+    for k, v := range entry.Extended {
+        w.Header().Set(k, string(v))
+    }
+
     //set tag count
     if r.Method == "GET" {
         tagCount := 0
-        for k, _ := range entry.Extended {
+        for k := range entry.Extended {
             if strings.HasPrefix(k, xhttp.AmzObjectTagging+"-") {
                 tagCount++
             }
diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go
new file mode 100644
index 000000000..50b3a2c06
--- /dev/null
+++ b/weed/server/filer_server_handlers_tagging.go
@@ -0,0 +1,102 @@
+package weed_server
+
+import (
+    "context"
+    "net/http"
+    "strings"
+
+    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/storage/needle"
+    "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// add or replace one file Seaweed- prefixed attributes
+// curl -X PUT -H "Seaweed-Name1: value1" http://localhost:8888/path/to/a/file?tagging
+func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+    ctx := context.Background()
+
+    path := r.URL.Path
+    if strings.HasSuffix(path, "/") {
+        path = path[:len(path)-1]
+    }
+
+    existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+    if err != nil {
+        writeJsonError(w, r, http.StatusNotFound, err)
+        return
+    }
+    if existingEntry == nil {
+        writeJsonError(w, r, http.StatusNotFound, err)
+        return
+    }
+
+    if existingEntry.Extended == nil {
+        existingEntry.Extended = make(map[string][]byte)
+    }
+
+    for header, values := range r.Header {
+        if strings.HasPrefix(header, needle.PairNamePrefix) {
+            for _, value := range values {
+                existingEntry.Extended[header] = []byte(value)
+            }
+        }
+    }
+
+    if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+        glog.V(0).Infof("failing to update %s tagging : %v", path, dbErr)
+        writeJsonError(w, r, http.StatusInternalServerError, err)
+        return
+    }
+
+    writeJsonQuiet(w, r, http.StatusAccepted, nil)
+    return
+}
+
+// remove all Seaweed- prefixed attributes
+// curl -X DELETE http://localhost:8888/path/to/a/file?tagging
+func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+    ctx := context.Background()
+
+    path := r.URL.Path
+    if strings.HasSuffix(path, "/") {
+        path = path[:len(path)-1]
+    }
+
+    existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+    if err != nil {
+        writeJsonError(w, r, http.StatusNotFound, err)
+        return
+    }
+    if existingEntry == nil {
+        writeJsonError(w, r, http.StatusNotFound, err)
+        return
+    }
+
+    if existingEntry.Extended == nil {
+        existingEntry.Extended = make(map[string][]byte)
+    }
+
+    hasDeletion := false
+    for header, _ := range existingEntry.Extended {
+        if strings.HasPrefix(header, needle.PairNamePrefix) {
+            delete(existingEntry.Extended, header)
+            hasDeletion = true
+        }
+    }
+
+    if !hasDeletion {
+        writeJsonQuiet(w, r, http.StatusNotModified, nil)
+        return
+    }
+
+    if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+        glog.V(0).Infof("failing to delete %s tagging : %v", path, dbErr)
+        writeJsonError(w, r, http.StatusInternalServerError, err)
+        return
+    }
+
+    writeJsonQuiet(w, r, http.StatusAccepted, nil)
+    return
+}
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 267b8752d..09a8e3626 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -29,30 +29,13 @@ type FilerPostResult struct {
     Url     string `json:"url,omitempty"`
 }
 
-func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, rack, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
 
     stats.FilerRequestCounter.WithLabelValues("assign").Inc()
     start := time.Now()
     defer func() {
         stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds())
     }()
 
-    ar := &operation.VolumeAssignRequest{
-        Count:       1,
-        Replication: replication,
-        Collection:  collection,
-        Ttl:         ttlString,
-        DataCenter:  dataCenter,
-    }
-    var altRequest *operation.VolumeAssignRequest
-    if dataCenter != "" || rack != "" {
-        altRequest = &operation.VolumeAssignRequest{
-            Count:       1,
-            Replication: replication,
-            Collection:  collection,
-            Ttl:         ttlString,
-            DataCenter:  "",
-            Rack:        "",
-        }
-    }
+    ar, altRequest := so.ToAssignRequests(1)
 
     assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
     if ae != nil {
@@ -62,7 +45,7 @@ func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, ra
     }
     fileId = assignResult.Fid
     urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
-    if fsync {
+    if so.Fsync {
         urlLocation += "?fsync=true"
     }
     auth = assignResult.Auth
@@ -74,25 +57,15 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 
     ctx := context.Background()
 
     query := r.URL.Query()
-    collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
-    dataCenter := query.Get("dataCenter")
-    if dataCenter == "" {
-        dataCenter = fs.option.DataCenter
-    }
-    rack := query.Get("rack")
-    if dataCenter == "" {
-        rack = fs.option.Rack
-    }
-    ttlString := r.URL.Query().Get("ttl")
-
-    // read ttl in seconds
-    ttl, err := needle.ReadTTL(ttlString)
-    ttlSeconds := int32(0)
-    if err == nil {
-        ttlSeconds = int32(ttl.Minutes()) * 60
-    }
+    so := fs.detectStorageOption0(r.RequestURI,
+        query.Get("collection"),
+        query.Get("replication"),
+        query.Get("ttl"),
+        query.Get("dataCenter"),
+        query.Get("rack"),
+    )
 
-    fs.autoChunk(ctx, w, r, replication, collection, dataCenter, rack, ttlSeconds, ttlString, fsync)
+    fs.autoChunk(ctx, w, r, so)
 
 }
 
@@ -130,21 +103,12 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
     w.WriteHeader(http.StatusNoContent)
 }
 
-func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) {
-    // default
-    collection = fs.option.Collection
-    replication = fs.option.DefaultReplication
-
-    // get default collection settings
-    if qCollection != "" {
-        collection = qCollection
-    }
-    if qReplication != "" {
-        replication = qReplication
-    }
+func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) *operation.StorageOption {
+    collection := util.Nvl(qCollection, fs.option.Collection)
+    replication := util.Nvl(qReplication, fs.option.DefaultReplication)
 
     // required by buckets folder
-    bucketDefaultReplication := ""
+    bucketDefaultReplication, fsync := "", false
     if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
         bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
         t := strings.Index(bucketAndObjectKey, "/")
@@ -160,5 +124,33 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st
         replication = bucketDefaultReplication
     }
 
-    return
+    rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
+
+    if ttlSeconds == 0 {
+        ttl, err := needle.ReadTTL(rule.GetTtl())
+        if err != nil {
+            glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err)
+        }
+        ttlSeconds = int32(ttl.Minutes()) * 60
+    }
+
+    return &operation.StorageOption{
+        Replication:       util.Nvl(replication, rule.Replication),
+        Collection:        util.Nvl(collection, rule.Collection),
+        DataCenter:        util.Nvl(dataCenter, fs.option.DataCenter),
+        Rack:              util.Nvl(rack, fs.option.Rack),
+        TtlSeconds:        ttlSeconds,
+        Fsync:             fsync || rule.Fsync,
+        VolumeGrowthCount: rule.VolumeGrowthCount,
+    }
+}
+
+func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) *operation.StorageOption {
+
+    ttl, err := needle.ReadTTL(qTtl)
+    if err != nil {
+        glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
+    }
+
+    return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack)
+}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index d308dafa2..fd2db884f 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -21,10 +21,11 @@ import (
     xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
     "github.com/chrislusf/seaweedfs/weed/security"
     "github.com/chrislusf/seaweedfs/weed/stats"
+    "github.com/chrislusf/seaweedfs/weed/storage/needle"
     "github.com/chrislusf/seaweedfs/weed/util"
 )
 
-func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) {
+func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
 
     // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
     query := r.URL.Query()
@@ -50,10 +51,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
         if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
             reply, err = fs.mkdir(ctx, w, r)
         } else {
-            reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, rack, ttlSec, ttlString, fsync)
+            reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, so)
         }
     } else {
-        reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, rack, ttlSec, ttlString, fsync)
+        reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, so)
     }
     if err != nil {
         writeJsonError(w, r, http.StatusInternalServerError, err)
@@ -65,7 +66,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
     }
 }
 
-func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
+func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
 
     multipartReader, multipartReaderErr := r.MultipartReader()
     if multipartReaderErr != nil {
@@ -86,46 +87,46 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
         contentType = ""
     }
 
-    fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, rack, ttlString, fileName, contentType, fsync)
+    fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, so)
     if err != nil {
         return nil, nil, err
     }
 
-    fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, rack, ttlString, fsync), fileChunks)
+    fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks)
     if replyerr != nil {
         glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
         return
     }
 
     md5bytes = md5Hash.Sum(nil)
-    filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
+    filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset)
 
     return
 }
 
-func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
+func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
 
     fileName := ""
     contentType := ""
 
-    fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, rack, ttlString, fileName, contentType, fsync)
+    fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so)
     if err != nil {
         return nil, nil, err
     }
 
-    fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, rack, ttlString, fsync), fileChunks)
+    fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks)
     if replyerr != nil {
         glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
         return
     }
 
     md5bytes = md5Hash.Sum(nil)
-    filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
+    filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset)
 
     return
 }
 
-func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) {
+func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) {
 
     // detect file mode
     modeStr := r.URL.Query().Get("mode")
@@ -162,9 +163,9 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
             Mode:        os.FileMode(mode),
             Uid:         OS_UID,
             Gid:         OS_GID,
-            Replication: replication,
-            Collection:  collection,
-            TtlSec:      ttlSec,
+            Replication: so.Replication,
+            Collection:  so.Collection,
+            TtlSec:      so.TtlSeconds,
             Mime:        contentType,
             Md5:         md5bytes,
             FileSize:    uint64(chunkOffset),
@@ -177,8 +178,18 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
         Size: chunkOffset,
     }
 
+    if entry.Extended == nil {
+        entry.Extended = make(map[string][]byte)
+    }
+
     fs.saveAmzMetaData(r, entry)
 
+    for k, v := range r.Header {
+        if len(v) > 0 && strings.HasPrefix(k, needle.PairNamePrefix) {
+            entry.Extended[k] = []byte(v[0])
+        }
+    }
+
     if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
         fs.filer.DeleteChunks(entry.Chunks)
         replyerr = dbErr
@@ -188,7 +199,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
     return filerResult, replyerr
 }
 
-func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) {
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error) {
     var fileChunks []*filer_pb.FileChunk
 
     md5Hash := md5.New()
@@ -200,7 +211,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
         limitedReader := io.LimitReader(partReader, int64(chunkSize))
 
         // assign one file id for one chunk
-        fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync)
+        fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so)
         if assignErr != nil {
             return nil, nil, 0, assignErr
         }
@@ -244,11 +255,11 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
     return uploadResult, err
 }
 
-func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, rack string, ttlString string, fsync bool) filer.SaveDataAsChunkFunctionType {
+func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
 
     return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
         // assign one file id for one chunk
-        fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync)
+        fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so)
         if assignErr != nil {
             return nil, "", "", assignErr
         }
@@ -259,7 +270,7 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe
             return nil, "", "", uploadErr
         }
 
-        return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
+        return uploadResult.ToPbFileChunk(fileId, offset), so.Collection, so.Replication, nil
     }
 }
 
@@ -314,10 +325,6 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
 
 func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) {
 
-    if entry.Extended == nil {
-        entry.Extended = make(map[string][]byte)
-    }
-
     if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
         entry.Extended[xhttp.AmzStorageClass] = []byte(sc)
     }
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 720d97027..3cc0d0c41 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -16,12 +16,12 @@ import (
 )
 
 // handling single chunk POST or PUT upload
-func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, rack string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) {
+func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) {
 
-    fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync)
+    fileId, urlLocation, auth, err := fs.assignNewFileInfo(so)
 
     if err != nil || fileId == "" || urlLocation == "" {
-        return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
+        return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter)
     }
 
     glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
@@ -65,9 +65,9 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
             Mode:        0660,
             Uid:         OS_UID,
             Gid:         OS_GID,
-            Replication: replication,
-            Collection:  collection,
-            TtlSec:      ttlSeconds,
+            Replication: so.Replication,
+            Collection:  so.Collection,
+            TtlSec:      so.TtlSeconds,
             Mime:        pu.MimeType,
             Md5:         util.Base64Md5ToBytes(pu.ContentMd5),
         },
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index e8fa3995d..9df88e956 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -86,8 +86,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 
         glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
         message := &master_pb.VolumeLocation{
-            Url:       dn.Url(),
-            PublicUrl: dn.PublicUrl,
+            Url:        dn.Url(),
+            PublicUrl:  dn.PublicUrl,
+            DataCenter: string(dn.GetDataCenter().Id()),
         }
         if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
             // process delta volume ids if exists for fast volume id updates
@@ -148,7 +149,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
             }
         }
 
-
         if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
             ms.clientChansLock.RLock()
             for host, ch := range ms.clientChans {
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index cc1c4b2ad..ccc94ebac 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -93,7 +93,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
         preallocateSize: preallocateSize,
         clientChans:     make(map[string]chan *master_pb.VolumeLocation),
         grpcDialOption:  grpcDialOption,
-        MasterClient:    wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, peers),
+        MasterClient:    wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
         adminLocks:      NewAdminLocks(),
     }
     ms.bounedLeaderChan = make(chan int, 16)
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 84467ec1c..2f594fa2b 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -203,6 +203,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
             }
         case <-volumeTickChan:
             glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
+            vs.store.MaybeAdjustVolumeMax()
             if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
                 glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
                 return "", err
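
Usage sketch for the new tagging handlers in filer_server_handlers_tagging.go: PUT and DELETE requests carrying a ?tagging query parameter store or remove "Seaweed-" prefixed headers as extended attributes, which GetOrHeadHandler now echoes back as response headers. The following minimal Go client mirrors the curl examples in the handler comments; the filer address localhost:8888 and the path /path/to/a/file are illustrative placeholders, not fixed values.

// Minimal client-side sketch of the tagging flow (assumes a filer at
// localhost:8888 and an existing file at /path/to/a/file).
package main

import (
    "fmt"
    "log"
    "net/http"
)

func main() {
    // Add or replace a "Seaweed-" prefixed tag, equivalent to:
    // curl -X PUT -H "Seaweed-Name1: value1" http://localhost:8888/path/to/a/file?tagging
    putReq, err := http.NewRequest(http.MethodPut, "http://localhost:8888/path/to/a/file?tagging", nil)
    if err != nil {
        log.Fatal(err)
    }
    putReq.Header.Set("Seaweed-Name1", "value1")
    putResp, err := http.DefaultClient.Do(putReq)
    if err != nil {
        log.Fatal(err)
    }
    putResp.Body.Close()
    fmt.Println("PUT ?tagging:", putResp.Status) // 202 Accepted on success

    // Remove all "Seaweed-" prefixed tags, equivalent to:
    // curl -X DELETE http://localhost:8888/path/to/a/file?tagging
    delReq, err := http.NewRequest(http.MethodDelete, "http://localhost:8888/path/to/a/file?tagging", nil)
    if err != nil {
        log.Fatal(err)
    }
    delResp, err := http.DefaultClient.Do(delReq)
    if err != nil {
        log.Fatal(err)
    }
    delResp.Body.Close()
    fmt.Println("DELETE ?tagging:", delResp.Status) // 202 Accepted, or 304 Not Modified if no tags existed
}

A successful PUT or DELETE returns 202 Accepted; DELETE returns 304 Not Modified when no "Seaweed-" prefixed attributes exist, matching the hasDeletion check in DeleteTaggingHandler above.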
