diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-01-21 00:47:27 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-01-21 00:47:27 -0800 |
| commit | a9e6db1a8e964f10571a5e11c197fd2da141c6f7 (patch) | |
| tree | 0a22db3d6c68178d8e913c1bffd550ae4127a7e1 /weed/server | |
| parent | f0455dee683e831487c345a575b7894c0e2bf61a (diff) | |
| parent | 84f05787f8eecfcb61e49882346ad5855b6bb784 (diff) | |
| download | seaweedfs-origin/ftp.tar.xz seaweedfs-origin/ftp.zip | |
Merge branch 'master' into ftporigin/ftp
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/common.go | 9 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server.go | 44 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_rename.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 52 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 5 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_read.go | 17 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_read_dir.go | 3 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 109 | ||||
| -rw-r--r-- | weed/server/filer_server_rocksdb.go | 7 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_read.go | 5 |
10 files changed, 163 insertions, 92 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index 58079032e..cf9547950 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -233,12 +233,12 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file } } -func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) { +func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64, httpStatusCode int) error) { rangeReq := r.Header.Get("Range") if rangeReq == "" { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) - if err := writeFn(w, 0, totalSize); err != nil { + if err := writeFn(w, 0, totalSize, 0); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -277,9 +277,8 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 ra := ranges[0] w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) w.Header().Set("Content-Range", ra.contentRange(totalSize)) - w.WriteHeader(http.StatusPartialContent) - err = writeFn(w, ra.start, ra.length) + err = writeFn(w, ra.start, ra.length, http.StatusPartialContent) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -307,7 +306,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 pw.CloseWithError(e) return } - if e = writeFn(part, ra.start, ra.length); e != nil { + if e = writeFn(part, ra.start, ra.length, 0); e != nil { pw.CloseWithError(e) return } diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 5f1b2d819..b0563d8bd 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -44,7 +44,7 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L }, nil } -func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error { +func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) (err error) { glog.V(4).Infof("ListEntries %v", req) @@ -60,23 +60,12 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom + var listErr error for limit > 0 { - entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix) - - if err != nil { - return err - } - if len(entries) == 0 { - return nil - } - - includeLastFile = false - - for _, entry := range entries { - - lastFileName = entry.Name() - - if err := stream.Send(&filer_pb.ListEntriesResponse{ + var hasEntries bool + lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", func(entry *filer.Entry) bool { + hasEntries = true + if err = stream.Send(&filer_pb.ListEntriesResponse{ Entry: &filer_pb.Entry{ Name: entry.Name(), IsDirectory: entry.IsDirectory(), @@ -88,19 +77,28 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file Content: entry.Content, }, }); err != nil { - return err + return false } limit-- if limit == 0 { - return nil + return false } - } + return true + }) - if len(entries) < paginationLimit { - break + if listErr != nil { + return listErr + } + if err != nil { + return err + } + if !hasEntries { + return nil } + includeLastFile = false + } return nil @@ -326,7 +324,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures) resp = &filer_pb.DeleteEntryResponse{} - if err != nil { + if err != nil && err != filer_pb.ErrNotFound { resp.Error = err.Error() } return resp, nil diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index fa86737ac..5b68b64de 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -75,7 +75,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. includeLastFile := false for { - entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "") + entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "") if err != nil { return err } @@ -90,7 +90,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. return err } } - if len(entries) < 1024 { + if !hasMore { break } } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 3b8ced675..d9f91b125 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -29,16 +29,20 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) - processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) - if err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) - } - - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } + var processedTsNs int64 + var err error for { + + processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() @@ -46,6 +50,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return true }, eachLogEntryFn) if err != nil { + if err == log_buffer.ResumeFromDiskError { + continue + } glog.Errorf("processed to %v: %v", lastReadTime, err) time.Sleep(3127 * time.Millisecond) if err != log_buffer.ResumeError { @@ -73,19 +80,23 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) - // println("reading from persisted logs ...") - processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) - if err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) - } - - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } - glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + var processedTsNs int64 + var err error - // println("reading from in memory logs ...") for { + // println("reading from persisted logs ...") + processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + // glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + + // println("reading from in memory logs ...") + lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() @@ -93,6 +104,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq return true }, eachLogEntryFn) if err != nil { + if err == log_buffer.ResumeFromDiskError { + continue + } glog.Errorf("processed to %v: %v", lastReadTime, err) time.Sleep(3127 * time.Millisecond) if err != log_buffer.ResumeError { diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 2da129ab2..22474a5e2 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -26,9 +26,12 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/hbase" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" "github.com/chrislusf/seaweedfs/weed/glog" @@ -56,7 +59,7 @@ type FilerOption struct { Port uint32 recursiveDelete bool Cipher bool - CacheToFilerLimit int64 + SaveToFilerLimit int Filers []string } diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index d55bf7cbb..9fdc03dea 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -6,6 +6,7 @@ import ( "io" "mime" "net/http" + "net/url" "path/filepath" "strconv" "strings" @@ -99,6 +100,16 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, w.Header().Set(k, string(v)) } + //Seaweed custom header are not visible to Vue or javascript + seaweedHeaders := []string{} + for header, _ := range w.Header() { + if strings.HasPrefix(header, "Seaweed-") { + seaweedHeaders = append(seaweedHeaders, header) + } + } + seaweedHeaders = append(seaweedHeaders, "Content-Disposition") + w.Header().Set("Access-Control-Expose-Headers", strings.Join(seaweedHeaders, ",")) + //set tag count if r.Method == "GET" { tagCount := 0 @@ -121,6 +132,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, setEtag(w, etag) filename := entry.Name() + filename = url.QueryEscape(filename) adjustHeaderContentDisposition(w, r, filename) totalSize := int64(entry.Size()) @@ -146,8 +158,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } } - processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { + processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error { if offset+size <= int64(len(entry.Content)) { + if httpStatusCode != 0 { + w.WriteHeader(httpStatusCode) + } _, err := writer.Write(entry.Content[offset : offset+size]) return err } diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index f303ba1d4..9cf79ab41 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -36,7 +36,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque lastFileName := r.FormValue("lastFileName") namePattern := r.FormValue("namePattern") - entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, namePattern) + entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern) if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) @@ -44,7 +44,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque return } - shouldDisplayLoadMore := len(entries) == limit if path == "/" { path = "" } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index eee39152b..237f121fe 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -96,12 +96,6 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite return nil, nil, err } - fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks) - if replyerr != nil { - glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) - return - } - md5bytes = md5Hash.Sum(nil) filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent) @@ -111,25 +105,26 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { fileName := "" - contentType := "" + contentType := r.Header.Get("Content-Type") + if contentType == "application/octet-stream" { + contentType = "" + } fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so) if err != nil { return nil, nil, err } - fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), fileChunks) - if replyerr != nil { - glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) - return - } - md5bytes = md5Hash.Sum(nil) filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent) return } +func isAppend(r *http.Request) bool { + return r.URL.Query().Get("op") == "append" +} + func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) { // detect file mode @@ -151,26 +146,62 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } } - glog.V(4).Infoln("saving", path) - entry := &filer.Entry{ - FullPath: util.FullPath(path), - Attr: filer.Attr{ - Mtime: time.Now(), - Crtime: time.Now(), - Mode: os.FileMode(mode), - Uid: OS_UID, - Gid: OS_GID, - Replication: so.Replication, - Collection: so.Collection, - TtlSec: so.TtlSeconds, - Mime: contentType, - Md5: md5bytes, - FileSize: uint64(chunkOffset), - }, - Chunks: fileChunks, - Content: content, + var entry *filer.Entry + var mergedChunks []*filer_pb.FileChunk + // when it is an append + if isAppend(r) { + existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path)) + if findErr != nil && findErr != filer_pb.ErrNotFound { + glog.V(0).Infof("failing to find %s: %v", path, findErr) + } + entry = existingEntry + } + if entry != nil { + entry.Mtime = time.Now() + entry.Md5 = nil + // adjust chunk offsets + for _, chunk := range fileChunks { + chunk.Offset += int64(entry.FileSize) + } + mergedChunks = append(entry.Chunks, fileChunks...) + entry.FileSize += uint64(chunkOffset) + + // TODO + if len(entry.Content) > 0 { + replyerr = fmt.Errorf("append to small file is not supported yet") + return + } + + } else { + glog.V(4).Infoln("saving", path) + mergedChunks = fileChunks + entry = &filer.Entry{ + FullPath: util.FullPath(path), + Attr: filer.Attr{ + Mtime: time.Now(), + Crtime: time.Now(), + Mode: os.FileMode(mode), + Uid: OS_UID, + Gid: OS_GID, + Replication: so.Replication, + Collection: so.Collection, + TtlSec: so.TtlSeconds, + Mime: contentType, + Md5: md5bytes, + FileSize: uint64(chunkOffset), + }, + Content: content, + } } + // maybe compact entry chunks + mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks) + if replyerr != nil { + glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) + return + } + entry.Chunks = mergedChunks + filerResult = &FilerPostResult{ Name: fileName, Size: chunkOffset, @@ -189,7 +220,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { - fs.filer.DeleteChunks(entry.Chunks) + fs.filer.DeleteChunks(fileChunks) replyerr = dbErr filerResult.Error = dbErr.Error() glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) @@ -204,7 +235,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) chunkOffset := int64(0) - var smallContent, content []byte + var smallContent []byte for { limitedReader := io.LimitReader(partReader, int64(chunkSize)) @@ -213,6 +244,13 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque if err != nil { return nil, nil, 0, err, nil } + if chunkOffset == 0 && !isAppend(r) { + if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 { + smallContent = data + chunkOffset += int64(len(data)) + break + } + } dataReader := util.NewBytesReader(data) // retry to assign a different file id @@ -239,8 +277,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque return nil, nil, 0, uploadErr, nil } - content = data - // if last chunk exhausted the reader exactly at the border if uploadResult.Size == 0 { break @@ -260,9 +296,6 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque } } - if chunkOffset < fs.option.CacheToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && chunkOffset < 4*1024 { - smallContent = content - } return fileChunks, md5Hash, chunkOffset, nil, smallContent } diff --git a/weed/server/filer_server_rocksdb.go b/weed/server/filer_server_rocksdb.go new file mode 100644 index 000000000..5fcc7e88f --- /dev/null +++ b/weed/server/filer_server_rocksdb.go @@ -0,0 +1,7 @@ +// +build rocksdb + +package weed_server + +import ( + _ "github.com/chrislusf/seaweedfs/weed/filer/rocksdb" +) diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 1c963b39c..1cd4ee21d 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -261,10 +261,13 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re return nil } - processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { + processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error { if _, e = rs.Seek(offset, 0); e != nil { return e } + if httpStatusCode != 0 { + w.WriteHeader(httpStatusCode) + } _, e = io.CopyN(writer, rs, size) return e }) |
