aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-01-22 01:46:10 -0800
committerchrislu <chris.lu@gmail.com>2022-01-22 01:46:10 -0800
commit3b4a9addaf2fbee5aefbc99a2482c04a119c07e7 (patch)
treedcf7d66c0312a230d922eee77574e3071c2f6ee5
parent4acfc098e9d9e31dbfae9c6b846315b6011cbfdd (diff)
downloadseaweedfs-3b4a9addaf2fbee5aefbc99a2482c04a119c07e7.tar.xz
seaweedfs-3b4a9addaf2fbee5aefbc99a2482c04a119c07e7.zip
rename
-rw-r--r--weed/filesys/page_writer/upload_pipeline.go47
1 files changed, 23 insertions, 24 deletions
diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go
index 47664ed55..05300ef1c 100644
--- a/weed/filesys/page_writer/upload_pipeline.go
+++ b/weed/filesys/page_writer/upload_pipeline.go
@@ -19,9 +19,9 @@ type UploadPipeline struct {
writableChunksLock sync.Mutex
sealedChunks map[LogicChunkIndex]*SealedChunk
sealedChunksLock sync.Mutex
- writers *util.LimitedConcurrentExecutor
- activeWriterCond *sync.Cond
- activeWriterCount int32
+ uploaders *util.LimitedConcurrentExecutor
+ uploaderCount int32
+ uploaderCountCond *sync.Cond
saveToStorageFn SaveToStorageFunc
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
@@ -42,14 +42,14 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline {
return &UploadPipeline{
- ChunkSize: chunkSize,
- writableChunks: make(map[LogicChunkIndex]*MemChunk),
- sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
- writers: writers,
- activeWriterCond: sync.NewCond(&sync.Mutex{}),
- saveToStorageFn: saveToStorageFn,
- filepath: filepath,
- activeReadChunks: make(map[LogicChunkIndex]int),
+ ChunkSize: chunkSize,
+ writableChunks: make(map[LogicChunkIndex]*MemChunk),
+ sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
+ uploaders: writers,
+ uploaderCountCond: sync.NewCond(&sync.Mutex{}),
+ saveToStorageFn: saveToStorageFn,
+ filepath: filepath,
+ activeReadChunks: make(map[LogicChunkIndex]int),
}
}
@@ -162,17 +162,16 @@ func (cw *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
}
func (cw *UploadPipeline) waitForCurrentWritersToComplete() {
- cw.activeWriterCond.L.Lock()
+ cw.uploaderCountCond.L.Lock()
t := int32(100)
for {
- t = atomic.LoadInt32(&cw.activeWriterCount)
+ t = atomic.LoadInt32(&cw.uploaderCount)
if t <= 0 {
break
}
- glog.V(4).Infof("activeWriterCond is %d", t)
- cw.activeWriterCond.Wait()
+ cw.uploaderCountCond.Wait()
}
- cw.activeWriterCond.L.Unlock()
+ cw.uploaderCountCond.L.Unlock()
}
func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
@@ -182,8 +181,8 @@ func (cw *UploadPipeline) maybeMoveToSealed(memChunk *MemChunk, logicChunkIndex
}
func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
- atomic.AddInt32(&cw.activeWriterCount, 1)
- glog.V(4).Infof("%s activeWriterCount %d ++> %d", cw.filepath, cw.activeWriterCount-1, cw.activeWriterCount)
+ atomic.AddInt32(&cw.uploaderCount, 1)
+ glog.V(4).Infof("%s uploaderCount %d ++> %d", cw.filepath, cw.uploaderCount-1, cw.uploaderCount)
cw.sealedChunksLock.Lock()
@@ -199,19 +198,19 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic
cw.sealedChunksLock.Unlock()
- cw.writers.Execute(func() {
+ cw.uploaders.Execute(func() {
// first add to the file chunks
cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex)
// notify waiting process
- atomic.AddInt32(&cw.activeWriterCount, -1)
- glog.V(4).Infof("%s activeWriterCount %d --> %d", cw.filepath, cw.activeWriterCount+1, cw.activeWriterCount)
+ atomic.AddInt32(&cw.uploaderCount, -1)
+ glog.V(4).Infof("%s uploaderCount %d --> %d", cw.filepath, cw.uploaderCount+1, cw.uploaderCount)
// Lock and Unlock are not required,
// but it may signal multiple times during one wakeup,
// and the waiting goroutine may miss some of them!
- cw.activeWriterCond.L.Lock()
- cw.activeWriterCond.Broadcast()
- cw.activeWriterCond.L.Unlock()
+ cw.uploaderCountCond.L.Lock()
+ cw.uploaderCountCond.Broadcast()
+ cw.uploaderCountCond.L.Unlock()
// wait for readers
for cw.IsLocked(logicChunkIndex) {