author    Chris Lu <chris.lu@gmail.com>  2021-06-06 23:05:17 -0700
committer Chris Lu <chris.lu@gmail.com>  2021-06-06 23:05:17 -0700
commit    452c6ef18313bbf9655c133e5b5ed9456b983006 (patch)
tree      5527622e82c51939a6eaac11e7aff103819f62bc
parent    8295e2feb680f383d1d28c51052168fe549ef375 (diff)
limits concurrent uploads for one file
-rw-r--r--  weed/server/filer_server_handlers_write_upload.go  28
1 file changed, 25 insertions, 3 deletions
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index cc9bb0dc0..32a722507 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -10,6 +10,7 @@ import (
"sort"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -38,11 +39,21 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
var uploadErr error
var wg sync.WaitGroup
+ var bytesBufferCounter int64
+ bytesBufferLimitCond := sync.NewCond(new(sync.Mutex))
for {
- // need to throttle this for large files
+ // need to throttle used byte buffer
+ bytesBufferLimitCond.L.Lock()
+ for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
+ glog.V(4).Infof("waiting for byte buffer %d", bytesBufferCounter)
+ bytesBufferLimitCond.Wait()
+ }
+ atomic.AddInt64(&bytesBufferCounter, 1)
+ bytesBufferLimitCond.L.Unlock()
+
bytesBuffer := bufPool.Get().(*bytes.Buffer)
- defer bufPool.Put(bytesBuffer)
+ glog.V(4).Infof("received byte buffer %d", bytesBufferCounter)
limitedReader := io.LimitReader(partReader, int64(chunkSize))
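The hunk above is the acquisition side of the throttle: before taking a buffer from bufPool, the loop waits on a sync.Cond until the atomic in-flight counter drops below 4, then increments it and checks a buffer out. A minimal, self-contained sketch of that pattern follows; the package name, the acquireBuffer helper, and the limit parameter are illustrative and not part of the commit.

package uploadthrottle

import (
    "bytes"
    "sync"
    "sync/atomic"
)

// bufPool recycles byte buffers between chunk uploads.
var bufPool = sync.Pool{
    New: func() interface{} { return new(bytes.Buffer) },
}

var (
    bytesBufferCounter   int64 // buffers currently checked out
    bytesBufferLimitCond = sync.NewCond(new(sync.Mutex))
)

// acquireBuffer blocks until fewer than limit buffers are in flight,
// bumps the counter, and checks a buffer out of the pool.
func acquireBuffer(limit int64) *bytes.Buffer {
    bytesBufferLimitCond.L.Lock()
    for atomic.LoadInt64(&bytesBufferCounter) >= limit {
        bytesBufferLimitCond.Wait() // unlocks while waiting, relocks on wake-up
    }
    atomic.AddInt64(&bytesBufferCounter, 1)
    bytesBufferLimitCond.L.Unlock()
    return bufPool.Get().(*bytes.Buffer)
}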
@@ -52,6 +63,9 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
// data, err := ioutil.ReadAll(limitedReader)
if err != nil || dataSize == 0 {
+ bufPool.Put(bytesBuffer)
+ atomic.AddInt64(&bytesBufferCounter, -1)
+ bytesBufferLimitCond.Signal()
return nil, md5Hash, 0, err, nil
}
if chunkOffset == 0 && !isAppend(r) {
@@ -59,13 +73,21 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
chunkOffset += dataSize
smallContent = make([]byte, dataSize)
bytesBuffer.Read(smallContent)
+ bufPool.Put(bytesBuffer)
+ atomic.AddInt64(&bytesBufferCounter, -1)
+ bytesBufferLimitCond.Signal()
break
}
}
wg.Add(1)
go func(offset int64) {
- defer wg.Done()
+ defer func() {
+ bufPool.Put(bytesBuffer)
+ atomic.AddInt64(&bytesBufferCounter, -1)
+ bytesBufferLimitCond.Signal()
+ wg.Done()
+ }()
chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
if toChunkErr != nil {
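Because the per-chunk defer bufPool.Put(bytesBuffer) is gone, every exit path now has to release the buffer explicitly: return it to the pool, decrement the counter, and Signal one waiter, which is what the error return, the small-content branch, and the upload goroutine's defer each do above. Continuing the sketch from the previous example (releaseBuffer, uploadChunk, and processChunks are hypothetical names, not the project's API), the release side and a caller loop could look like this:

// releaseBuffer undoes acquireBuffer: it returns the buffer to the pool,
// decrements the in-flight counter, and wakes one goroutine blocked in
// acquireBuffer. It must run on every exit path, including errors.
func releaseBuffer(b *bytes.Buffer) {
    b.Reset()
    bufPool.Put(b)
    atomic.AddInt64(&bytesBufferCounter, -1)
    bytesBufferLimitCond.Signal()
}

// processChunks shows the caller shape: at most 4 chunk uploads hold a
// buffer at any moment, and each upload goroutine releases its buffer in
// a defer before wg.Done(), mirroring the commit's deferred cleanup.
func processChunks(chunks [][]byte, uploadChunk func([]byte) error) error {
    var (
        wg        sync.WaitGroup
        errMu     sync.Mutex
        uploadErr error
    )
    for _, data := range chunks {
        buf := acquireBuffer(4) // blocks while 4 uploads are already in flight
        buf.Write(data)
        wg.Add(1)
        go func(b *bytes.Buffer) {
            defer func() {
                releaseBuffer(b)
                wg.Done()
            }()
            if err := uploadChunk(b.Bytes()); err != nil {
                errMu.Lock()
                if uploadErr == nil {
                    uploadErr = err
                }
                errMu.Unlock()
            }
        }(buf)
    }
    wg.Wait()
    return uploadErr
}

The counter plus sync.Cond behaves like a counting semaphore; a buffered channel used as a token bucket would be an equivalent way to cap in-flight buffers.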