aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2024-04-11 16:47:21 +0500
committerGitHub <noreply@github.com>2024-04-11 04:47:21 -0700
commit5189a09de052c799458d5cac2cd0593ac6cacd72 (patch)
tree25cf9c8d7957741af6fa24f946aced5007f4cc09
parent6dae685f9c17706799b976ad384398418156483c (diff)
downloadseaweedfs-5189a09de052c799458d5cac2cd0593ac6cacd72.tar.xz
seaweedfs-5189a09de052c799458d5cac2cd0593ac6cacd72.zip
[volume] Reduce the number of buffers for uploading one chunk (#5458)
-rw-r--r--weed/operation/upload_content.go25
-rw-r--r--weed/server/volume_server_handlers_write.go6
-rw-r--r--weed/storage/needle/needle_write.go12
-rw-r--r--weed/topology/store_replicate.go4
-rw-r--r--weed/util/buffer_pool/sync_pool.go20
5 files changed, 50 insertions, 17 deletions
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index b48d73deb..a1df07d7e 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
+ "github.com/valyala/bytebufferpool"
"io"
"mime"
"mime/multipart"
@@ -32,6 +33,7 @@ type UploadOption struct {
Jwt security.EncodedJwt
RetryForever bool
Md5 string
+ BytesBuffer *bytes.Buffer
}
type UploadResult struct {
@@ -261,6 +263,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
PairMap: option.PairMap,
Jwt: option.Jwt,
Md5: option.Md5,
+ BytesBuffer: option.BytesBuffer,
})
if uploadResult == nil {
return
@@ -275,9 +278,17 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
}
func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
- buf := GetBuffer()
- defer PutBuffer(buf)
- body_writer := multipart.NewWriter(buf)
+ var body_writer *multipart.Writer
+ var reqReader *bytes.Reader
+ var buf *bytebufferpool.ByteBuffer
+ if option.BytesBuffer == nil {
+ buf = GetBuffer()
+ defer PutBuffer(buf)
+ body_writer = multipart.NewWriter(buf)
+ } else {
+ option.BytesBuffer.Reset()
+ body_writer = multipart.NewWriter(option.BytesBuffer)
+ }
h := make(textproto.MIMEHeader)
filename := fileNameEscaper.Replace(option.Filename)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, filename))
@@ -309,8 +320,12 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize
glog.V(0).Infoln("error closing body", err)
return nil, err
}
-
- req, postErr := http.NewRequest("POST", option.UploadUrl, bytes.NewReader(buf.Bytes()))
+ if option.BytesBuffer == nil {
+ reqReader = bytes.NewReader(buf.Bytes())
+ } else {
+ reqReader = bytes.NewReader(option.BytesBuffer.Bytes())
+ }
+ req, postErr := http.NewRequest("POST", option.UploadUrl, reqReader)
if postErr != nil {
glog.V(1).Infof("create upload request %s: %v", option.UploadUrl, postErr)
return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr)
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 7517d8641..6e151bf80 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "bytes"
"errors"
"fmt"
"net/http"
@@ -13,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/topology"
+ "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
@@ -35,8 +35,8 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- bytesBuffer := bufPool.Get().(*bytes.Buffer)
- defer bufPool.Put(bytesBuffer)
+ bytesBuffer := buffer_pool.SyncPoolGetBuffer()
+ defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes, bytesBuffer)
if ne != nil {
diff --git a/weed/storage/needle/needle_write.go b/weed/storage/needle/needle_write.go
index 6546f35a6..51d3bcf40 100644
--- a/weed/storage/needle/needle_write.go
+++ b/weed/storage/needle/needle_write.go
@@ -7,16 +7,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
. "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
"math"
- "sync"
)
-var bufPool = sync.Pool{
- New: func() interface{} {
- return new(bytes.Buffer)
- },
-}
-
func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (Size, int64, error) {
writeBytes.Reset()
switch version {
@@ -132,8 +126,8 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
return
}
- bytesBuffer := bufPool.Get().(*bytes.Buffer)
- defer bufPool.Put(bytesBuffer)
+ bytesBuffer := buffer_pool.SyncPoolGetBuffer()
+ defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
size, actualSize, err = n.prepareWriteBuffer(version, bytesBuffer)
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index a5a406459..82c2db79c 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -19,6 +19,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
)
func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
@@ -87,6 +88,8 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
pairMap[needle.PairNamePrefix+k] = v
}
}
+ bytesBuffer := buffer_pool.SyncPoolGetBuffer()
+ defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
// volume server do not know about encryption
// TODO optimize here to compress data only once
@@ -99,6 +102,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
PairMap: pairMap,
Jwt: jwt,
Md5: contentMd5,
+ BytesBuffer: bytesBuffer,
}
_, err := operation.UploadData(n.Data, uploadOption)
diff --git a/weed/util/buffer_pool/sync_pool.go b/weed/util/buffer_pool/sync_pool.go
new file mode 100644
index 000000000..b97274691
--- /dev/null
+++ b/weed/util/buffer_pool/sync_pool.go
@@ -0,0 +1,20 @@
+package buffer_pool
+
+import (
+ "bytes"
+ "sync"
+)
+
+var syncPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
+func SyncPoolGetBuffer() *bytes.Buffer {
+ return syncPool.Get().(*bytes.Buffer)
+}
+
+func SyncPoolPutBuffer(buffer *bytes.Buffer) {
+ syncPool.Put(buffer)
+}