aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filesys/page_writer/upload_pipeline.go6
-rw-r--r--weed/filesys/page_writer/upload_pipeline_test.go47
2 files changed, 52 insertions, 1 deletions
diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go
index 0c9e13649..b87061adb 100644
--- a/weed/filesys/page_writer/upload_pipeline.go
+++ b/weed/filesys/page_writer/upload_pipeline.go
@@ -142,9 +142,10 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic
cw.sealedChunksLock.Unlock()
cw.writers.Execute(func() {
+ // first add to the file chunks
cw.saveOneChunk(sealedChunk.chunk, logicChunkIndex)
- // remove from sealed chunks
+ // then remove from sealed chunks
sealedChunk.FreeReference()
cw.sealedChunksLock.Lock()
defer cw.sealedChunksLock.Unlock()
@@ -162,6 +163,9 @@ func (cw *UploadPipeline) moveToSealed(memChunk *MemChunk, logicChunkIndex Logic
}
func (cw *UploadPipeline) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
+ if cw.saveToStorageFn == nil {
+ return
+ }
for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {
diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go
new file mode 100644
index 000000000..81191868f
--- /dev/null
+++ b/weed/filesys/page_writer/upload_pipeline_test.go
@@ -0,0 +1,47 @@
+package page_writer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "testing"
+)
+
+func TestUploadPipeline(t *testing.T) {
+
+ uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil)
+
+ writeRange(uploadPipeline, 0, 131072)
+ writeRange(uploadPipeline, 131072, 262144)
+ writeRange(uploadPipeline, 262144, 1025536)
+
+ confirmRange(t, uploadPipeline, 0, 1025536)
+
+ writeRange(uploadPipeline, 1025536, 1296896)
+
+ confirmRange(t, uploadPipeline, 1025536, 1296896)
+
+ writeRange(uploadPipeline, 1296896, 2162688)
+
+ confirmRange(t, uploadPipeline, 1296896, 2162688)
+
+ confirmRange(t, uploadPipeline, 1296896, 2162688)
+}
+
+// startOff and stopOff must be divided by 4
+func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
+ p := make([]byte, 4)
+ for i := startOff / 4; i < stopOff/4; i += 4 {
+ util.Uint32toBytes(p, uint32(i))
+ uploadPipeline.SaveDataAt(p, i)
+ }
+}
+
+func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
+ p := make([]byte, 4)
+ for i := startOff; i < stopOff/4; i += 4 {
+ uploadPipeline.MaybeReadDataAt(p, i)
+ x := util.BytesToUint32(p)
+ if x != uint32(i) {
+ t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)
+ }
+ }
+}