Diffstat (limited to 'weed/filesys/page_writer/upload_pipeline.go')
| Mode | File | Lines changed |
| --- | --- | --- |
| -rw-r--r-- | weed/filesys/page_writer/upload_pipeline.go | 165 |
1 file changed, 87 insertions(+), 78 deletions(-)
diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go
index 40722924f..ee85cf6c8 100644
--- a/weed/filesys/page_writer/upload_pipeline.go
+++ b/weed/filesys/page_writer/upload_pipeline.go
@@ -24,6 +24,7 @@ type UploadPipeline struct {
 	saveToStorageFn      SaveToStorageFunc
 	activeReadChunks     map[LogicChunkIndex]int
 	activeReadChunksLock sync.Mutex
+	swapFile             *SwapFile
 }
 
 type SealedChunk struct {
@@ -39,7 +40,7 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
 	}
 }
 
-func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc) *UploadPipeline {
+func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, swapFileDir string) *UploadPipeline {
 	return &UploadPipeline{
 		ChunkSize:      chunkSize,
 		writableChunks: make(map[LogicChunkIndex]PageChunk),
@@ -49,177 +50,185 @@ func NewUploadPipeline(filepath util.FullPath, writers *util.LimitedConcurrentEx
 		saveToStorageFn:  saveToStorageFn,
 		filepath:         filepath,
 		activeReadChunks: make(map[LogicChunkIndex]int),
+		swapFile:         NewSwapFile(swapFileDir, chunkSize),
 	}
 }
 
-func (cw *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
-	cw.writableChunksLock.Lock()
-	defer cw.writableChunksLock.Unlock()
+func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
+	up.writableChunksLock.Lock()
+	defer up.writableChunksLock.Unlock()
 
-	logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
+	logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
 
-	memChunk, found := cw.writableChunks[logicChunkIndex]
+	memChunk, found := up.writableChunks[logicChunkIndex]
 	if !found {
-		memChunk = NewMemChunk(logicChunkIndex, cw.ChunkSize)
-		cw.writableChunks[logicChunkIndex] = memChunk
+		if len(up.writableChunks) < 0 {
+			memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+		} else {
+			memChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
+			if memChunk == nil {
+				memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+			}
+		}
+		up.writableChunks[logicChunkIndex] = memChunk
 	}
 	n = memChunk.WriteDataAt(p, off)
-	cw.maybeMoveToSealed(memChunk, logicChunkIndex)
+	up.maybeMoveToSealed(memChunk, logicChunkIndex)
 
 	return
 }
 
-func (cw *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
-	logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
+func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
+	logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
 
 	// read from sealed chunks first
-	cw.sealedChunksLock.Lock()
-	sealedChunk, found := cw.sealedChunks[logicChunkIndex]
+	up.sealedChunksLock.Lock()
+	sealedChunk, found := up.sealedChunks[logicChunkIndex]
 	if found {
 		sealedChunk.referenceCounter++
 	}
-	cw.sealedChunksLock.Unlock()
+	up.sealedChunksLock.Unlock()
 	if found {
 		maxStop = sealedChunk.chunk.ReadDataAt(p, off)
-		glog.V(4).Infof("%s read sealed memchunk [%d,%d)", cw.filepath, off, maxStop)
-		sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", cw.filepath, logicChunkIndex))
+		glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
+		sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
 	}
 
 	// read from writable chunks last
-	cw.writableChunksLock.Lock()
-	defer cw.writableChunksLock.Unlock()
-	writableChunk, found := cw.writableChunks[logicChunkIndex]
+	up.writableChunksLock.Lock()
+	defer up.writableChunksLock.Unlock()
+	writableChunk, found := up.writableChunks[logicChunkIndex]
 	if !found {
 		return
 	}
 	writableMaxStop := writableChunk.ReadDataAt(p, off)
-	glog.V(4).Infof("%s read writable memchunk [%d,%d)", cw.filepath, off, writableMaxStop)
+	glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
 	maxStop = max(maxStop, writableMaxStop)
 
 	return
 }
 
-func (cw *UploadPipeline) FlushAll() {
-	cw.writableChunksLock.Lock()
-	defer cw.writableChunksLock.Unlock()
+func (up *UploadPipeline) FlushAll() {
+	up.writableChunksLock.Lock()
+	defer up.writableChunksLock.Unlock()
 
-	for logicChunkIndex, memChunk := range cw.writableChunks {
-		cw.moveToSealed(memChunk, logicChunkIndex)
+	for logicChunkIndex, memChunk := range up.writableChunks {
+		up.moveToSealed(memChunk, logicChunkIndex)
 	}
 
-	cw.waitForCurrentWritersToComplete()
+	up.waitForCurrentWritersToComplete()
 }
 
-func (cw *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
-	startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize)
-	stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize)
-	if stopOffset%cw.ChunkSize > 0 {
+func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
+	startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
+	stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
+	if stopOffset%up.ChunkSize > 0 {
 		stopLogicChunkIndex += 1
 	}
-	cw.activeReadChunksLock.Lock()
-	defer cw.activeReadChunksLock.Unlock()
+	up.activeReadChunksLock.Lock()
+	defer up.activeReadChunksLock.Unlock()
 	for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
-		if count, found := cw.activeReadChunks[i]; found {
-			cw.activeReadChunks[i] = count + 1
+		if count, found := up.activeReadChunks[i]; found {
+			up.activeReadChunks[i] = count + 1
 		} else {
-			cw.activeReadChunks[i] = 1
+			up.activeReadChunks[i] = 1
 		}
 	}
 }
 
-func (cw *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
-	startLogicChunkIndex := LogicChunkIndex(startOffset / cw.ChunkSize)
-	stopLogicChunkIndex := LogicChunkIndex(stopOffset / cw.ChunkSize)
-	if stopOffset%cw.ChunkSize > 0 {
+func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
+	startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
+	stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
+	if stopOffset%up.ChunkSize > 0 {
 		stopLogicChunkIndex += 1
 	}
-	cw.activeReadChunksLock.Lock()
-	defer cw.activeReadChunksLock.Unlock()
+	up.activeReadChunksLock.Lock()
+	defer up.activeReadChunksLock.Unlock()
 	for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
-		if count, found := cw.activeReadChunks[i]; found {
+		if count, found := up.activeReadChunks[i]; found {
 			if count == 1 {
-				delete(cw.activeReadChunks, i)
+				delete(up.activeReadChunks, i)
 			} else {
-				cw.activeReadChunks[i] = count - 1
+				up.activeReadChunks[i] = count - 1
 			}
 		}
 	}
 }
 
-func (cw *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
-	cw.activeReadChunksLock.Lock()
-	defer cw.activeReadChunksLock.Unlock()
-	if count, found := cw.activeReadChunks[logicChunkIndex]; found {
+func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
+	up.activeReadChunksLock.Lock()
+	defer up.activeReadChunksLock.Unlock()
+	if count, found := up.activeReadChunks[logicChunkIndex]; found {
 		return count > 0
 	}
 	return false
 }
 
-func (cw *UploadPipeline) waitForCurrentWritersToComplete() {
-	cw.uploaderCountCond.L.Lock()
+func (up *UploadPipeline) waitForCurrentWritersToComplete() {
+	up.uploaderCountCond.L.Lock()
 	t := int32(100)
 	for {
-		t = atomic.LoadInt32(&cw.uploaderCount)
+		t = atomic.LoadInt32(&up.uploaderCount)
 		if t <= 0 {
 			break
 		}
-		cw.uploaderCountCond.Wait()
+		up.uploaderCountCond.Wait()
 	}
-	cw.uploaderCountCond.L.Unlock()
+	up.uploaderCountCond.L.Unlock()
 }
 
-func (cw *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
+func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
 	if memChunk.IsComplete() {
-		cw.moveToSealed(memChunk, logicChunkIndex)
+		up.moveToSealed(memChunk, logicChunkIndex)
 	}
 }
 
-func (cw *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
-	atomic.AddInt32(&cw.uploaderCount, 1)
-	glog.V(4).Infof("%s uploaderCount %d ++> %d", cw.filepath, cw.uploaderCount-1, cw.uploaderCount)
+func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
+	atomic.AddInt32(&up.uploaderCount, 1)
+	glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
 
-	cw.sealedChunksLock.Lock()
+	up.sealedChunksLock.Lock()
 
-	if oldMemChunk, found := cw.sealedChunks[logicChunkIndex]; found {
-		oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", cw.filepath, logicChunkIndex))
+	if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
+		oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
 	}
 	sealedChunk := &SealedChunk{
 		chunk:            memChunk,
 		referenceCounter: 1, // default 1 is for uploading process
 	}
-	cw.sealedChunks[logicChunkIndex] = sealedChunk
-	delete(cw.writableChunks, logicChunkIndex)
+	up.sealedChunks[logicChunkIndex] = sealedChunk
+	delete(up.writableChunks, logicChunkIndex)
 
-	cw.sealedChunksLock.Unlock()
+	up.sealedChunksLock.Unlock()
 
-	cw.uploaders.Execute(func() {
+	up.uploaders.Execute(func() {
 		// first add to the file chunks
-		sealedChunk.chunk.SaveContent(cw.saveToStorageFn)
+		sealedChunk.chunk.SaveContent(up.saveToStorageFn)
 
 		// notify waiting process
-		atomic.AddInt32(&cw.uploaderCount, -1)
-		glog.V(4).Infof("%s uploaderCount %d --> %d", cw.filepath, cw.uploaderCount+1, cw.uploaderCount)
+		atomic.AddInt32(&up.uploaderCount, -1)
+		glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
 		// Lock and Unlock are not required,
 		// but it may signal multiple times during one wakeup,
 		// and the waiting goroutine may miss some of them!
-		cw.uploaderCountCond.L.Lock()
-		cw.uploaderCountCond.Broadcast()
-		cw.uploaderCountCond.L.Unlock()
+		up.uploaderCountCond.L.Lock()
+		up.uploaderCountCond.Broadcast()
+		up.uploaderCountCond.L.Unlock()
 
 		// wait for readers
-		for cw.IsLocked(logicChunkIndex) {
+		for up.IsLocked(logicChunkIndex) {
 			time.Sleep(59 * time.Millisecond)
 		}
 
 		// then remove from sealed chunks
-		cw.sealedChunksLock.Lock()
-		defer cw.sealedChunksLock.Unlock()
-		delete(cw.sealedChunks, logicChunkIndex)
-		sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", cw.filepath, logicChunkIndex))
+		up.sealedChunksLock.Lock()
+		defer up.sealedChunksLock.Unlock()
+		delete(up.sealedChunks, logicChunkIndex)
+		sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
 	})
 }
 
-func (p2 *UploadPipeline) Shutdown() {
-
+func (up *UploadPipeline) Shutdown() {
+	up.swapFile.FreeResource()
 }
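The effect of this change is easiest to see from the call site: because `len(up.writableChunks) < 0` is never true, `SaveDataAt` now always asks the swap file for a temp-file chunk first and only falls back to `NewMemChunk` when `NewTempFileChunk` returns nil, and `Shutdown` must release the swap file. Below is a hedged sketch, not part of this commit, of a caller in the same package exercising the new constructor signature; the chunk size, concurrency limit, file path, test name, and module import path are illustrative assumptions rather than values taken from the SeaweedFS callers.

```go
package page_writer

import (
	"testing"

	"github.com/chrislusf/seaweedfs/weed/util"
)

// TestSwapFileBackedPipelineSketch is a hypothetical example (not in the commit)
// showing the extra swapFileDir argument and the Shutdown cleanup it requires.
func TestSwapFileBackedPipelineSketch(t *testing.T) {
	swapFileDir := t.TempDir() // directory backing the new SwapFile

	const chunkSize = 2 * 1024 * 1024 // assumed chunk size, for illustration only

	up := NewUploadPipeline(
		util.FullPath("/example/file.txt"),   // only used in log messages
		util.NewLimitedConcurrentExecutor(8), // uploader concurrency, arbitrary here
		chunkSize,
		nil,         // SaveToStorageFunc; tolerable here only because nothing gets sealed
		swapFileDir, // new parameter introduced by this change
	)
	defer up.Shutdown() // now calls swapFile.FreeResource()

	// A write smaller than chunkSize stays in a writable chunk, which this
	// change backs with a temp-file chunk whenever NewTempFileChunk succeeds.
	data := []byte("hello swap file")
	if n := up.SaveDataAt(data, 0); n != len(data) {
		t.Fatalf("wrote %d bytes, want %d", n, len(data))
	}

	// Reads consult sealed chunks first, then the writable (swap-backed) chunk.
	buf := make([]byte, len(data))
	_ = up.MaybeReadDataAt(buf, 0)
}
```

A real caller would pass a non-nil SaveToStorageFunc and call FlushAll before Shutdown so sealed chunks are uploaded; the nil function above works only because this sketch never completes or flushes a chunk.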
