aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/upload_processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation/upload_processor.go')
-rw-r--r--weed/operation/upload_processor.go61
1 files changed, 61 insertions, 0 deletions
diff --git a/weed/operation/upload_processor.go b/weed/operation/upload_processor.go
new file mode 100644
index 000000000..903ae72ce
--- /dev/null
+++ b/weed/operation/upload_processor.go
@@ -0,0 +1,61 @@
+package operation
+
+import (
+ "reflect"
+ "runtime"
+ "sync"
+ "sync/atomic"
+)
+
+type OperationRequest func()
+
+var (
+ requestSlots = uint32(32)
+ requests = make([]chan OperationRequest, requestSlots) // increase slots to increase fairness
+ ConcurrentUploadLimit = int32(runtime.NumCPU()) // directly related to memory usage
+ concurrentLimitCond = sync.NewCond(new(sync.Mutex))
+ concurrentUpload int32
+)
+
+func init() {
+
+ for i := 0; i < int(requestSlots); i++ {
+ requests[i] = make(chan OperationRequest)
+ }
+
+ cases := make([]reflect.SelectCase, requestSlots)
+ for i, ch := range requests {
+ cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+ }
+
+ go func() {
+ for {
+ _, value, ok := reflect.Select(cases)
+ if !ok {
+ continue
+ }
+
+ request := value.Interface().(OperationRequest)
+
+ concurrentLimitCond.L.Lock()
+ for atomic.LoadInt32(&concurrentUpload) > ConcurrentUploadLimit {
+ concurrentLimitCond.Wait()
+ }
+ atomic.AddInt32(&concurrentUpload, 1)
+ concurrentLimitCond.L.Unlock()
+
+ go func() {
+ defer atomic.AddInt32(&concurrentUpload, -1)
+ defer concurrentLimitCond.Signal()
+ request()
+ }()
+
+ }
+ }()
+
+}
+
+func AsyncOutOfOrderProcess(slotKey uint32, request OperationRequest) {
+ index := slotKey % requestSlots
+ requests[index] <- request
+}