diff options
Diffstat (limited to 'weed/server')
25 files changed, 582 insertions, 193 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index 9001a3b33..5c5f1b8eb 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -234,12 +234,12 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file } } -func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64, httpStatusCode int) error) { +func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) { rangeReq := r.Header.Get("Range") if rangeReq == "" { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) - if err := writeFn(w, 0, totalSize, 0); err != nil { + if err := writeFn(w, 0, totalSize); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -279,7 +279,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) w.Header().Set("Content-Range", ra.contentRange(totalSize)) - err = writeFn(w, ra.start, ra.length, http.StatusPartialContent) + err = writeFn(w, ra.start, ra.length) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -307,7 +307,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 pw.CloseWithError(e) return } - if e = writeFn(part, ra.start, ra.length, 0); e != nil { + if e = writeFn(part, ra.start, ra.length); e != nil { pw.CloseWithError(e) return } diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index a4bb721ef..3821de6a9 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -63,7 +63,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file var listErr error for limit > 0 { var hasEntries bool - lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", func(entry *filer.Entry) bool { + lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool { hasEntries = true if err = stream.Send(&filer_pb.ListEntriesResponse{ Entry: &filer_pb.Entry{ diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 5b68b64de..eadb970d5 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -33,8 +33,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err) } - var events MoveEvents - moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, &events) + moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName) if moveErr != nil { fs.filer.RollbackTransaction(ctx) return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) @@ -48,11 +47,11 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return &filer_pb.AtomicRenameEntryResponse{}, nil } -func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error { +func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error { - if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events, func() error { + if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error { if entry.IsDirectory() { - if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil { + if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName); err != nil { return err } } @@ -64,7 +63,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e return nil } -func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error { +func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error { currentDirPath := oldParent.Child(entry.Name()) newDirPath := newParent.Child(newName) @@ -75,7 +74,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. includeLastFile := false for { - entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "") + entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "", "") if err != nil { return err } @@ -85,7 +84,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. for _, item := range entries { lastFileName = item.Name() // println("processing", lastFileName) - err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events) + err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name()) if err != nil { return err } @@ -97,8 +96,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. return nil } -func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents, - moveFolderSubEntries func() error) error { +func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error { oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName) @@ -122,8 +120,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat return createErr } - events.newEntries = append(events.newEntries, newEntry) - if moveFolderSubEntries != nil { if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil { return moveChildrenErr @@ -136,13 +132,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat return deleteErr } - events.oldEntries = append(events.oldEntries, entry) - return nil } - -type MoveEvents struct { - oldEntries []*filer.Entry - newEntries []*filer.Entry -} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 22474a5e2..2734223ea 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -45,22 +45,23 @@ import ( ) type FilerOption struct { - Masters []string - Collection string - DefaultReplication string - DisableDirListing bool - MaxMB int - DirListingLimit int - DataCenter string - Rack string - DefaultLevelDbDir string - DisableHttp bool - Host string - Port uint32 - recursiveDelete bool - Cipher bool - SaveToFilerLimit int - Filers []string + Masters []string + Collection string + DefaultReplication string + DisableDirListing bool + MaxMB int + DirListingLimit int + DataCenter string + Rack string + DefaultLevelDbDir string + DisableHttp bool + Host string + Port uint32 + recursiveDelete bool + Cipher bool + SaveToFilerLimit int64 + Filers []string + ConcurrentUploadLimit int64 } type FilerServer struct { @@ -79,14 +80,18 @@ type FilerServer struct { brokers map[string]map[string]bool brokersLock sync.Mutex + + inFlightDataSize int64 + inFlightDataLimitCond *sync.Cond } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { fs = &FilerServer{ - option: option, - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), - brokers: make(map[string]map[string]bool), + option: option, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), + brokers: make(map[string]map[string]bool), + inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), } fs.listenersCond = sync.NewCond(&fs.listenersLock) @@ -153,7 +158,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) func (fs *FilerServer) checkWithMaster() { for _, master := range fs.option.Masters { - _, err := pb.ParseFilerGrpcAddress(master) + _, err := pb.ParseServerToGrpcAddress(master) if err != nil { glog.Fatalf("invalid master address %s: %v", master, err) } diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 3bc0c5d0d..ed6bbb6f6 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -4,6 +4,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "net/http" "strings" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/stats" @@ -47,18 +48,34 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { fs.DeleteHandler(w, r) } stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds()) - case "PUT": - stats.FilerRequestCounter.WithLabelValues("put").Inc() - if _, ok := r.URL.Query()["tagging"]; ok { - fs.PutTaggingHandler(w, r) - } else { - fs.PostHandler(w, r) + case "POST", "PUT": + + // wait until in flight data is less than the limit + contentLength := getContentLength(r) + fs.inFlightDataLimitCond.L.Lock() + for atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit { + fs.inFlightDataLimitCond.Wait() + } + atomic.AddInt64(&fs.inFlightDataSize, contentLength) + fs.inFlightDataLimitCond.L.Unlock() + defer func() { + atomic.AddInt64(&fs.inFlightDataSize, -contentLength) + fs.inFlightDataLimitCond.Signal() + }() + + if r.Method == "PUT" { + stats.FilerRequestCounter.WithLabelValues("put").Inc() + if _, ok := r.URL.Query()["tagging"]; ok { + fs.PutTaggingHandler(w, r) + } else { + fs.PostHandler(w, r, contentLength) + } + stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds()) + } else { // method == "POST" + stats.FilerRequestCounter.WithLabelValues("post").Inc() + fs.PostHandler(w, r, contentLength) + stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds()) } - stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds()) - case "POST": - stats.FilerRequestCounter.WithLabelValues("post").Inc() - fs.PostHandler(w, r) - stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds()) case "OPTIONS": stats.FilerRequestCounter.WithLabelValues("options").Inc() OptionsHandler(w, r, false) diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index e210f4bdf..6bc09e953 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -79,7 +79,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, w.Header().Set("Last-Modified", entry.Attr.Mtime.UTC().Format(http.TimeFormat)) if r.Header.Get("If-Modified-Since") != "" { if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil { - if t.After(entry.Attr.Mtime) { + if !t.Before(entry.Attr.Mtime) { w.WriteHeader(http.StatusNotModified) return } @@ -131,6 +131,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, if r.Method == "HEAD" { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) + processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { + return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size, true) + }) return } @@ -150,16 +153,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } } - processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error { - if httpStatusCode != 0 { - w.WriteHeader(httpStatusCode) - } + processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { if offset+size <= int64(len(entry.Content)) { _, err := writer.Write(entry.Content[offset : offset+size]) - glog.Errorf("failed to write entry content: %v", err) + if err != nil { + glog.Errorf("failed to write entry content: %v", err) + } return err } - return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) + return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size, false) }) } diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 9cf79ab41..307c411b6 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -35,8 +35,9 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque lastFileName := r.FormValue("lastFileName") namePattern := r.FormValue("namePattern") + namePatternExclude := r.FormValue("namePatternExclude") - entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern) + entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude) if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 0ce3e4a58..95eba9d3d 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -52,7 +52,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u return } -func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { +func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) { ctx := context.Background() @@ -66,7 +66,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { query.Get("rack"), ) - fs.autoChunk(ctx, w, r, so) + fs.autoChunk(ctx, w, r, contentLength, so) util.CloseRequest(r) } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index d3ce7e605..c4f10d94e 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -2,11 +2,8 @@ package weed_server import ( "context" - "crypto/md5" "fmt" - "hash" "io" - "io/ioutil" "net/http" "os" "path" @@ -19,13 +16,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) { +func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) { // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line query := r.URL.Query() @@ -38,10 +34,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * chunkSize := 1024 * 1024 * maxMB - stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() + stats.FilerRequestCounter.WithLabelValues("chunk").Inc() start := time.Now() defer func() { - stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds()) + stats.FilerRequestHistogram.WithLabelValues("chunk").Observe(time.Since(start).Seconds()) }() var reply *FilerPostResult @@ -51,14 +47,16 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") { reply, err = fs.mkdir(ctx, w, r) } else { - reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, so) + reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so) } } else { - reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, so) + reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so) } if err != nil { if strings.HasPrefix(err.Error(), "read input:") { writeJsonError(w, r, 499, err) + } else if strings.HasSuffix(err.Error(), "is a file") { + writeJsonError(w, r, http.StatusConflict, err) } else { writeJsonError(w, r, http.StatusInternalServerError, err) } @@ -70,7 +68,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * } } -func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { +func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { @@ -91,7 +89,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite contentType = "" } - fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, so) + fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so) if err != nil { return nil, nil, err } @@ -102,7 +100,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite return } -func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { +func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { fileName := path.Base(r.URL.Path) contentType := r.Header.Get("Content-Type") @@ -110,7 +108,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter contentType = "" } - fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so) + fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so) if err != nil { return nil, nil, err } @@ -144,6 +142,14 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa if fileName != "" { path += fileName } + } else { + if fileName != "" { + if possibleDirEntry, findDirErr := fs.filer.FindEntry(ctx, util.FullPath(path)); findDirErr == nil { + if possibleDirEntry.IsDirectory() { + path += "/" + fileName + } + } + } } var entry *filer.Entry @@ -212,7 +218,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa entry.Extended = make(map[string][]byte) } - fs.saveAmzMetaData(r, entry) + SaveAmzMetaData(r, entry.Extended, false) for k, v := range r.Header { if len(v) > 0 && strings.HasPrefix(k, needle.PairNamePrefix) { @@ -229,89 +235,6 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa return filerResult, replyerr } -func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) { - var fileChunks []*filer_pb.FileChunk - - md5Hash := md5.New() - var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) - - chunkOffset := int64(0) - var smallContent []byte - - for { - limitedReader := io.LimitReader(partReader, int64(chunkSize)) - - data, err := ioutil.ReadAll(limitedReader) - if err != nil { - return nil, nil, 0, err, nil - } - if chunkOffset == 0 && !isAppend(r) { - if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 { - smallContent = data - chunkOffset += int64(len(data)) - break - } - } - dataReader := util.NewBytesReader(data) - - // retry to assign a different file id - var fileId, urlLocation string - var auth security.EncodedJwt - var assignErr, uploadErr error - var uploadResult *operation.UploadResult - for i := 0; i < 3; i++ { - // assign one file id for one chunk - fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) - if assignErr != nil { - return nil, nil, 0, assignErr, nil - } - - // upload the chunk to the volume server - uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) - if uploadErr != nil { - time.Sleep(251 * time.Millisecond) - continue - } - break - } - if uploadErr != nil { - return nil, nil, 0, uploadErr, nil - } - - // if last chunk exhausted the reader exactly at the border - if uploadResult.Size == 0 { - break - } - - // Save to chunk manifest structure - fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) - - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) - - // reset variables for the next chunk - chunkOffset = chunkOffset + int64(uploadResult.Size) - - // if last chunk was not at full chunk size, but already exhausted the reader - if int64(uploadResult.Size) < int64(chunkSize) { - break - } - } - - return fileChunks, md5Hash, chunkOffset, nil, smallContent -} - -func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { - - stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc() - start := time.Now() - defer func() { - stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds()) - }() - - uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) - return uploadResult, err, data -} - func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType { return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) { @@ -380,17 +303,24 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http return filerResult, replyerr } -func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) { +func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool) (metadata map[string][]byte) { + + metadata = make(map[string][]byte) + if !isReplace { + for k, v := range existing { + metadata[k] = v + } + } if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" { - entry.Extended[xhttp.AmzStorageClass] = []byte(sc) + metadata[xhttp.AmzStorageClass] = []byte(sc) } if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" { for _, v := range strings.Split(tags, "&") { tag := strings.Split(v, "=") if len(tag) == 2 { - entry.Extended[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1]) + metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1]) } } } @@ -398,8 +328,11 @@ func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) { for header, values := range r.Header { if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) { for _, value := range values { - entry.Extended[header] = []byte(value) + metadata[header] = []byte(value) } } } + + return + } diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go new file mode 100644 index 000000000..3ab45453e --- /dev/null +++ b/weed/server/filer_server_handlers_write_upload.go @@ -0,0 +1,105 @@ +package weed_server + +import ( + "crypto/md5" + "hash" + "io" + "io/ioutil" + "net/http" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) { + var fileChunks []*filer_pb.FileChunk + + md5Hash := md5.New() + var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) + + chunkOffset := int64(0) + var smallContent []byte + + for { + limitedReader := io.LimitReader(partReader, int64(chunkSize)) + + data, err := ioutil.ReadAll(limitedReader) + if err != nil { + return nil, nil, 0, err, nil + } + if chunkOffset == 0 && !isAppend(r) { + if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 { + smallContent = data + chunkOffset += int64(len(data)) + break + } + } + dataReader := util.NewBytesReader(data) + + // retry to assign a different file id + var fileId, urlLocation string + var auth security.EncodedJwt + var assignErr, uploadErr error + var uploadResult *operation.UploadResult + for i := 0; i < 3; i++ { + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) + if assignErr != nil { + return nil, nil, 0, assignErr, nil + } + + // upload the chunk to the volume server + uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) + if uploadErr != nil { + time.Sleep(251 * time.Millisecond) + continue + } + break + } + if uploadErr != nil { + return nil, nil, 0, uploadErr, nil + } + + // if last chunk exhausted the reader exactly at the border + if uploadResult.Size == 0 { + break + } + + // Save to chunk manifest structure + fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) + + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) + + // reset variables for the next chunk + chunkOffset = chunkOffset + int64(uploadResult.Size) + + // if last chunk was not at full chunk size, but already exhausted the reader + if int64(uploadResult.Size) < int64(chunkSize) { + break + } + } + + return fileChunks, md5Hash, chunkOffset, nil, smallContent +} + +func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { + + stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc() + start := time.Now() + defer func() { + stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds()) + }() + + uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) + if uploadResult != nil && uploadResult.RetryCount > 0 { + stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount)) + } + return uploadResult, err, data +} diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go index f21cce7d1..5016117a8 100644 --- a/weed/server/filer_ui/breadcrumb.go +++ b/weed/server/filer_ui/breadcrumb.go @@ -1,4 +1,4 @@ -package master_ui +package filer_ui import ( "strings" diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go index 3f0647119..648b97f22 100644 --- a/weed/server/filer_ui/templates.go +++ b/weed/server/filer_ui/templates.go @@ -1,4 +1,4 @@ -package master_ui +package filer_ui import ( "github.com/dustin/go-humanize" diff --git a/weed/server/gateway_server.go b/weed/server/gateway_server.go new file mode 100644 index 000000000..608217ed7 --- /dev/null +++ b/weed/server/gateway_server.go @@ -0,0 +1,106 @@ +package weed_server + +import ( + "github.com/chrislusf/seaweedfs/weed/operation" + "google.golang.org/grpc" + "math/rand" + "net/http" + + "github.com/chrislusf/seaweedfs/weed/util" + + _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" + _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" + _ "github.com/chrislusf/seaweedfs/weed/filer/hbase" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" + _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + "github.com/chrislusf/seaweedfs/weed/glog" + _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" + _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" + _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" + _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" + _ "github.com/chrislusf/seaweedfs/weed/notification/log" + "github.com/chrislusf/seaweedfs/weed/security" +) + +type GatewayOption struct { + Masters []string + Filers []string + MaxMB int + IsSecure bool +} + +type GatewayServer struct { + option *GatewayOption + secret security.SigningKey + grpcDialOption grpc.DialOption +} + +func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) { + + fs = &GatewayServer{ + option: option, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), + } + + if len(option.Masters) == 0 { + glog.Fatal("master list is required!") + } + + defaultMux.HandleFunc("/blobs/", fs.blobsHandler) + defaultMux.HandleFunc("/files/", fs.filesHandler) + defaultMux.HandleFunc("/topics/", fs.topicsHandler) + + return fs, nil +} + +func (fs *GatewayServer) getMaster() string { + randMaster := rand.Intn(len(fs.option.Masters)) + return fs.option.Masters[randMaster] +} + +func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "DELETE": + chunkId := r.URL.Path[len("/blobs/"):] + fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + var jwtAuthorization security.EncodedJwt + if fs.option.IsSecure { + jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId) + } + body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization)) + if err != nil { + writeJsonError(w, r, http.StatusNotFound, err) + return + } + w.WriteHeader(statusCode) + w.Write(body) + case "POST": + submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption) + } +} + +func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "DELETE": + case "POST": + } +} + +func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "POST": + } +} diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 0f0b7f101..3e6d9bb9e 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -80,10 +80,14 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts) glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) + var dataCenter string + if dc := dn.GetDataCenter(); dc != nil { + dataCenter = string(dc.Id()) + } message := &master_pb.VolumeLocation{ Url: dn.Url(), PublicUrl: dn.PublicUrl, - DataCenter: string(dn.GetDataCenter().Id()), + DataCenter: dataCenter, } if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 { // process delta volume ids if exists for fast volume id updates diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go index 7e7dcb36b..93c9e4e4e 100644 --- a/weed/server/master_grpc_server_admin.go +++ b/weed/server/master_grpc_server_admin.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "math/rand" "sync" "time" @@ -60,6 +61,7 @@ const ( type AdminLock struct { accessSecret int64 accessLockTime time.Time + lastClient string } type AdminLocks struct { @@ -73,14 +75,15 @@ func NewAdminLocks() *AdminLocks { } } -func (locks *AdminLocks) isLocked(lockName string) bool { +func (locks *AdminLocks) isLocked(lockName string) (clientName string, isLocked bool) { locks.RLock() defer locks.RUnlock() adminLock, found := locks.locks[lockName] if !found { - return false + return "", false } - return adminLock.accessLockTime.Add(LockDuration).After(time.Now()) + glog.V(4).Infof("isLocked %v", adminLock.lastClient) + return adminLock.lastClient, adminLock.accessLockTime.Add(LockDuration).After(time.Now()) } func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool { @@ -93,12 +96,13 @@ func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64 return adminLock.accessLockTime.Equal(ts) && adminLock.accessSecret == token } -func (locks *AdminLocks) generateToken(lockName string) (ts time.Time, token int64) { +func (locks *AdminLocks) generateToken(lockName string, clientName string) (ts time.Time, token int64) { locks.Lock() defer locks.Unlock() lock := &AdminLock{ accessSecret: rand.Int63(), accessLockTime: time.Now(), + lastClient: clientName, } locks.locks[lockName] = lock return lock.accessLockTime, lock.accessSecret @@ -113,18 +117,19 @@ func (locks *AdminLocks) deleteLock(lockName string) { func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) { resp := &master_pb.LeaseAdminTokenResponse{} - if ms.adminLocks.isLocked(req.LockName) { + if lastClient, isLocked := ms.adminLocks.isLocked(req.LockName); isLocked { + glog.V(4).Infof("LeaseAdminToken %v", lastClient) if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) { // for renew - ts, token := ms.adminLocks.generateToken(req.LockName) + ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName) resp.Token, resp.LockTsNs = token, ts.UnixNano() return resp, nil } // refuse since still locked - return resp, fmt.Errorf("already locked") + return resp, fmt.Errorf("already locked by " + lastClient) } // for fresh lease request - ts, token := ms.adminLocks.generateToken(req.LockName) + ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName) resp.Token, resp.LockTsNs = token, ts.UnixNano() return resp, nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 9404081b4..e2b2df18d 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -277,6 +277,13 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer glog.Error(err) seq = nil } + case "snowflake": + var err error + seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port)) + if err != nil { + glog.Error(err) + seq = nil + } default: seq = sequence.NewMemorySequencer() } diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go index 9cd58158b..3822c6113 100644 --- a/weed/server/master_server_handlers_ui.go +++ b/weed/server/master_server_handlers_ui.go @@ -14,17 +14,19 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) infos := make(map[string]interface{}) infos["Up Time"] = time.Now().Sub(startTime).String() args := struct { - Version string - Topology interface{} - RaftServer raft.Server - Stats map[string]interface{} - Counters *stats.ServerStats + Version string + Topology interface{} + RaftServer raft.Server + Stats map[string]interface{} + Counters *stats.ServerStats + VolumeSizeLimitMB uint }{ util.Version(), ms.Topo.ToMap(), ms.Topo.RaftServer, infos, serverStats, + ms.option.VolumeSizeLimitMB, } ui.StatusTpl.Execute(w, args) } diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go index 60873f6aa..31b6353e9 100644 --- a/weed/server/master_ui/templates.go +++ b/weed/server/master_ui/templates.go @@ -22,9 +22,13 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> <div class="row"> <div class="col-sm-6"> <h2>Cluster status</h2> - <table class="table"> + <table class="table table-condensed table-striped"> <tbody> <tr> + <th>Volume Size Limit</th> + <td>{{ .VolumeSizeLimitMB }}MB</td> + </tr> + <tr> <th>Free</th> <td>{{ .Topology.Free }}</td> </tr> @@ -38,8 +42,8 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td> </tr> <tr> - <td class="col-sm-2 field-label"><label>Other Masters:</label></td> - <td class="col-sm-10"><ul class="list-unstyled"> + <th>Other Masters</th> + <td class="col-sm-5"><ul class="list-unstyled"> {{ range $k, $p := .Peers }} <li><a href="http://{{ $p.Name }}/ui/index.html">{{ $p.Name }}</a></li> {{ end }} diff --git a/weed/server/volume_grpc_read_write.go b/weed/server/volume_grpc_read_write.go new file mode 100644 index 000000000..988e9e145 --- /dev/null +++ b/weed/server/volume_grpc_read_write.go @@ -0,0 +1,38 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func (vs *VolumeServer) ReadNeedleBlob(ctx context.Context, req *volume_server_pb.ReadNeedleBlobRequest) (resp *volume_server_pb.ReadNeedleBlobResponse, err error) { + resp = &volume_server_pb.ReadNeedleBlobResponse{} + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("not found volume id %d", req.VolumeId) + } + + resp.NeedleBlob, err = v.ReadNeedleBlob(req.Offset, types.Size(req.Size)) + if err != nil { + return nil, fmt.Errorf("read needle blob offset %d size %d: %v", req.Offset, req.Size, err) + } + + return resp, nil +} + +func (vs *VolumeServer) WriteNeedleBlob(ctx context.Context, req *volume_server_pb.WriteNeedleBlobRequest) (resp *volume_server_pb.WriteNeedleBlobResponse, err error) { + resp = &volume_server_pb.WriteNeedleBlobResponse{} + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("not found volume id %d", req.VolumeId) + } + + if err = v.WriteNeedleBlob(types.NeedleId(req.NeedleId), req.NeedleBlob, types.Size(req.Size)); err != nil { + return nil, fmt.Errorf("write blob needle %d size %d: %v", req.NeedleId, req.Size, err) + } + + return resp, nil +} diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index e496b1ce2..e11d607a4 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/storage/types" "net/http" + "sync" "google.golang.org/grpc" @@ -34,6 +35,10 @@ type VolumeServer struct { fileSizeLimitBytes int64 isHeartbeating bool stopChan chan bool + + inFlightDataSize int64 + inFlightDataLimitCond *sync.Cond + concurrentUploadLimit int64 } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -48,6 +53,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, readRedirect bool, compactionMBPerSecond int, fileSizeLimitMB int, + concurrentUploadLimit int64, ) *VolumeServer { v := util.GetViper() @@ -72,6 +78,8 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, isHeartbeating: true, stopChan: make(chan bool), + inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), + concurrentUploadLimit: concurrentUploadLimit, } vs.SeedMasterNodes = masterNodes diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 7852c950a..4527add44 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -2,7 +2,9 @@ package weed_server import ( "net/http" + "strconv" "strings" + "sync/atomic" "github.com/chrislusf/seaweedfs/weed/util" @@ -40,8 +42,24 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque stats.DeleteRequest() vs.guard.WhiteList(vs.DeleteHandler)(w, r) case "PUT", "POST": + + // wait until in flight data is less than the limit + contentLength := getContentLength(r) + vs.inFlightDataLimitCond.L.Lock() + for atomic.LoadInt64(&vs.inFlightDataSize) > vs.concurrentUploadLimit { + vs.inFlightDataLimitCond.Wait() + } + atomic.AddInt64(&vs.inFlightDataSize, contentLength) + vs.inFlightDataLimitCond.L.Unlock() + defer func() { + atomic.AddInt64(&vs.inFlightDataSize, -contentLength) + vs.inFlightDataLimitCond.Signal() + }() + + // processs uploads stats.WriteRequest() vs.guard.WhiteList(vs.PostHandler)(w, r) + case "OPTIONS": stats.ReadRequest() w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS") @@ -49,6 +67,18 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque } } +func getContentLength(r *http.Request) int64 { + contentLength := r.Header.Get("Content-Length") + if contentLength != "" { + length, err := strconv.ParseInt(contentLength, 10, 64) + if err != nil { + return 0 + } + return length + } + return 0 +} + func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) if r.Header.Get("Origin") != "" { diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 2db46ac9b..3e977cfd4 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -27,7 +27,7 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { - // println(r.Method + " " + r.URL.Path) + glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range")) stats.VolumeServerRequestCounter.WithLabelValues("get").Inc() start := time.Now() @@ -261,13 +261,10 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re return nil } - processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error { + processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { if _, e = rs.Seek(offset, 0); e != nil { return e } - if httpStatusCode != 0 { - w.WriteHeader(httpStatusCode) - } _, e = io.CopyN(writer, rs, size) return e }) diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 602b147e1..3d752eda6 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -13,7 +13,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" - "github.com/chrislusf/seaweedfs/weed/util" ) func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { @@ -68,7 +67,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { ret.Name = string(reqNeedle.Name) } ret.Size = uint32(originalSize) - ret.ETag = fmt.Sprintf("%x", util.Base64Md5ToBytes(contentMd5)) + ret.ETag = reqNeedle.Etag() ret.Mime = string(reqNeedle.Mime) setEtag(w, ret.ETag) w.Header().Set("Content-MD5", contentMd5) diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go new file mode 100644 index 000000000..a009611da --- /dev/null +++ b/weed/server/volume_server_tcp_handlers_write.go @@ -0,0 +1,137 @@ +package weed_server + +import ( + "bufio" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "net" + "strings" +) + +func (vs *VolumeServer) HandleTcpConnection(c net.Conn) { + defer c.Close() + + glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) + + bufReader := bufio.NewReaderSize(c, 1024*1024) + bufWriter := bufio.NewWriterSize(c, 1024*1024) + + for { + cmd, err := bufReader.ReadString('\n') + if err != nil { + if err != io.EOF { + glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) + } + return + } + cmd = cmd[:len(cmd)-1] + switch cmd[0] { + case '+': + fileId := cmd[1:] + err = vs.handleTcpPut(fileId, bufReader) + if err == nil { + bufWriter.Write([]byte("+OK\n")) + } else { + bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) + } + case '-': + fileId := cmd[1:] + err = vs.handleTcpDelete(fileId) + if err == nil { + bufWriter.Write([]byte("+OK\n")) + } else { + bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) + } + case '?': + fileId := cmd[1:] + err = vs.handleTcpGet(fileId, bufWriter) + case '!': + bufWriter.Flush() + } + + } + +} + +func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) { + + volumeId, n, err2 := vs.parseFileId(fileId) + if err2 != nil { + return err2 + } + + volume := vs.store.GetVolume(volumeId) + if volume == nil { + return fmt.Errorf("volume %d not found", volumeId) + } + + err = volume.StreamRead(n, writer) + if err != nil { + return err + } + + return nil +} + +func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) { + + volumeId, n, err2 := vs.parseFileId(fileId) + if err2 != nil { + return err2 + } + + volume := vs.store.GetVolume(volumeId) + if volume == nil { + return fmt.Errorf("volume %d not found", volumeId) + } + + sizeBuf := make([]byte, 4) + if _, err = bufReader.Read(sizeBuf); err != nil { + return err + } + dataSize := util.BytesToUint32(sizeBuf) + + err = volume.StreamWrite(n, bufReader, dataSize) + if err != nil { + return err + } + + return nil +} + +func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) { + + volumeId, n, err2 := vs.parseFileId(fileId) + if err2 != nil { + return err2 + } + + _, err = vs.store.DeleteVolumeNeedle(volumeId, n) + if err != nil { + return err + } + + return nil +} + +func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) { + + commaIndex := strings.LastIndex(fileId, ",") + if commaIndex <= 0 { + return 0, nil, fmt.Errorf("unknown fileId %s", fileId) + } + + vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:] + + volumeId, ve := needle.NewVolumeId(vid) + if ve != nil { + return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId) + } + + n := new(needle.Needle) + n.ParsePath(fid) + return volumeId, n, nil +} diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go index 6a8bb6f55..ee4c2e31d 100644 --- a/weed/server/volume_server_ui/templates.go +++ b/weed/server/volume_server_ui/templates.go @@ -1,4 +1,4 @@ -package master_ui +package volume_server_ui import ( "fmt" |
