diff options
| author | wusong <75450248+wusongANKANG@users.noreply.github.com> | 2023-06-04 04:38:27 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-06-03 13:38:27 -0700 |
| commit | 5aec6da8a3e0f815eff853784d5200c781722a57 (patch) | |
| tree | ffce82b523a7474e2184e8e0d12259293440f11d | |
| parent | e23f3d6eca906208be36c15f867add71d487645f (diff) | |
| download | seaweedfs-5aec6da8a3e0f815eff853784d5200c781722a57.tar.xz seaweedfs-5aec6da8a3e0f815eff853784d5200c781722a57.zip | |
fix deadlock for filer upload (#4527)
Signed-off-by: wang wusong <wangwusong@virtaitech.com>
Co-authored-by: wang wusong <wangwusong@virtaitech.com>
| -rw-r--r-- | weed/server/filer_server_handlers_write_upload.go | 26 |
1 files changed, 8 insertions, 18 deletions
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index cc43eba64..6b4a820e6 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -4,15 +4,15 @@ import ( "bytes" "crypto/md5" "fmt" - "golang.org/x/exp/slices" "hash" "io" "net/http" "strconv" "sync" - "sync/atomic" "time" + "golang.org/x/exp/slices" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -49,23 +49,16 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque var partReader = io.NopCloser(io.TeeReader(reader, md5Hash)) var wg sync.WaitGroup - var bytesBufferCounter int64 - bytesBufferLimitCond := sync.NewCond(new(sync.Mutex)) + var bytesBufferCounter int64 = 4 + bytesBufferLimitChan := make(chan struct{}, bytesBufferCounter) var fileChunksLock sync.Mutex var uploadErrLock sync.Mutex for { // need to throttle used byte buffer - bytesBufferLimitCond.L.Lock() - for atomic.LoadInt64(&bytesBufferCounter) >= 4 { - glog.V(4).Infof("waiting for byte buffer %d", atomic.LoadInt64(&bytesBufferCounter)) - bytesBufferLimitCond.Wait() - } - atomic.AddInt64(&bytesBufferCounter, 1) - bytesBufferLimitCond.L.Unlock() + bytesBufferLimitChan <- struct{}{} bytesBuffer := bufPool.Get().(*bytes.Buffer) - glog.V(4).Infof("received byte buffer %d", atomic.LoadInt64(&bytesBufferCounter)) limitedReader := io.LimitReader(partReader, int64(chunkSize)) @@ -76,8 +69,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque // data, err := io.ReadAll(limitedReader) if err != nil || dataSize == 0 { bufPool.Put(bytesBuffer) - atomic.AddInt64(&bytesBufferCounter, -1) - bytesBufferLimitCond.Signal() + <-bytesBufferLimitChan uploadErrLock.Lock() uploadErr = err uploadErrLock.Unlock() @@ -89,8 +81,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque smallContent = make([]byte, dataSize) bytesBuffer.Read(smallContent) bufPool.Put(bytesBuffer) - atomic.AddInt64(&bytesBufferCounter, -1) - bytesBufferLimitCond.Signal() + <-bytesBufferLimitChan stats.FilerRequestCounter.WithLabelValues(stats.ContentSaveToFiler).Inc() break } @@ -102,8 +93,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque go func(offset int64) { defer func() { bufPool.Put(bytesBuffer) - atomic.AddInt64(&bytesBufferCounter, -1) - bytesBufferLimitCond.Signal() + <-bytesBufferLimitChan wg.Done() }() |
