aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-06-06 13:42:36 -0700
committerChris Lu <chris.lu@gmail.com>2021-06-06 13:42:36 -0700
commit6c82326575028b23003e371dbe69de1eae206f81 (patch)
tree35c7f185e421db1b8c8ee5e91ddb66dbd9b8763d
parent9cba5cca0be43553f7aae1ab5477a0366765ff02 (diff)
downloadseaweedfs-6c82326575028b23003e371dbe69de1eae206f81.tar.xz
seaweedfs-6c82326575028b23003e371dbe69de1eae206f81.zip
use bytes.Buffer to reduce memory allocation and gc
-rw-r--r--weed/operation/needle_parse_test.go2
-rw-r--r--weed/server/common.go5
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go6
-rw-r--r--weed/server/filer_server_handlers_write_upload.go24
-rw-r--r--weed/server/volume_server_handlers_write.go6
-rw-r--r--weed/storage/needle/needle.go5
-rw-r--r--weed/storage/needle/needle_parse_upload.go53
-rw-r--r--weed/storage/needle/needle_read_write.go59
8 files changed, 102 insertions, 58 deletions
diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go
index 202374e1b..d7e8a4162 100644
--- a/weed/operation/needle_parse_test.go
+++ b/weed/operation/needle_parse_test.go
@@ -18,7 +18,7 @@ type MockClient struct {
}
func (m *MockClient) Do(req *http.Request) (*http.Response, error) {
- n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024)
+ n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024, &bytes.Buffer{})
if m.needleHandling != nil {
m.needleHandling(n, originalSize, err)
}
diff --git a/weed/server/common.go b/weed/server/common.go
index 571944c10..2e0ae4058 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "bytes"
"encoding/json"
"errors"
"fmt"
@@ -104,7 +105,9 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
}
debug("parsing upload file...")
- pu, pe := needle.ParseUpload(r, 256*1024*1024)
+ bytesBuffer := bufPool.Get().(*bytes.Buffer)
+ defer bufPool.Put(bytesBuffer)
+ pu, pe := needle.ParseUpload(r, 256*1024*1024, bytesBuffer)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 8334d1618..acaa8f5ab 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "bytes"
"context"
"fmt"
"net/http"
@@ -30,7 +31,10 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
sizeLimit := int64(fs.option.MaxMB) * 1024 * 1024
- pu, err := needle.ParseUpload(r, sizeLimit)
+ bytesBuffer := bufPool.Get().(*bytes.Buffer)
+ defer bufPool.Put(bytesBuffer)
+
+ pu, err := needle.ParseUpload(r, sizeLimit, bytesBuffer)
uncompressedData := pu.Data
if pu.IsGzipped {
uncompressedData = pu.UncompressedData
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 540def563..7082ab0f8 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -8,6 +8,7 @@ import (
"io/ioutil"
"net/http"
"strings"
+ "sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -19,6 +20,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+var bufPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
var fileChunks []*filer_pb.FileChunk
@@ -28,21 +35,28 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
chunkOffset := int64(0)
var smallContent []byte
+ bytesBuffer := bufPool.Get().(*bytes.Buffer)
+ defer bufPool.Put(bytesBuffer)
for {
limitedReader := io.LimitReader(partReader, int64(chunkSize))
- data, err := ioutil.ReadAll(limitedReader)
+ bytesBuffer.Reset()
+
+ dataSize, err := bytesBuffer.ReadFrom(limitedReader)
+
+ // data, err := ioutil.ReadAll(limitedReader)
if err != nil {
return nil, nil, 0, err, nil
}
if chunkOffset == 0 && !isAppend(r) {
- if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
- smallContent = data
- chunkOffset += int64(len(data))
+ if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && dataSize < 4*1024 {
+ chunkOffset += dataSize
+ smallContent = make([]byte, dataSize)
+ bytesBuffer.Write(smallContent)
break
}
}
- dataReader := util.NewBytesReader(data)
+ dataReader := util.NewBytesReader(bytesBuffer.Bytes())
// retry to assign a different file id
var fileId, urlLocation string
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 3d752eda6..58212e8ff 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "bytes"
"errors"
"fmt"
"net/http"
@@ -42,7 +43,10 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
+ bytesBuffer := bufPool.Get().(*bytes.Buffer)
+ defer bufPool.Put(bytesBuffer)
+
+ reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes, bytesBuffer)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return
diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go
index 34d29ab6e..845ffdb24 100644
--- a/weed/storage/needle/needle.go
+++ b/weed/storage/needle/needle.go
@@ -1,6 +1,7 @@
package needle
import (
+ "bytes"
"encoding/json"
"fmt"
"net/http"
@@ -48,9 +49,9 @@ func (n *Needle) String() (str string) {
return
}
-func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) {
+func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64, bytesBuffer *bytes.Buffer) (n *Needle, originalSize int, contentMd5 string, e error) {
n = new(Needle)
- pu, e := ParseUpload(r, sizeLimit)
+ pu, e := ParseUpload(r, sizeLimit, bytesBuffer)
if e != nil {
return
}
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
index 7201503f1..0888c6b7a 100644
--- a/weed/storage/needle/needle_parse_upload.go
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -1,6 +1,7 @@
package needle
import (
+ "bytes"
"crypto/md5"
"encoding/base64"
"fmt"
@@ -18,11 +19,12 @@ import (
)
type ParsedUpload struct {
- FileName string
- Data []byte
- MimeType string
- PairMap map[string]string
- IsGzipped bool
+ FileName string
+ Data []byte
+ bytesBuffer *bytes.Buffer
+ MimeType string
+ PairMap map[string]string
+ IsGzipped bool
// IsZstd bool
OriginalDataSize int
ModifiedTime uint64
@@ -32,8 +34,9 @@ type ParsedUpload struct {
ContentMd5 string
}
-func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
- pu = &ParsedUpload{}
+func ParseUpload(r *http.Request, sizeLimit int64, bytesBuffer *bytes.Buffer) (pu *ParsedUpload, e error) {
+ bytesBuffer.Reset()
+ pu = &ParsedUpload{bytesBuffer: bytesBuffer}
pu.PairMap = make(map[string]string)
for k, v := range r.Header {
if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) {
@@ -72,14 +75,16 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
if mimeType == "application/octet-stream" {
mimeType = ""
}
- if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, mimeType); mimeType == "" && !iAmSure || shouldBeCompressed && iAmSure {
- // println("ext", ext, "iAmSure", iAmSure, "shouldBeCompressed", shouldBeCompressed, "mimeType", pu.MimeType)
- if compressedData, err := util.GzipData(pu.Data); err == nil {
- if len(compressedData)*10 < len(pu.Data)*9 {
- pu.Data = compressedData
- pu.IsGzipped = true
+ if false {
+ if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, mimeType); mimeType == "" && !iAmSure || shouldBeCompressed && iAmSure {
+ // println("ext", ext, "iAmSure", iAmSure, "shouldBeCompressed", shouldBeCompressed, "mimeType", pu.MimeType)
+ if compressedData, err := util.GzipData(pu.Data); err == nil {
+ if len(compressedData)*10 < len(pu.Data)*9 {
+ pu.Data = compressedData
+ pu.IsGzipped = true
+ }
+ // println("gzipped data size", len(compressedData))
}
- // println("gzipped data size", len(compressedData))
}
}
}
@@ -98,15 +103,16 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
return
}
-func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) {
+func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) error {
pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip"
// pu.IsZstd = r.Header.Get("Content-Encoding") == "zstd"
pu.MimeType = r.Header.Get("Content-Type")
pu.FileName = ""
- pu.Data, e = ioutil.ReadAll(io.LimitReader(r.Body, sizeLimit+1))
- if e == io.EOF || int64(pu.OriginalDataSize) == sizeLimit+1 {
+ dataSize, err := pu.bytesBuffer.ReadFrom(io.LimitReader(r.Body, sizeLimit+1))
+ if err == io.EOF || dataSize == sizeLimit+1 {
io.Copy(ioutil.Discard, r.Body)
}
+ pu.Data = pu.bytesBuffer.Bytes()
r.Body.Close()
return nil
}
@@ -138,15 +144,17 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error
pu.FileName = path.Base(pu.FileName)
}
- pu.Data, e = ioutil.ReadAll(io.LimitReader(part, sizeLimit+1))
+ var dataSize int64
+ dataSize, e = pu.bytesBuffer.ReadFrom(io.LimitReader(part, sizeLimit+1))
if e != nil {
glog.V(0).Infoln("Reading Content [ERROR]", e)
return
}
- if len(pu.Data) == int(sizeLimit)+1 {
+ if dataSize == sizeLimit+1 {
e = fmt.Errorf("file over the limited %d bytes", sizeLimit)
return
}
+ pu.Data = pu.bytesBuffer.Bytes()
// if the filename is empty string, do a search on the other multi-part items
for pu.FileName == "" {
@@ -159,19 +167,20 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error
// found the first <file type> multi-part has filename
if fName != "" {
- data2, fe2 := ioutil.ReadAll(io.LimitReader(part2, sizeLimit+1))
+ pu.bytesBuffer.Reset()
+ dataSize2, fe2 := pu.bytesBuffer.ReadFrom(io.LimitReader(part2, sizeLimit+1))
if fe2 != nil {
glog.V(0).Infoln("Reading Content [ERROR]", fe2)
e = fe2
return
}
- if len(data2) == int(sizeLimit)+1 {
+ if dataSize2 == sizeLimit+1 {
e = fmt.Errorf("file over the limited %d bytes", sizeLimit)
return
}
// update
- pu.Data = data2
+ pu.Data = pu.bytesBuffer.Bytes()
pu.FileName = path.Base(fName)
break
}
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 16c2fd06b..d208404a8 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -1,6 +1,7 @@
package needle
import (
+ "bytes"
"errors"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -9,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"math"
+ "sync"
)
const (
@@ -29,10 +31,14 @@ func (n *Needle) DiskSize(version Version) int64 {
return GetActualSize(n.Size, version)
}
-func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error) {
-
- writeBytes := make([]byte, 0)
+var bufPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (Size, int64, error) {
+ writeBytes.Reset()
switch version {
case Version1:
header := make([]byte, NeedleHeaderSize)
@@ -42,12 +48,12 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error
SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
size := n.Size
actualSize := NeedleHeaderSize + int64(n.Size)
- writeBytes = append(writeBytes, header...)
- writeBytes = append(writeBytes, n.Data...)
+ writeBytes.Write(header)
+ writeBytes.Write(n.Data)
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
- writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...)
- return writeBytes, size, actualSize, nil
+ writeBytes.Write(header[0:NeedleChecksumSize+padding])
+ return size, actualSize, nil
case Version2, Version3:
header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
CookieToBytes(header[0:CookieSize], n.Cookie)
@@ -79,51 +85,51 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error
n.Size = 0
}
SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
- writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...)
+ writeBytes.Write(header[0:NeedleHeaderSize])
if n.DataSize > 0 {
util.Uint32toBytes(header[0:4], n.DataSize)
- writeBytes = append(writeBytes, header[0:4]...)
- writeBytes = append(writeBytes, n.Data...)
+ writeBytes.Write(header[0:4])
+ writeBytes.Write(n.Data)
util.Uint8toBytes(header[0:1], n.Flags)
- writeBytes = append(writeBytes, header[0:1]...)
+ writeBytes.Write(header[0:1])
if n.HasName() {
util.Uint8toBytes(header[0:1], n.NameSize)
- writeBytes = append(writeBytes, header[0:1]...)
- writeBytes = append(writeBytes, n.Name[:n.NameSize]...)
+ writeBytes.Write(header[0:1])
+ writeBytes.Write(n.Name[:n.NameSize])
}
if n.HasMime() {
util.Uint8toBytes(header[0:1], n.MimeSize)
- writeBytes = append(writeBytes, header[0:1]...)
- writeBytes = append(writeBytes, n.Mime...)
+ writeBytes.Write(header[0:1])
+ writeBytes.Write(n.Mime)
}
if n.HasLastModifiedDate() {
util.Uint64toBytes(header[0:8], n.LastModified)
- writeBytes = append(writeBytes, header[8-LastModifiedBytesLength:8]...)
+ writeBytes.Write(header[8-LastModifiedBytesLength:8])
}
if n.HasTtl() && n.Ttl != nil {
n.Ttl.ToBytes(header[0:TtlBytesLength])
- writeBytes = append(writeBytes, header[0:TtlBytesLength]...)
+ writeBytes.Write(header[0:TtlBytesLength])
}
if n.HasPairs() {
util.Uint16toBytes(header[0:2], n.PairsSize)
- writeBytes = append(writeBytes, header[0:2]...)
- writeBytes = append(writeBytes, n.Pairs...)
+ writeBytes.Write(header[0:2])
+ writeBytes.Write(n.Pairs)
}
}
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
if version == Version2 {
- writeBytes = append(writeBytes, header[0:NeedleChecksumSize+padding]...)
+ writeBytes.Write(header[0:NeedleChecksumSize+padding])
} else {
// version3
util.Uint64toBytes(header[NeedleChecksumSize:NeedleChecksumSize+TimestampSize], n.AppendAtNs)
- writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...)
+ writeBytes.Write(header[0:NeedleChecksumSize+TimestampSize+padding])
}
- return writeBytes, Size(n.DataSize), GetActualSize(n.Size, version), nil
+ return Size(n.DataSize), GetActualSize(n.Size, version), nil
}
- return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
+ return 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) {
@@ -146,10 +152,13 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
return
}
- bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
+ bytesBuffer := bufPool.Get().(*bytes.Buffer)
+ defer bufPool.Put(bytesBuffer)
+
+ size, actualSize, err = n.prepareWriteBuffer(version, bytesBuffer)
if err == nil {
- _, err = w.WriteAt(bytesToWrite, int64(offset))
+ _, err = w.WriteAt(bytesBuffer.Bytes(), int64(offset))
}
return offset, size, actualSize, err