diff options
| author | Chris Lu <chris.lu@uber.com> | 2021-04-02 01:10:24 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@uber.com> | 2021-04-02 01:10:24 -0700 |
| commit | 7e8edc3c4adaf1251f5c773cbeaf3a868269f97a (patch) | |
| tree | ba87171aee996af75425171db7fd60cae041a387 /weed/operation | |
| parent | cc0df36a9e767b27e313885a79119e6b14c3cbad (diff) | |
| download | seaweedfs-7e8edc3c4adaf1251f5c773cbeaf3a868269f97a.tar.xz seaweedfs-7e8edc3c4adaf1251f5c773cbeaf3a868269f97a.zip | |
refactoring
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/upload_processor.go | 61 |
1 files changed, 0 insertions, 61 deletions
diff --git a/weed/operation/upload_processor.go b/weed/operation/upload_processor.go deleted file mode 100644 index 903ae72ce..000000000 --- a/weed/operation/upload_processor.go +++ /dev/null @@ -1,61 +0,0 @@ -package operation - -import ( - "reflect" - "runtime" - "sync" - "sync/atomic" -) - -type OperationRequest func() - -var ( - requestSlots = uint32(32) - requests = make([]chan OperationRequest, requestSlots) // increase slots to increase fairness - ConcurrentUploadLimit = int32(runtime.NumCPU()) // directly related to memory usage - concurrentLimitCond = sync.NewCond(new(sync.Mutex)) - concurrentUpload int32 -) - -func init() { - - for i := 0; i < int(requestSlots); i++ { - requests[i] = make(chan OperationRequest) - } - - cases := make([]reflect.SelectCase, requestSlots) - for i, ch := range requests { - cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} - } - - go func() { - for { - _, value, ok := reflect.Select(cases) - if !ok { - continue - } - - request := value.Interface().(OperationRequest) - - concurrentLimitCond.L.Lock() - for atomic.LoadInt32(&concurrentUpload) > ConcurrentUploadLimit { - concurrentLimitCond.Wait() - } - atomic.AddInt32(&concurrentUpload, 1) - concurrentLimitCond.L.Unlock() - - go func() { - defer atomic.AddInt32(&concurrentUpload, -1) - defer concurrentLimitCond.Signal() - request() - }() - - } - }() - -} - -func AsyncOutOfOrderProcess(slotKey uint32, request OperationRequest) { - index := slotKey % requestSlots - requests[index] <- request -} |
