aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go23
-rw-r--r--weed/server/filer_server.go5
-rw-r--r--weed/server/filer_server_handlers_write.go38
-rw-r--r--weed/server/webdav_server.go10
4 files changed, 56 insertions, 20 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 98951c347..12ea25144 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -54,6 +54,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
+
if err != nil {
return err
}
@@ -84,6 +85,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
}); err != nil {
return err
}
+
limit--
if limit == 0 {
return nil
@@ -226,6 +228,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
if req.TtlSec > 0 {
ttlStr = strconv.Itoa(int(req.TtlSec))
}
+ collection, replication := fs.detectCollection(req.ParentPath, req.Collection, req.Replication)
var altRequest *operation.VolumeAssignRequest
@@ -236,16 +239,16 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
assignRequest := &operation.VolumeAssignRequest{
Count: uint64(req.Count),
- Replication: req.Replication,
- Collection: req.Collection,
+ Replication: replication,
+ Collection: collection,
Ttl: ttlStr,
DataCenter: dataCenter,
}
if dataCenter != "" {
altRequest = &operation.VolumeAssignRequest{
Count: uint64(req.Count),
- Replication: req.Replication,
- Collection: req.Collection,
+ Replication: replication,
+ Collection: collection,
Ttl: ttlStr,
DataCenter: "",
}
@@ -261,11 +264,13 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
}
return &filer_pb.AssignVolumeResponse{
- FileId: assignResult.Fid,
- Count: int32(assignResult.Count),
- Url: assignResult.Url,
- PublicUrl: assignResult.PublicUrl,
- Auth: string(assignResult.Auth),
+ FileId: assignResult.Fid,
+ Count: int32(assignResult.Count),
+ Url: assignResult.Url,
+ PublicUrl: assignResult.PublicUrl,
+ Auth: string(assignResult.Auth),
+ Collection: collection,
+ Replication: replication,
}, err
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 55888a4a4..b0df851c9 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -67,7 +67,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.DirBucketsPath)
go fs.filer.KeepConnectedToMaster()
@@ -83,6 +83,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
util.LoadConfiguration("notification", false)
fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
+ v.Set("filer.option.buckets_folder", "/buckets")
fs.option.DirBucketsPath = v.GetString("filer.option.buckets_folder")
fs.filer.LoadConfiguration(v)
@@ -96,6 +97,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
+ fs.filer.LoadBuckets(fs.option.DirBucketsPath)
+
maybeStartMetrics(fs, option)
return fs, nil
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 4707f1011..bb5f28663 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -80,14 +80,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
query := r.URL.Query()
- replication := query.Get("replication")
- if replication == "" {
- replication = fs.option.DefaultReplication
- }
- collection := query.Get("collection")
- if collection == "" {
- collection = fs.option.Collection
- }
+ collection, replication := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
dataCenter := query.Get("dataCenter")
if dataCenter == "" {
dataCenter = fs.option.DataCenter
@@ -305,3 +298,32 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
+
+func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string) {
+ // default
+ collection = fs.option.Collection
+ replication = fs.option.DefaultReplication
+
+ // get default collection settings
+ if qCollection != "" {
+ collection = qCollection
+ }
+ if qReplication != "" {
+ replication = qReplication
+ }
+
+ // required by buckets folder
+ if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
+ bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
+ t := strings.Index(bucketAndObjectKey, "/")
+ if t < 0 {
+ collection = bucketAndObjectKey
+ }
+ if t > 0 {
+ collection = bucketAndObjectKey[:t]
+ }
+ replication = fs.filer.ReadBucketOption(collection)
+ }
+
+ return
+}
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index d75869f30..8b0f09edc 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -367,6 +367,8 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
+ dir, _ := filer2.FullPath(f.name).DirAndName()
+
var err error
ctx := context.Background()
if f.entry == nil {
@@ -382,13 +384,15 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
var fileId, host string
var auth security.EncodedJwt
+ var collection, replication string
if err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
- Replication: "000",
+ Replication: "",
Collection: f.fs.option.Collection,
+ ParentPath: dir,
}
resp, err := client.AssignVolume(ctx, request)
@@ -398,6 +402,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
}
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ collection, replication = resp.Collection, resp.Replication
return nil
}); err != nil {
@@ -425,10 +430,11 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
}
f.entry.Chunks = append(f.entry.Chunks, chunk)
- dir, _ := filer2.FullPath(f.name).DirAndName()
err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
f.entry.Attributes.Mtime = time.Now().Unix()
+ f.entry.Attributes.Collection = collection
+ f.entry.Attributes.Replication = replication
request := &filer_pb.UpdateEntryRequest{
Directory: dir,