diff options
| -rw-r--r-- | weed/mount/page_writer/upload_pipeline.go | 6 |
1 files changed, 5 insertions, 1 deletions
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go index f54a87a02..bce062a6e 100644 --- a/weed/mount/page_writer/upload_pipeline.go +++ b/weed/mount/page_writer/upload_pipeline.go @@ -73,8 +73,9 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n fullness = chunkFullness } } - up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) + fullWritableChunk := up.writableChunks[fullestChunkIndex] delete(up.writableChunks, fullestChunkIndex) + up.moveToSealed(fullWritableChunk, fullestChunkIndex) // fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness) } if isSequential && @@ -155,6 +156,8 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic up.sealedChunks[logicChunkIndex] = sealedChunk delete(up.writableChunks, logicChunkIndex) + // unlock before submitting the uploading jobs + up.chunksLock.Unlock() up.uploaders.Execute(func() { // first add to the file chunks sealedChunk.chunk.SaveContent(up.saveToStorageFn) @@ -181,6 +184,7 @@ func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex Logic sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex)) }) + up.chunksLock.Lock() } func (up *UploadPipeline) Shutdown() { |
