aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorwusong <75450248+wusongANKANG@users.noreply.github.com>2023-06-04 04:38:27 +0800
committerGitHub <noreply@github.com>2023-06-03 13:38:27 -0700
commit5aec6da8a3e0f815eff853784d5200c781722a57 (patch)
treeffce82b523a7474e2184e8e0d12259293440f11d
parente23f3d6eca906208be36c15f867add71d487645f (diff)
downloadseaweedfs-5aec6da8a3e0f815eff853784d5200c781722a57.tar.xz
seaweedfs-5aec6da8a3e0f815eff853784d5200c781722a57.zip
fix deadlock for filer upload (#4527)
Signed-off-by: wang wusong <wangwusong@virtaitech.com> Co-authored-by: wang wusong <wangwusong@virtaitech.com>
-rw-r--r--weed/server/filer_server_handlers_write_upload.go26
1 files changed, 8 insertions, 18 deletions
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index cc43eba64..6b4a820e6 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -4,15 +4,15 @@ import (
"bytes"
"crypto/md5"
"fmt"
- "golang.org/x/exp/slices"
"hash"
"io"
"net/http"
"strconv"
"sync"
- "sync/atomic"
"time"
+ "golang.org/x/exp/slices"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -49,23 +49,16 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
var wg sync.WaitGroup
- var bytesBufferCounter int64
- bytesBufferLimitCond := sync.NewCond(new(sync.Mutex))
+ var bytesBufferCounter int64 = 4
+ bytesBufferLimitChan := make(chan struct{}, bytesBufferCounter)
var fileChunksLock sync.Mutex
var uploadErrLock sync.Mutex
for {
// need to throttle used byte buffer
- bytesBufferLimitCond.L.Lock()
- for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
- glog.V(4).Infof("waiting for byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
- bytesBufferLimitCond.Wait()
- }
- atomic.AddInt64(&bytesBufferCounter, 1)
- bytesBufferLimitCond.L.Unlock()
+ bytesBufferLimitChan <- struct{}{}
bytesBuffer := bufPool.Get().(*bytes.Buffer)
- glog.V(4).Infof("received byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
limitedReader := io.LimitReader(partReader, int64(chunkSize))
@@ -76,8 +69,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
// data, err := io.ReadAll(limitedReader)
if err != nil || dataSize == 0 {
bufPool.Put(bytesBuffer)
- atomic.AddInt64(&bytesBufferCounter, -1)
- bytesBufferLimitCond.Signal()
+ <-bytesBufferLimitChan
uploadErrLock.Lock()
uploadErr = err
uploadErrLock.Unlock()
@@ -89,8 +81,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
smallContent = make([]byte, dataSize)
bytesBuffer.Read(smallContent)
bufPool.Put(bytesBuffer)
- atomic.AddInt64(&bytesBufferCounter, -1)
- bytesBufferLimitCond.Signal()
+ <-bytesBufferLimitChan
stats.FilerRequestCounter.WithLabelValues(stats.ContentSaveToFiler).Inc()
break
}
@@ -102,8 +93,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
go func(offset int64) {
defer func() {
bufPool.Put(bytesBuffer)
- atomic.AddInt64(&bytesBufferCounter, -1)
- bytesBufferLimitCond.Signal()
+ <-bytesBufferLimitChan
wg.Done()
}()