aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mount/page_writer/upload_pipeline.go26
1 files changed, 13 insertions, 13 deletions
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index 2286cdf00..e084ca58f 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -24,7 +24,7 @@ type UploadPipeline struct {
saveToStorageFn SaveToStorageFunc
activeReadChunks map[LogicChunkIndex]int
activeReadChunksLock sync.Mutex
- bufferChunkLimit int
+ writableChunkLimit int
swapFile *SwapFile
}
@@ -43,15 +43,15 @@ func (sc *SealedChunk) FreeReference(messageOnFree string) {
func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int, swapFileDir string) *UploadPipeline {
return &UploadPipeline{
- ChunkSize: chunkSize,
- writableChunks: make(map[LogicChunkIndex]PageChunk),
- sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
- uploaders: writers,
- uploaderCountCond: sync.NewCond(&sync.Mutex{}),
- saveToStorageFn: saveToStorageFn,
- activeReadChunks: make(map[LogicChunkIndex]int),
- bufferChunkLimit: bufferChunkLimit,
- swapFile: NewSwapFile(swapFileDir, chunkSize),
+ ChunkSize: chunkSize,
+ writableChunks: make(map[LogicChunkIndex]PageChunk),
+ sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
+ uploaders: writers,
+ uploaderCountCond: sync.NewCond(&sync.Mutex{}),
+ saveToStorageFn: saveToStorageFn,
+ activeReadChunks: make(map[LogicChunkIndex]int),
+ writableChunkLimit: bufferChunkLimit,
+ swapFile: NewSwapFile(swapFileDir, chunkSize),
}
}
@@ -63,7 +63,7 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
pageChunk, found := up.writableChunks[logicChunkIndex]
if !found {
- if len(up.writableChunks) > up.bufferChunkLimit {
+ if len(up.writableChunks) > up.writableChunkLimit {
// if current file chunks is over the per file buffer count limit
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks {
@@ -78,8 +78,8 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
}
if isSequential &&
- len(up.writableChunks) < up.bufferChunkLimit &&
- atomic.LoadInt64(&memChunkCounter) < 4*int64(up.bufferChunkLimit) {
+ len(up.writableChunks) < up.writableChunkLimit &&
+ atomic.LoadInt64(&memChunkCounter) < 4*int64(up.writableChunkLimit) {
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
} else {
pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)