aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-01-22 08:09:55 -0800
committerchrislu <chris.lu@gmail.com>2022-01-22 08:09:55 -0800
commit8e80f3cd65112ee3970b74880f2b022356fccdad (patch)
tree1b199daa14bf2a26181b692214761690aed644c0
parentdde34fa99d83ddb41c6c72e4b4bd02399ace3fb5 (diff)
downloadseaweedfs-8e80f3cd65112ee3970b74880f2b022356fccdad.tar.xz
seaweedfs-8e80f3cd65112ee3970b74880f2b022356fccdad.zip
move upload pipeline locking to a different file
-rw-r--r--weed/filesys/page_writer/upload_pipeline.go58
-rw-r--r--weed/filesys/page_writer/upload_pipeline_lock.go63
2 files changed, 63 insertions, 58 deletions
diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go
index ee85cf6c8..a46c13c4b 100644
--- a/weed/filesys/page_writer/upload_pipeline.go
+++ b/weed/filesys/page_writer/upload_pipeline.go
@@ -119,64 +119,6 @@ func (up *UploadPipeline) FlushAll() {
up.waitForCurrentWritersToComplete()
}
-func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
- startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
- stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
- if stopOffset%up.ChunkSize > 0 {
- stopLogicChunkIndex += 1
- }
- up.activeReadChunksLock.Lock()
- defer up.activeReadChunksLock.Unlock()
- for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
- if count, found := up.activeReadChunks[i]; found {
- up.activeReadChunks[i] = count + 1
- } else {
- up.activeReadChunks[i] = 1
- }
- }
-}
-
-func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
- startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
- stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
- if stopOffset%up.ChunkSize > 0 {
- stopLogicChunkIndex += 1
- }
- up.activeReadChunksLock.Lock()
- defer up.activeReadChunksLock.Unlock()
- for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
- if count, found := up.activeReadChunks[i]; found {
- if count == 1 {
- delete(up.activeReadChunks, i)
- } else {
- up.activeReadChunks[i] = count - 1
- }
- }
- }
-}
-
-func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
- up.activeReadChunksLock.Lock()
- defer up.activeReadChunksLock.Unlock()
- if count, found := up.activeReadChunks[logicChunkIndex]; found {
- return count > 0
- }
- return false
-}
-
-func (up *UploadPipeline) waitForCurrentWritersToComplete() {
- up.uploaderCountCond.L.Lock()
- t := int32(100)
- for {
- t = atomic.LoadInt32(&up.uploaderCount)
- if t <= 0 {
- break
- }
- up.uploaderCountCond.Wait()
- }
- up.uploaderCountCond.L.Unlock()
-}
-
func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
if memChunk.IsComplete() {
up.moveToSealed(memChunk, logicChunkIndex)
diff --git a/weed/filesys/page_writer/upload_pipeline_lock.go b/weed/filesys/page_writer/upload_pipeline_lock.go
new file mode 100644
index 000000000..47a40ba37
--- /dev/null
+++ b/weed/filesys/page_writer/upload_pipeline_lock.go
@@ -0,0 +1,63 @@
+package page_writer
+
+import (
+ "sync/atomic"
+)
+
+func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
+ startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
+ stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
+ if stopOffset%up.ChunkSize > 0 {
+ stopLogicChunkIndex += 1
+ }
+ up.activeReadChunksLock.Lock()
+ defer up.activeReadChunksLock.Unlock()
+ for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
+ if count, found := up.activeReadChunks[i]; found {
+ up.activeReadChunks[i] = count + 1
+ } else {
+ up.activeReadChunks[i] = 1
+ }
+ }
+}
+
+func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
+ startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
+ stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
+ if stopOffset%up.ChunkSize > 0 {
+ stopLogicChunkIndex += 1
+ }
+ up.activeReadChunksLock.Lock()
+ defer up.activeReadChunksLock.Unlock()
+ for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
+ if count, found := up.activeReadChunks[i]; found {
+ if count == 1 {
+ delete(up.activeReadChunks, i)
+ } else {
+ up.activeReadChunks[i] = count - 1
+ }
+ }
+ }
+}
+
+func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
+ up.activeReadChunksLock.Lock()
+ defer up.activeReadChunksLock.Unlock()
+ if count, found := up.activeReadChunks[logicChunkIndex]; found {
+ return count > 0
+ }
+ return false
+}
+
+func (up *UploadPipeline) waitForCurrentWritersToComplete() {
+ up.uploaderCountCond.L.Lock()
+ t := int32(100)
+ for {
+ t = atomic.LoadInt32(&up.uploaderCount)
+ if t <= 0 {
+ break
+ }
+ up.uploaderCountCond.Wait()
+ }
+ up.uploaderCountCond.L.Unlock()
+}