aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
authorChris Lu <chris.lu@uber.com>2021-04-02 01:10:24 -0700
committerChris Lu <chris.lu@uber.com>2021-04-02 01:10:24 -0700
commit7e8edc3c4adaf1251f5c773cbeaf3a868269f97a (patch)
treeba87171aee996af75425171db7fd60cae041a387 /weed/operation
parentcc0df36a9e767b27e313885a79119e6b14c3cbad (diff)
downloadseaweedfs-7e8edc3c4adaf1251f5c773cbeaf3a868269f97a.tar.xz
seaweedfs-7e8edc3c4adaf1251f5c773cbeaf3a868269f97a.zip
refactoring
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/upload_processor.go61
1 files changed, 0 insertions, 61 deletions
diff --git a/weed/operation/upload_processor.go b/weed/operation/upload_processor.go
deleted file mode 100644
index 903ae72ce..000000000
--- a/weed/operation/upload_processor.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package operation
-
-import (
- "reflect"
- "runtime"
- "sync"
- "sync/atomic"
-)
-
-type OperationRequest func()
-
-var (
- requestSlots = uint32(32)
- requests = make([]chan OperationRequest, requestSlots) // increase slots to increase fairness
- ConcurrentUploadLimit = int32(runtime.NumCPU()) // directly related to memory usage
- concurrentLimitCond = sync.NewCond(new(sync.Mutex))
- concurrentUpload int32
-)
-
-func init() {
-
- for i := 0; i < int(requestSlots); i++ {
- requests[i] = make(chan OperationRequest)
- }
-
- cases := make([]reflect.SelectCase, requestSlots)
- for i, ch := range requests {
- cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
- }
-
- go func() {
- for {
- _, value, ok := reflect.Select(cases)
- if !ok {
- continue
- }
-
- request := value.Interface().(OperationRequest)
-
- concurrentLimitCond.L.Lock()
- for atomic.LoadInt32(&concurrentUpload) > ConcurrentUploadLimit {
- concurrentLimitCond.Wait()
- }
- atomic.AddInt32(&concurrentUpload, 1)
- concurrentLimitCond.L.Unlock()
-
- go func() {
- defer atomic.AddInt32(&concurrentUpload, -1)
- defer concurrentLimitCond.Signal()
- request()
- }()
-
- }
- }()
-
-}
-
-func AsyncOutOfOrderProcess(slotKey uint32, request OperationRequest) {
- index := slotKey % requestSlots
- requests[index] <- request
-}