aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go2
-rw-r--r--weed/server/filer_server.go3
-rw-r--r--weed/server/filer_server_handlers_write.go17
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go8
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go4
5 files changed, 19 insertions, 15 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 06889fbcb..ead246eb8 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -232,7 +232,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
if req.TtlSec > 0 {
ttlStr = strconv.Itoa(int(req.TtlSec))
}
- collection, replication := fs.detectCollection(req.ParentPath, req.Collection, req.Replication)
+ collection, replication, _ := fs.detectCollection(req.ParentPath, req.Collection, req.Replication)
var altRequest *operation.VolumeAssignRequest
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 52dfbaf88..cf8669202 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -95,6 +95,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
v.SetDefault("filer.options.queues_folder", "/queues")
fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
fs.filer.DirQueuesPath = v.GetString("filer.options.queues_folder")
+ fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.")
@@ -107,7 +108,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- fs.filer.LoadBuckets(fs.filer.DirBucketsPath)
+ fs.filer.LoadBuckets()
util.OnInterrupt(func() {
fs.filer.Shutdown()
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index ce184875c..74a558e22 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -40,7 +40,7 @@ type FilerPostResult struct {
Url string `json:"url,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerRequestCounter.WithLabelValues("assign").Inc()
start := time.Now()
@@ -73,6 +73,9 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
}
fileId = assignResult.Fid
urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
+ if fsync {
+ urlLocation += "?fsync=true"
+ }
auth = assignResult.Auth
return
}
@@ -82,7 +85,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
query := r.URL.Query()
- collection, replication := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
+ collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
dataCenter := query.Get("dataCenter")
if dataCenter == "" {
dataCenter = fs.option.DataCenter
@@ -96,12 +99,12 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ttlSeconds = int32(ttl.Minutes()) * 60
}
- if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString); autoChunked {
+ if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked {
return
}
if fs.option.Cipher {
- reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString)
+ reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil {
@@ -111,7 +114,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString)
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
if err != nil || fileId == "" || urlLocation == "" {
glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
@@ -327,7 +330,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
-func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string) {
+func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) {
// default
collection = fs.option.Collection
replication = fs.option.DefaultReplication
@@ -350,7 +353,7 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st
if t > 0 {
collection = bucketAndObjectKey[:t]
}
- replication = fs.filer.ReadBucketOption(collection)
+ replication, fsync = fs.filer.ReadBucketOption(collection)
}
return
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index d86530bb8..4c371a9a5 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -21,7 +21,7 @@ import (
)
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
- replication string, collection string, dataCenter string, ttlSec int32, ttlString string) bool {
+ replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool {
if r.Method != "POST" {
glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
return false
@@ -57,7 +57,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
return false
}
- reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString)
+ reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil {
@@ -67,7 +67,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
}
func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
- contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string) (filerResult *FilerPostResult, replyerr error) {
+ contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
start := time.Now()
@@ -102,7 +102,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
limitedReader := io.LimitReader(partReader, int64(chunkSize))
// assign one file id for one chunk
- fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString)
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
if assignErr != nil {
return nil, assignErr
}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index cbcf8a05c..ebf277984 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -17,9 +17,9 @@ import (
// handling single chunk POST or PUT upload
func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request,
- replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string) (filerResult *FilerPostResult, err error) {
+ replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) {
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString)
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
if err != nil || fileId == "" || urlLocation == "" {
return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)