package operation

import (
	"reflect"
	"runtime"
	"sync"
	"sync/atomic"
)

// OperationRequest is a unit of work submitted for asynchronous execution.
type OperationRequest func()

var (
	requestSlots          = uint32(32)
	requests              = make([]chan OperationRequest, requestSlots) // increase slots to increase fairness
	ConcurrentUploadLimit = int32(runtime.NumCPU())                     // directly related to memory usage
	concurrentLimitCond   = sync.NewCond(new(sync.Mutex))
	concurrentUpload      int32
)

func init() {
	// Create one unbuffered channel per slot.
	for i := 0; i < int(requestSlots); i++ {
		requests[i] = make(chan OperationRequest)
	}

	// Build a select case for every slot so a single dispatcher goroutine
	// can receive from all of them.
	cases := make([]reflect.SelectCase, requestSlots)
	for i, ch := range requests {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}

	// Dispatcher: receive a request from any ready slot, wait until the number
	// of in-flight requests is within ConcurrentUploadLimit, then run the
	// request in its own goroutine.
	go func() {
		for {
			_, value, ok := reflect.Select(cases)
			if !ok {
				continue
			}
			request := value.Interface().(OperationRequest)

			concurrentLimitCond.L.Lock()
			for atomic.LoadInt32(&concurrentUpload) > ConcurrentUploadLimit {
				concurrentLimitCond.Wait()
			}
			atomic.AddInt32(&concurrentUpload, 1)
			concurrentLimitCond.L.Unlock()

			go func() {
				defer atomic.AddInt32(&concurrentUpload, -1)
				defer concurrentLimitCond.Signal()
				request()
			}()
		}
	}()
}

// AsyncOutOfOrderProcess submits a request to the slot selected by slotKey.
// The call blocks until the dispatcher receives the request; the request then
// runs asynchronously, so requests may complete out of order.
func AsyncOutOfOrderProcess(slotKey uint32, request OperationRequest) {
	index := slotKey % requestSlots
	requests[index] <- request
}