author    Chris Lu <chris.lu@uber.com>  2021-04-02 01:10:24 -0700
committer Chris Lu <chris.lu@uber.com>  2021-04-02 01:10:24 -0700
commit    7e8edc3c4adaf1251f5c773cbeaf3a868269f97a (patch)
tree      ba87171aee996af75425171db7fd60cae041a387
parent    cc0df36a9e767b27e313885a79119e6b14c3cbad (diff)
refactoring: generalize the one-off upload limiter in weed/operation into a reusable util.LimitedOutOfOrderProcessor
-rw-r--r--  weed/filesys/wfs.go                                 |  4
-rw-r--r--  weed/operation/upload_processor.go                  | 61
-rw-r--r--  weed/server/filer_server_handlers_write_upload.go   |  8
-rw-r--r--  weed/util/limiter.go                                | 82
4 files changed, 64 insertions(+), 91 deletions(-)
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index c6d9080a1..f0ac6d80d 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -77,7 +77,7 @@ type WFS struct {
signature int32
// throttle writers
- concurrentWriters *util.LimitedConcurrentExecutor
+ concurrentWriters *util.LimitedOutOfOrderProcessor
Server *fs.Server
}
type statsCache struct {
@@ -135,7 +135,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.fsNodeCache = newFsCache(wfs.root)
if wfs.option.ConcurrentWriters > 0 {
- wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
+ wfs.concurrentWriters = util.NewLimitedOutOfOrderProcessor(int32(wfs.option.ConcurrentWriters))
}
return wfs
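
What the type swap above buys the FUSE layer, as a hedged sketch: gateWrite is a hypothetical helper, not part of this commit, and the actual call sites are outside this hunk. It mirrors the nil check implied by the ConcurrentWriters > 0 guard.

package main

import "github.com/chrislusf/seaweedfs/weed/util"

// gateWrite (hypothetical): run the job through the processor when
// write throttling is configured, otherwise run it inline.
func gateWrite(writers *util.LimitedOutOfOrderProcessor, job func()) {
	if writers != nil {
		writers.Execute(job) // back-pressures once the limit is saturated
		return
	}
	job()
}

func main() {
	writers := util.NewLimitedOutOfOrderProcessor(8) // e.g. option.ConcurrentWriters
	gateWrite(writers, func() { /* write one chunk */ })
}
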
diff --git a/weed/operation/upload_processor.go b/weed/operation/upload_processor.go
deleted file mode 100644
index 903ae72ce..000000000
--- a/weed/operation/upload_processor.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package operation
-
-import (
- "reflect"
- "runtime"
- "sync"
- "sync/atomic"
-)
-
-type OperationRequest func()
-
-var (
- requestSlots = uint32(32)
- requests = make([]chan OperationRequest, requestSlots) // increase slots to increase fairness
- ConcurrentUploadLimit = int32(runtime.NumCPU()) // directly related to memory usage
- concurrentLimitCond = sync.NewCond(new(sync.Mutex))
- concurrentUpload int32
-)
-
-func init() {
-
- for i := 0; i < int(requestSlots); i++ {
- requests[i] = make(chan OperationRequest)
- }
-
- cases := make([]reflect.SelectCase, requestSlots)
- for i, ch := range requests {
- cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
- }
-
- go func() {
- for {
- _, value, ok := reflect.Select(cases)
- if !ok {
- continue
- }
-
- request := value.Interface().(OperationRequest)
-
- concurrentLimitCond.L.Lock()
- for atomic.LoadInt32(&concurrentUpload) > ConcurrentUploadLimit {
- concurrentLimitCond.Wait()
- }
- atomic.AddInt32(&concurrentUpload, 1)
- concurrentLimitCond.L.Unlock()
-
- go func() {
- defer atomic.AddInt32(&concurrentUpload, -1)
- defer concurrentLimitCond.Signal()
- request()
- }()
-
- }
- }()
-
-}
-
-func AsyncOutOfOrderProcess(slotKey uint32, request OperationRequest) {
- index := slotKey % requestSlots
- requests[index] <- request
-}
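
The heart of the file deleted above is a single dispatcher that fans in over many slot channels with reflect.Select, which picks among ready cases uniformly at random; that random pick is where the "fairness" comment comes from. A minimal standalone demonstration of just that mechanism (a toy, not SeaweedFS code):

package main

import (
	"fmt"
	"reflect"
)

func main() {
	// Three request slots; one dispatcher receives from all of them.
	slots := make([]chan string, 3)
	cases := make([]reflect.SelectCase, len(slots))
	for i := range slots {
		slots[i] = make(chan string)
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(slots[i])}
	}

	go func() { slots[1] <- "job from slot 1" }()

	// Blocks until some slot is ready; with several ready slots,
	// reflect.Select chooses one at random, like a select statement.
	chosen, value, ok := reflect.Select(cases)
	if ok {
		fmt.Printf("slot %d delivered: %q\n", chosen, value.Interface().(string))
	}
}
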
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 21af6a109..25275cf05 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -5,8 +5,8 @@ import (
"hash"
"io"
"io/ioutil"
- "math/rand"
"net/http"
+ "runtime"
"strings"
"sync"
"time"
@@ -20,6 +20,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+var (
+ limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
+)
+
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
md5Hash = md5.New()
@@ -58,7 +62,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
for readErr == nil {
wg.Add(1)
- operation.AsyncOutOfOrderProcess(rand.Uint32(), func() {
+ limitedUploadProcessor.Execute(func() {
defer wg.Done()
var localOffset int64
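
The submission pattern above, reduced to a runnable sketch against the new util API this commit adds (the chunk-reading details are elided; the loop body is a stand-in):

package main

import (
	"fmt"
	"sync"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	// The handler bounds this by runtime.NumCPU(); a small fixed
	// limit here keeps the example easy to read.
	proc := util.NewLimitedOutOfOrderProcessor(4)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		i := i // capture the loop variable (pre-Go 1.22 semantics)
		wg.Add(1)
		proc.Execute(func() {
			defer wg.Done()
			fmt.Println("uploaded chunk", i) // stand-in for the per-chunk upload closure
		})
	}
	wg.Wait() // as in the handler: all chunks finish before the file entry is assembled
}
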
diff --git a/weed/util/limiter.go b/weed/util/limiter.go
index 91499632c..2e5168d3d 100644
--- a/weed/util/limiter.go
+++ b/weed/util/limiter.go
@@ -1,40 +1,70 @@
package util
-// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
+import (
+ "math/rand"
+ "reflect"
+ "sync"
+ "sync/atomic"
+)
-// LimitedConcurrentExecutor object
-type LimitedConcurrentExecutor struct {
- limit int
- tokenChan chan int
+type OperationRequest func()
+
+type LimitedOutOfOrderProcessor struct {
+ processorSlots uint32
+ processors []chan OperationRequest
+ processorLimit int32
+ processorLimitCond *sync.Cond
+ currentProcessor int32
}
-func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
+func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
- // allocate a limiter instance
- c := &LimitedConcurrentExecutor{
- limit: limit,
- tokenChan: make(chan int, limit),
+ processorSlots := uint32(32)
+ c = &LimitedOutOfOrderProcessor{
+ processorSlots: processorSlots,
+ processors: make([]chan OperationRequest, processorSlots),
+ processorLimit: limit,
+ processorLimitCond: sync.NewCond(new(sync.Mutex)),
}
- // allocate the tokenChan:
- for i := 0; i < c.limit; i++ {
- c.tokenChan <- i
+ for i := 0; i < int(processorSlots); i++ {
+ c.processors[i] = make(chan OperationRequest)
}
- return c
-}
+ cases := make([]reflect.SelectCase, processorSlots)
+ for i, ch := range c.processors {
+ cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+ }
-// Execute adds a function to the execution queue.
-// if num of go routines allocated by this instance is < limit
-// launch a new go routine to execute job
-// else wait until a go routine becomes available
-func (c *LimitedConcurrentExecutor) Execute(job func()) {
- token := <-c.tokenChan
go func() {
- defer func() {
- c.tokenChan <- token
- }()
- // run the job
- job()
+ for {
+ _, value, ok := reflect.Select(cases)
+ if !ok {
+ continue
+ }
+
+ request := value.Interface().(OperationRequest)
+
+ c.processorLimitCond.L.Lock()
+ for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit {
+ c.processorLimitCond.Wait()
+ }
+ atomic.AddInt32(&c.currentProcessor, 1)
+ c.processorLimitCond.L.Unlock()
+
+ go func() {
+ defer atomic.AddInt32(&c.currentProcessor, -1)
+ defer c.processorLimitCond.Signal()
+ request()
+ }()
+
+ }
}()
+
+ return c
+}
+
+func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
+ index := rand.Uint32() % c.processorSlots
+ c.processors[index] <- request
}
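
The dispatcher loop above gates on a classic condition-variable pattern: hold the lock, loop until the in-flight count is under the limit, increment, release, and have each finished request decrement and signal. Isolated as a hedged micro-example (names are hypothetical; note the commit compares with >, so up to limit+1 requests can be in flight at once):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	const limit = 2
	var inFlight int32
	cond := sync.NewCond(new(sync.Mutex))

	var wg sync.WaitGroup
	for i := 0; i < 6; i++ {
		i := i
		cond.L.Lock()
		for atomic.LoadInt32(&inFlight) >= limit { // the commit uses >, admitting one extra
			cond.Wait()
		}
		atomic.AddInt32(&inFlight, 1)
		cond.L.Unlock()

		wg.Add(1)
		go func() {
			defer wg.Done()                       // runs last
			defer cond.Signal()                   // then wake one waiter
			defer atomic.AddInt32(&inFlight, -1)  // runs first: drop the count
			fmt.Println("running", i)
		}()
	}
	wg.Wait()
}
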