diff options
| author | Chris Lu <chris.lu@uber.com> | 2021-04-01 02:21:40 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@uber.com> | 2021-04-01 02:21:40 -0700 |
| commit | cefe66f1592855b26544f7a84162a3803ed7cda2 (patch) | |
| tree | beba35181976b20537b86f995a378b70d5e1f1be /weed/operation/upload_processor.go | |
| parent | 1f984d26453bc97c95a8aa51e69d05ac74942685 (diff) | |
| download | seaweedfs-cefe66f1592855b26544f7a84162a3803ed7cda2.tar.xz seaweedfs-cefe66f1592855b26544f7a84162a3803ed7cda2.zip | |
dedicated upload processor
avoid thundering effect of overloading volume servers
Diffstat (limited to 'weed/operation/upload_processor.go')
| -rw-r--r-- | weed/operation/upload_processor.go | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/weed/operation/upload_processor.go b/weed/operation/upload_processor.go new file mode 100644 index 000000000..903ae72ce --- /dev/null +++ b/weed/operation/upload_processor.go @@ -0,0 +1,61 @@ +package operation + +import ( + "reflect" + "runtime" + "sync" + "sync/atomic" +) + +type OperationRequest func() + +var ( + requestSlots = uint32(32) + requests = make([]chan OperationRequest, requestSlots) // increase slots to increase fairness + ConcurrentUploadLimit = int32(runtime.NumCPU()) // directly related to memory usage + concurrentLimitCond = sync.NewCond(new(sync.Mutex)) + concurrentUpload int32 +) + +func init() { + + for i := 0; i < int(requestSlots); i++ { + requests[i] = make(chan OperationRequest) + } + + cases := make([]reflect.SelectCase, requestSlots) + for i, ch := range requests { + cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} + } + + go func() { + for { + _, value, ok := reflect.Select(cases) + if !ok { + continue + } + + request := value.Interface().(OperationRequest) + + concurrentLimitCond.L.Lock() + for atomic.LoadInt32(&concurrentUpload) > ConcurrentUploadLimit { + concurrentLimitCond.Wait() + } + atomic.AddInt32(&concurrentUpload, 1) + concurrentLimitCond.L.Unlock() + + go func() { + defer atomic.AddInt32(&concurrentUpload, -1) + defer concurrentLimitCond.Signal() + request() + }() + + } + }() + +} + +func AsyncOutOfOrderProcess(slotKey uint32, request OperationRequest) { + index := slotKey % requestSlots + requests[index] <- request +} |
