aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mount/page_writer/upload_pipeline.go6
1 files changed, 5 insertions, 1 deletions
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index f54a87a02..bce062a6e 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -73,8 +73,9 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n
fullness = chunkFullness
}
}
- up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
+ fullWritableChunk := up.writableChunks[fullestChunkIndex]
delete(up.writableChunks, fullestChunkIndex)
+ up.moveToSealed(fullWritableChunk, fullestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
}
if isSequential &&
@@ -155,6 +156,8 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
up.sealedChunks[logicChunkIndex] = sealedChunk
delete(up.writableChunks, logicChunkIndex)
+ // unlock before submitting the uploading jobs
+ up.chunksLock.Unlock()
up.uploaders.Execute(func() {
// first add to the file chunks
sealedChunk.chunk.SaveContent(up.saveToStorageFn)
@@ -181,6 +184,7 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic
sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
})
+ up.chunksLock.Lock()
}
func (up *UploadPipeline) Shutdown() {