diff options
Diffstat (limited to 'weed/mount/page_writer/upload_pipeline.go')
| -rw-r--r-- | weed/mount/page_writer/upload_pipeline.go | 182 |
1 files changed, 182 insertions, 0 deletions
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go new file mode 100644 index 000000000..53641e66d --- /dev/null +++ b/weed/mount/page_writer/upload_pipeline.go @@ -0,0 +1,182 @@ +package page_writer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "sync" + "sync/atomic" + "time" +) + +type LogicChunkIndex int + +type UploadPipeline struct { + filepath util.FullPath + ChunkSize int64 + writableChunks map[LogicChunkIndex]PageChunk + writableChunksLock sync.Mutex + sealedChunks map[LogicChunkIndex]*SealedChunk + sealedChunksLock sync.Mutex + uploaders *util.LimitedConcurrentExecutor + uploaderCount int32 + uploaderCountCond *sync.Cond + saveToStorageFn SaveToStorageFunc + activeReadChunks map[LogicChunkIndex]int + activeReadChunksLock sync.Mutex + bufferChunkLimit int +} + +type SealedChunk struct { + chunk PageChunk + referenceCounter int // track uploading or reading processes +} + +func (sc *SealedChunk) FreeReference(messageOnFree string) { + sc.referenceCounter-- + if sc.referenceCounter == 0 { + glog.V(4).Infof("Free sealed chunk: %s", messageOnFree) + sc.chunk.FreeResource() + } +} + +func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline { + return &UploadPipeline{ + ChunkSize: chunkSize, + writableChunks: make(map[LogicChunkIndex]PageChunk), + sealedChunks: make(map[LogicChunkIndex]*SealedChunk), + uploaders: writers, + uploaderCountCond: sync.NewCond(&sync.Mutex{}), + saveToStorageFn: saveToStorageFn, + activeReadChunks: make(map[LogicChunkIndex]int), + bufferChunkLimit: bufferChunkLimit, + } +} + +func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { + up.writableChunksLock.Lock() + defer up.writableChunksLock.Unlock() + + logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) + + memChunk, found := up.writableChunks[logicChunkIndex] + if !found { + if len(up.writableChunks) < up.bufferChunkLimit { + memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) + } else { + fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) + for lci, mc := range up.writableChunks { + chunkFullness := mc.WrittenSize() + if fullness < chunkFullness { + fullestChunkIndex = lci + fullness = chunkFullness + } + } + up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) + delete(up.writableChunks, fullestChunkIndex) + fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness) + memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) + } + up.writableChunks[logicChunkIndex] = memChunk + } + n = memChunk.WriteDataAt(p, off) + up.maybeMoveToSealed(memChunk, logicChunkIndex) + + return +} + +func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { + logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) + + // read from sealed chunks first + up.sealedChunksLock.Lock() + sealedChunk, found := up.sealedChunks[logicChunkIndex] + if found { + sealedChunk.referenceCounter++ + } + up.sealedChunksLock.Unlock() + if found { + maxStop = sealedChunk.chunk.ReadDataAt(p, off) + glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) + sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex)) + } + + // read from writable chunks last + up.writableChunksLock.Lock() + defer up.writableChunksLock.Unlock() + writableChunk, found := up.writableChunks[logicChunkIndex] + if !found { + return + } + writableMaxStop := writableChunk.ReadDataAt(p, off) + glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop) + maxStop = max(maxStop, writableMaxStop) + + return +} + +func (up *UploadPipeline) FlushAll() { + up.writableChunksLock.Lock() + defer up.writableChunksLock.Unlock() + + for logicChunkIndex, memChunk := range up.writableChunks { + up.moveToSealed(memChunk, logicChunkIndex) + } + + up.waitForCurrentWritersToComplete() +} + +func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { + if memChunk.IsComplete() { + up.moveToSealed(memChunk, logicChunkIndex) + } +} + +func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { + atomic.AddInt32(&up.uploaderCount, 1) + glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount) + + up.sealedChunksLock.Lock() + + if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found { + oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex)) + } + sealedChunk := &SealedChunk{ + chunk: memChunk, + referenceCounter: 1, // default 1 is for uploading process + } + up.sealedChunks[logicChunkIndex] = sealedChunk + delete(up.writableChunks, logicChunkIndex) + + up.sealedChunksLock.Unlock() + + up.uploaders.Execute(func() { + // first add to the file chunks + sealedChunk.chunk.SaveContent(up.saveToStorageFn) + + // notify waiting process + atomic.AddInt32(&up.uploaderCount, -1) + glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount) + // Lock and Unlock are not required, + // but it may signal multiple times during one wakeup, + // and the waiting goroutine may miss some of them! + up.uploaderCountCond.L.Lock() + up.uploaderCountCond.Broadcast() + up.uploaderCountCond.L.Unlock() + + // wait for readers + for up.IsLocked(logicChunkIndex) { + time.Sleep(59 * time.Millisecond) + } + + // then remove from sealed chunks + up.sealedChunksLock.Lock() + defer up.sealedChunksLock.Unlock() + delete(up.sealedChunks, logicChunkIndex) + sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex)) + + }) +} + +func (up *UploadPipeline) Shutdown() { +} |
