diff options
| author | chrislu <chris.lu@gmail.com> | 2022-03-13 18:34:57 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-03-13 18:34:57 -0700 |
| commit | 5cba8e51c5bb8925b3676b03a368f7816215cc68 (patch) | |
| tree | 680f4990d544aafbc262c5bcdb9f9d1f7cd8bd26 | |
| parent | f2f68f675efc4e394b5f610e977d47d692819968 (diff) | |
| download | seaweedfs-5cba8e51c5bb8925b3676b03a368f7816215cc68.tar.xz seaweedfs-5cba8e51c5bb8925b3676b03a368f7816215cc68.zip | |
refactor
| -rw-r--r-- | weed/mount/page_writer/upload_pipeline.go | 26 |
1 files changed, 13 insertions, 13 deletions
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index 2286cdf00..e084ca58f 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -24,7 +24,7 @@ type UploadPipeline struct { saveToStorageFn SaveToStorageFunc activeReadChunks map[LogicChunkIndex]int activeReadChunksLock sync.Mutex - bufferChunkLimit int + writableChunkLimit int swapFile *SwapFile } @@ -43,15 +43,15 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) { func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline { return &UploadPipeline{ - ChunkSize: chunkSize, - writableChunks: make(map[LogicChunkIndex]PageChunk), - sealedChunks: make(map[LogicChunkIndex]*SealedChunk), - uploaders: writers, - uploaderCountCond: sync.NewCond(&sync.Mutex{}), - saveToStorageFn: saveToStorageFn, - activeReadChunks: make(map[LogicChunkIndex]int), - bufferChunkLimit: bufferChunkLimit, - swapFile: NewSwapFile(swapFileDir, chunkSize), + ChunkSize: chunkSize, + writableChunks: make(map[LogicChunkIndex]PageChunk), + sealedChunks: make(map[LogicChunkIndex]*SealedChunk), + uploaders: writers, + uploaderCountCond: sync.NewCond(&sync.Mutex{}), + saveToStorageFn: saveToStorageFn, + activeReadChunks: make(map[LogicChunkIndex]int), + writableChunkLimit: bufferChunkLimit, + swapFile: NewSwapFile(swapFileDir, chunkSize), } } @@ -63,7 +63,7 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n pageChunk, found := up.writableChunks[logicChunkIndex] if !found { - if len(up.writableChunks) > up.bufferChunkLimit { + if len(up.writableChunks) > up.writableChunkLimit { // if current file chunks is over the per file buffer count limit fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) for lci, mc := range up.writableChunks { @@ -78,8 +78,8 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) } if isSequential && - len(up.writableChunks) < up.bufferChunkLimit && - atomic.LoadInt64(&memChunkCounter) < 4*int64(up.bufferChunkLimit) { + len(up.writableChunks) < up.writableChunkLimit && + atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) { pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) } else { pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex) |
