about summary refs log tree commit diff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-01-21 00:47:27 -0800
committerChris Lu <chris.lu@gmail.com>2021-01-21 00:47:27 -0800
commita9e6db1a8e964f10571a5e11c197fd2da141c6f7 (patch)
tree0a22db3d6c68178d8e913c1bffd550ae4127a7e1 /weed/server
parentf0455dee683e831487c345a575b7894c0e2bf61a (diff)
parent84f05787f8eecfcb61e49882346ad5855b6bb784 (diff)
download: seaweedfs-origin/ftp.tar.xz
          seaweedfs-origin/ftp.zip
Merge branch 'master' into ftp (origin/ftp)
Diffstat (limited to 'weed/server')
-rw-r--r--  weed/server/common.go                                |   9
-rw-r--r--  weed/server/filer_grpc_server.go                     |  44
-rw-r--r--  weed/server/filer_grpc_server_rename.go              |   4
-rw-r--r--  weed/server/filer_grpc_server_sub_meta.go            |  52
-rw-r--r--  weed/server/filer_server.go                          |   5
-rw-r--r--  weed/server/filer_server_handlers_read.go            |  17
-rw-r--r--  weed/server/filer_server_handlers_read_dir.go        |   3
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go | 109
-rw-r--r--  weed/server/filer_server_rocksdb.go                  |   7
-rw-r--r--  weed/server/volume_server_handlers_read.go           |   5
10 files changed, 163 insertions(+), 92 deletions(-)
diff --git a/weed/server/common.go b/weed/server/common.go
index 58079032e..cf9547950 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -233,12 +233,12 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
}
}
-func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
+func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64, httpStatusCode int) error) {
rangeReq := r.Header.Get("Range")
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if err := writeFn(w, 0, totalSize); err != nil {
+ if err := writeFn(w, 0, totalSize, 0); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -277,9 +277,8 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
ra := ranges[0]
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
- w.WriteHeader(http.StatusPartialContent)
- err = writeFn(w, ra.start, ra.length)
+ err = writeFn(w, ra.start, ra.length, http.StatusPartialContent)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -307,7 +306,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
pw.CloseWithError(e)
return
}
- if e = writeFn(part, ra.start, ra.length); e != nil {
+ if e = writeFn(part, ra.start, ra.length, 0); e != nil {
pw.CloseWithError(e)
return
}
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 5f1b2d819..b0563d8bd 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -44,7 +44,7 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
}, nil
}
-func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error {
+func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) (err error) {
glog.V(4).Infof("ListEntries %v", req)
@@ -60,23 +60,12 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
+ var listErr error
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix)
-
- if err != nil {
- return err
- }
- if len(entries) == 0 {
- return nil
- }
-
- includeLastFile = false
-
- for _, entry := range entries {
-
- lastFileName = entry.Name()
-
- if err := stream.Send(&filer_pb.ListEntriesResponse{
+ var hasEntries bool
+ lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", func(entry *filer.Entry) bool {
+ hasEntries = true
+ if err = stream.Send(&filer_pb.ListEntriesResponse{
Entry: &filer_pb.Entry{
Name: entry.Name(),
IsDirectory: entry.IsDirectory(),
@@ -88,19 +77,28 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
Content: entry.Content,
},
}); err != nil {
- return err
+ return false
}
limit--
if limit == 0 {
- return nil
+ return false
}
- }
+ return true
+ })
- if len(entries) < paginationLimit {
- break
+ if listErr != nil {
+ return listErr
+ }
+ if err != nil {
+ return err
+ }
+ if !hasEntries {
+ return nil
}
+ includeLastFile = false
+
}
return nil
@@ -326,7 +324,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
resp = &filer_pb.DeleteEntryResponse{}
- if err != nil {
+ if err != nil && err != filer_pb.ErrNotFound {
resp.Error = err.Error()
}
return resp, nil
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index fa86737ac..5b68b64de 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -75,7 +75,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
includeLastFile := false
for {
- entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "")
+ entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "")
if err != nil {
return err
}
@@ -90,7 +90,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
return err
}
}
- if len(entries) < 1024 {
+ if !hasMore {
break
}
}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 3b8ced675..d9f91b125 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -29,16 +29,20 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
- processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
- }
-
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
+ var processedTsNs int64
+ var err error
for {
+
+ processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+
lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
@@ -46,6 +50,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return true
}, eachLogEntryFn)
if err != nil {
+ if err == log_buffer.ResumeFromDiskError {
+ continue
+ }
glog.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {
@@ -73,19 +80,23 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
- // println("reading from persisted logs ...")
- processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
- }
-
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
- glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ var processedTsNs int64
+ var err error
- // println("reading from in memory logs ...")
for {
+ // println("reading from persisted logs ...")
+ processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+ // glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ // println("reading from in memory logs ...")
+
lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
@@ -93,6 +104,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
return true
}, eachLogEntryFn)
if err != nil {
+ if err == log_buffer.ResumeFromDiskError {
+ continue
+ }
glog.Errorf("processed to %v: %v", lastReadTime, err)
time.Sleep(3127 * time.Millisecond)
if err != log_buffer.ResumeError {
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 2da129ab2..22474a5e2 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -26,9 +26,12 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -56,7 +59,7 @@ type FilerOption struct {
Port uint32
recursiveDelete bool
Cipher bool
- CacheToFilerLimit int64
+ SaveToFilerLimit int
Filers []string
}
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index d55bf7cbb..9fdc03dea 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -6,6 +6,7 @@ import (
"io"
"mime"
"net/http"
+ "net/url"
"path/filepath"
"strconv"
"strings"
@@ -99,6 +100,16 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
w.Header().Set(k, string(v))
}
+ //Seaweed custom header are not visible to Vue or javascript
+ seaweedHeaders := []string{}
+ for header, _ := range w.Header() {
+ if strings.HasPrefix(header, "Seaweed-") {
+ seaweedHeaders = append(seaweedHeaders, header)
+ }
+ }
+ seaweedHeaders = append(seaweedHeaders, "Content-Disposition")
+ w.Header().Set("Access-Control-Expose-Headers", strings.Join(seaweedHeaders, ","))
+
//set tag count
if r.Method == "GET" {
tagCount := 0
@@ -121,6 +132,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
setEtag(w, etag)
filename := entry.Name()
+ filename = url.QueryEscape(filename)
adjustHeaderContentDisposition(w, r, filename)
totalSize := int64(entry.Size())
@@ -146,8 +158,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
}
- processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
if offset+size <= int64(len(entry.Content)) {
+ if httpStatusCode != 0 {
+ w.WriteHeader(httpStatusCode)
+ }
_, err := writer.Write(entry.Content[offset : offset+size])
return err
}
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index f303ba1d4..9cf79ab41 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -36,7 +36,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName")
namePattern := r.FormValue("namePattern")
- entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, namePattern)
+ entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
@@ -44,7 +44,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
return
}
- shouldDisplayLoadMore := len(entries) == limit
if path == "/" {
path = ""
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index eee39152b..237f121fe 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -96,12 +96,6 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return nil, nil, err
}
- fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks)
- if replyerr != nil {
- glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
- return
- }
-
md5bytes = md5Hash.Sum(nil)
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
@@ -111,25 +105,26 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
fileName := ""
- contentType := ""
+ contentType := r.Header.Get("Content-Type")
+ if contentType == "application/octet-stream" {
+ contentType = ""
+ }
fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so)
if err != nil {
return nil, nil, err
}
- fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks)
- if replyerr != nil {
- glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
- return
- }
-
md5bytes = md5Hash.Sum(nil)
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
return
}
+func isAppend(r *http.Request) bool {
+ return r.URL.Query().Get("op") == "append"
+}
+
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
// detect file mode
@@ -151,26 +146,62 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
}
- glog.V(4).Infoln("saving", path)
- entry := &filer.Entry{
- FullPath: util.FullPath(path),
- Attr: filer.Attr{
- Mtime: time.Now(),
- Crtime: time.Now(),
- Mode: os.FileMode(mode),
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: so.Replication,
- Collection: so.Collection,
- TtlSec: so.TtlSeconds,
- Mime: contentType,
- Md5: md5bytes,
- FileSize: uint64(chunkOffset),
- },
- Chunks: fileChunks,
- Content: content,
+ var entry *filer.Entry
+ var mergedChunks []*filer_pb.FileChunk
+ // when it is an append
+ if isAppend(r) {
+ existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if findErr != nil && findErr != filer_pb.ErrNotFound {
+ glog.V(0).Infof("failing to find %s: %v", path, findErr)
+ }
+ entry = existingEntry
+ }
+ if entry != nil {
+ entry.Mtime = time.Now()
+ entry.Md5 = nil
+ // adjust chunk offsets
+ for _, chunk := range fileChunks {
+ chunk.Offset += int64(entry.FileSize)
+ }
+ mergedChunks = append(entry.Chunks, fileChunks...)
+ entry.FileSize += uint64(chunkOffset)
+
+ // TODO
+ if len(entry.Content) > 0 {
+ replyerr = fmt.Errorf("append to small file is not supported yet")
+ return
+ }
+
+ } else {
+ glog.V(4).Infoln("saving", path)
+ mergedChunks = fileChunks
+ entry = &filer.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.FileMode(mode),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ Replication: so.Replication,
+ Collection: so.Collection,
+ TtlSec: so.TtlSeconds,
+ Mime: contentType,
+ Md5: md5bytes,
+ FileSize: uint64(chunkOffset),
+ },
+ Content: content,
+ }
}
+ // maybe compact entry chunks
+ mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+ return
+ }
+ entry.Chunks = mergedChunks
+
filerResult = &FilerPostResult{
Name: fileName,
Size: chunkOffset,
@@ -189,7 +220,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
- fs.filer.DeleteChunks(entry.Chunks)
+ fs.filer.DeleteChunks(fileChunks)
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
@@ -204,7 +235,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
chunkOffset := int64(0)
- var smallContent, content []byte
+ var smallContent []byte
for {
limitedReader := io.LimitReader(partReader, int64(chunkSize))
@@ -213,6 +244,13 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
if err != nil {
return nil, nil, 0, err, nil
}
+ if chunkOffset == 0 && !isAppend(r) {
+ if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
+ smallContent = data
+ chunkOffset += int64(len(data))
+ break
+ }
+ }
dataReader := util.NewBytesReader(data)
// retry to assign a different file id
@@ -239,8 +277,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
return nil, nil, 0, uploadErr, nil
}
- content = data
-
// if last chunk exhausted the reader exactly at the border
if uploadResult.Size == 0 {
break
@@ -260,9 +296,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
}
}
- if chunkOffset < fs.option.CacheToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && chunkOffset < 4*1024 {
- smallContent = content
- }
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
diff --git a/weed/server/filer_server_rocksdb.go b/weed/server/filer_server_rocksdb.go
new file mode 100644
index 000000000..5fcc7e88f
--- /dev/null
+++ b/weed/server/filer_server_rocksdb.go
@@ -0,0 +1,7 @@
+// +build rocksdb
+
+package weed_server
+
+import (
+ _ "github.com/chrislusf/seaweedfs/weed/filer/rocksdb"
+)
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 1c963b39c..1cd4ee21d 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -261,10 +261,13 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
return nil
}
- processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
if _, e = rs.Seek(offset, 0); e != nil {
return e
}
+ if httpStatusCode != 0 {
+ w.WriteHeader(httpStatusCode)
+ }
_, e = io.CopyN(writer, rs, size)
return e
})