diff options
Diffstat (limited to 'weed/filesys/page_writer')
| -rw-r--r-- | weed/filesys/page_writer/chunk_interval_list.go | 115 | ||||
| -rw-r--r-- | weed/filesys/page_writer/chunk_interval_list_test.go | 49 | ||||
| -rw-r--r-- | weed/filesys/page_writer/dirty_pages.go | 30 | ||||
| -rw-r--r-- | weed/filesys/page_writer/page_chunk.go | 16 | ||||
| -rw-r--r-- | weed/filesys/page_writer/page_chunk_mem.go | 69 | ||||
| -rw-r--r-- | weed/filesys/page_writer/page_chunk_swapfile.go | 121 | ||||
| -rw-r--r-- | weed/filesys/page_writer/upload_pipeline.go | 182 | ||||
| -rw-r--r-- | weed/filesys/page_writer/upload_pipeline_lock.go | 63 | ||||
| -rw-r--r-- | weed/filesys/page_writer/upload_pipeline_test.go | 47 |
9 files changed, 0 insertions, 692 deletions
diff --git a/weed/filesys/page_writer/chunk_interval_list.go b/weed/filesys/page_writer/chunk_interval_list.go deleted file mode 100644 index e6dc5d1f5..000000000 --- a/weed/filesys/page_writer/chunk_interval_list.go +++ /dev/null @@ -1,115 +0,0 @@ -package page_writer - -import "math" - -// ChunkWrittenInterval mark one written interval within one page chunk -type ChunkWrittenInterval struct { - StartOffset int64 - stopOffset int64 - prev *ChunkWrittenInterval - next *ChunkWrittenInterval -} - -func (interval *ChunkWrittenInterval) Size() int64 { - return interval.stopOffset - interval.StartOffset -} - -func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool { - return interval.stopOffset-interval.StartOffset == chunkSize -} - -// ChunkWrittenIntervalList mark written intervals within one page chunk -type ChunkWrittenIntervalList struct { - head *ChunkWrittenInterval - tail *ChunkWrittenInterval -} - -func newChunkWrittenIntervalList() *ChunkWrittenIntervalList { - list := &ChunkWrittenIntervalList{ - head: &ChunkWrittenInterval{ - StartOffset: -1, - stopOffset: -1, - }, - tail: &ChunkWrittenInterval{ - StartOffset: math.MaxInt64, - stopOffset: math.MaxInt64, - }, - } - list.head.next = list.tail - list.tail.prev = list.head - return list -} - -func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) { - interval := &ChunkWrittenInterval{ - StartOffset: startOffset, - stopOffset: stopOffset, - } - list.addInterval(interval) -} - -func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool { - return list.size() == 1 && list.head.next.isComplete(chunkSize) -} -func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) { - for t := list.head; t != nil; t = t.next { - writtenByteCount += t.Size() - } - return -} - -func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) { - - p := list.head - for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next { - } - q := list.tail - for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev { - } - - if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset { - // merge p and q together - p.stopOffset = q.stopOffset - unlinkNodesBetween(p, q.next) - return - } - if interval.StartOffset <= p.stopOffset { - // merge new interval into p - p.stopOffset = interval.stopOffset - unlinkNodesBetween(p, q) - return - } - if q.StartOffset <= interval.stopOffset { - // merge new interval into q - q.StartOffset = interval.StartOffset - unlinkNodesBetween(p, q) - return - } - - // add the new interval between p and q - unlinkNodesBetween(p, q) - p.next = interval - interval.prev = p - q.prev = interval - interval.next = q - -} - -// unlinkNodesBetween remove all nodes after start and before stop, exclusive -func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) { - if start.next == stop { - return - } - start.next.prev = nil - start.next = stop - stop.prev.next = nil - stop.prev = start -} - -func (list *ChunkWrittenIntervalList) size() int { - var count int - for t := list.head; t != nil; t = t.next { - count++ - } - return count - 2 -} diff --git a/weed/filesys/page_writer/chunk_interval_list_test.go b/weed/filesys/page_writer/chunk_interval_list_test.go deleted file mode 100644 index b22f5eb5d..000000000 --- a/weed/filesys/page_writer/chunk_interval_list_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package page_writer - -import ( - "github.com/stretchr/testify/assert" - "testing" -) - -func Test_PageChunkWrittenIntervalList(t *testing.T) { - list := newChunkWrittenIntervalList() - - assert.Equal(t, 0, list.size(), "empty list") - - list.MarkWritten(0, 5) - assert.Equal(t, 1, list.size(), "one interval") - - list.MarkWritten(0, 5) - assert.Equal(t, 1, list.size(), "duplicated interval2") - - list.MarkWritten(95, 100) - assert.Equal(t, 2, list.size(), "two intervals") - - list.MarkWritten(50, 60) - assert.Equal(t, 3, list.size(), "three intervals") - - list.MarkWritten(50, 55) - assert.Equal(t, 3, list.size(), "three intervals merge") - - list.MarkWritten(40, 50) - assert.Equal(t, 3, list.size(), "three intervals grow forward") - - list.MarkWritten(50, 65) - assert.Equal(t, 3, list.size(), "three intervals grow backward") - - list.MarkWritten(70, 80) - assert.Equal(t, 4, list.size(), "four intervals") - - list.MarkWritten(60, 70) - assert.Equal(t, 3, list.size(), "three intervals merged") - - list.MarkWritten(59, 71) - assert.Equal(t, 3, list.size(), "covered three intervals") - - list.MarkWritten(5, 59) - assert.Equal(t, 2, list.size(), "covered two intervals") - - list.MarkWritten(70, 99) - assert.Equal(t, 1, list.size(), "covered one intervals") - -} diff --git a/weed/filesys/page_writer/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go deleted file mode 100644 index 25b747fad..000000000 --- a/weed/filesys/page_writer/dirty_pages.go +++ /dev/null @@ -1,30 +0,0 @@ -package page_writer - -type DirtyPages interface { - AddPage(offset int64, data []byte) - FlushData() error - ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) - GetStorageOptions() (collection, replication string) - Destroy() - LockForRead(startOffset, stopOffset int64) - UnlockForRead(startOffset, stopOffset int64) -} - -func max(x, y int64) int64 { - if x > y { - return x - } - return y -} -func min(x, y int64) int64 { - if x < y { - return x - } - return y -} -func minInt(x, y int) int { - if x < y { - return x - } - return y -} diff --git a/weed/filesys/page_writer/page_chunk.go b/weed/filesys/page_writer/page_chunk.go deleted file mode 100644 index 4e8f31425..000000000 --- a/weed/filesys/page_writer/page_chunk.go +++ /dev/null @@ -1,16 +0,0 @@ -package page_writer - -import ( - "io" -) - -type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) - -type PageChunk interface { - FreeResource() - WriteDataAt(src []byte, offset int64) (n int) - ReadDataAt(p []byte, off int64) (maxStop int64) - IsComplete() bool - WrittenSize() int64 - SaveContent(saveFn SaveToStorageFunc) -} diff --git a/weed/filesys/page_writer/page_chunk_mem.go b/weed/filesys/page_writer/page_chunk_mem.go deleted file mode 100644 index dfd54c19e..000000000 --- a/weed/filesys/page_writer/page_chunk_mem.go +++ /dev/null @@ -1,69 +0,0 @@ -package page_writer - -import ( - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/mem" -) - -var ( - _ = PageChunk(&MemChunk{}) -) - -type MemChunk struct { - buf []byte - usage *ChunkWrittenIntervalList - chunkSize int64 - logicChunkIndex LogicChunkIndex -} - -func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk { - return &MemChunk{ - logicChunkIndex: logicChunkIndex, - chunkSize: chunkSize, - buf: mem.Allocate(int(chunkSize)), - usage: newChunkWrittenIntervalList(), - } -} - -func (mc *MemChunk) FreeResource() { - mem.Free(mc.buf) -} - -func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) { - innerOffset := offset % mc.chunkSize - n = copy(mc.buf[innerOffset:], src) - mc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) - return -} - -func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { - memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize - for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { - logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset) - logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) - if logicStart < logicStop { - copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) - maxStop = max(maxStop, logicStop) - } - } - return -} - -func (mc *MemChunk) IsComplete() bool { - return mc.usage.IsComplete(mc.chunkSize) -} - -func (mc *MemChunk) WrittenSize() int64 { - return mc.usage.WrittenSize() -} - -func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) { - if saveFn == nil { - return - } - for t := mc.usage.head.next; t != mc.usage.tail; t = t.next { - reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset]) - saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() { - }) - } -} diff --git a/weed/filesys/page_writer/page_chunk_swapfile.go b/weed/filesys/page_writer/page_chunk_swapfile.go deleted file mode 100644 index 486557629..000000000 --- a/weed/filesys/page_writer/page_chunk_swapfile.go +++ /dev/null @@ -1,121 +0,0 @@ -package page_writer - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/mem" - "os" -) - -var ( - _ = PageChunk(&SwapFileChunk{}) -) - -type ActualChunkIndex int - -type SwapFile struct { - dir string - file *os.File - logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex - chunkSize int64 -} - -type SwapFileChunk struct { - swapfile *SwapFile - usage *ChunkWrittenIntervalList - logicChunkIndex LogicChunkIndex - actualChunkIndex ActualChunkIndex -} - -func NewSwapFile(dir string, chunkSize int64) *SwapFile { - return &SwapFile{ - dir: dir, - file: nil, - logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex), - chunkSize: chunkSize, - } -} -func (sf *SwapFile) FreeResource() { - if sf.file != nil { - sf.file.Close() - os.Remove(sf.file.Name()) - } -} - -func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) { - if sf.file == nil { - var err error - sf.file, err = os.CreateTemp(sf.dir, "") - if err != nil { - glog.Errorf("create swap file: %v", err) - return nil - } - } - actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex] - if !found { - actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex)) - sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex - } - - return &SwapFileChunk{ - swapfile: sf, - usage: newChunkWrittenIntervalList(), - logicChunkIndex: logicChunkIndex, - actualChunkIndex: actualChunkIndex, - } -} - -func (sc *SwapFileChunk) FreeResource() { -} - -func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) { - innerOffset := offset % sc.swapfile.chunkSize - var err error - n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset) - if err == nil { - sc.usage.MarkWritten(innerOffset, innerOffset+int64(n)) - } else { - glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err) - } - return -} - -func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) { - chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize - for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { - logicStart := max(off, chunkStartOffset+t.StartOffset) - logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset) - if logicStart < logicStop { - actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize - if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil { - glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err) - break - } - maxStop = max(maxStop, logicStop) - } - } - return -} - -func (sc *SwapFileChunk) IsComplete() bool { - return sc.usage.IsComplete(sc.swapfile.chunkSize) -} - -func (sc *SwapFileChunk) WrittenSize() int64 { - return sc.usage.WrittenSize() -} - -func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) { - if saveFn == nil { - return - } - for t := sc.usage.head.next; t != sc.usage.tail; t = t.next { - data := mem.Allocate(int(t.Size())) - sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize) - reader := util.NewBytesReader(data) - saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() { - }) - mem.Free(data) - } - sc.usage = newChunkWrittenIntervalList() -} diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go deleted file mode 100644 index 53641e66d..000000000 --- a/weed/filesys/page_writer/upload_pipeline.go +++ /dev/null @@ -1,182 +0,0 @@ -package page_writer - -import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - "sync" - "sync/atomic" - "time" -) - -type LogicChunkIndex int - -type UploadPipeline struct { - filepath util.FullPath - ChunkSize int64 - writableChunks map[LogicChunkIndex]PageChunk - writableChunksLock sync.Mutex - sealedChunks map[LogicChunkIndex]*SealedChunk - sealedChunksLock sync.Mutex - uploaders *util.LimitedConcurrentExecutor - uploaderCount int32 - uploaderCountCond *sync.Cond - saveToStorageFn SaveToStorageFunc - activeReadChunks map[LogicChunkIndex]int - activeReadChunksLock sync.Mutex - bufferChunkLimit int -} - -type SealedChunk struct { - chunk PageChunk - referenceCounter int // track uploading or reading processes -} - -func (sc *SealedChunk) FreeReference(messageOnFree string) { - sc.referenceCounter-- - if sc.referenceCounter == 0 { - glog.V(4).Infof("Free sealed chunk: %s", messageOnFree) - sc.chunk.FreeResource() - } -} - -func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline { - return &UploadPipeline{ - ChunkSize: chunkSize, - writableChunks: make(map[LogicChunkIndex]PageChunk), - sealedChunks: make(map[LogicChunkIndex]*SealedChunk), - uploaders: writers, - uploaderCountCond: sync.NewCond(&sync.Mutex{}), - saveToStorageFn: saveToStorageFn, - activeReadChunks: make(map[LogicChunkIndex]int), - bufferChunkLimit: bufferChunkLimit, - } -} - -func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) { - up.writableChunksLock.Lock() - defer up.writableChunksLock.Unlock() - - logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) - - memChunk, found := up.writableChunks[logicChunkIndex] - if !found { - if len(up.writableChunks) < up.bufferChunkLimit { - memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) - } else { - fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0) - for lci, mc := range up.writableChunks { - chunkFullness := mc.WrittenSize() - if fullness < chunkFullness { - fullestChunkIndex = lci - fullness = chunkFullness - } - } - up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex) - delete(up.writableChunks, fullestChunkIndex) - fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness) - memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize) - } - up.writableChunks[logicChunkIndex] = memChunk - } - n = memChunk.WriteDataAt(p, off) - up.maybeMoveToSealed(memChunk, logicChunkIndex) - - return -} - -func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) { - logicChunkIndex := LogicChunkIndex(off / up.ChunkSize) - - // read from sealed chunks first - up.sealedChunksLock.Lock() - sealedChunk, found := up.sealedChunks[logicChunkIndex] - if found { - sealedChunk.referenceCounter++ - } - up.sealedChunksLock.Unlock() - if found { - maxStop = sealedChunk.chunk.ReadDataAt(p, off) - glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop) - sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex)) - } - - // read from writable chunks last - up.writableChunksLock.Lock() - defer up.writableChunksLock.Unlock() - writableChunk, found := up.writableChunks[logicChunkIndex] - if !found { - return - } - writableMaxStop := writableChunk.ReadDataAt(p, off) - glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop) - maxStop = max(maxStop, writableMaxStop) - - return -} - -func (up *UploadPipeline) FlushAll() { - up.writableChunksLock.Lock() - defer up.writableChunksLock.Unlock() - - for logicChunkIndex, memChunk := range up.writableChunks { - up.moveToSealed(memChunk, logicChunkIndex) - } - - up.waitForCurrentWritersToComplete() -} - -func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { - if memChunk.IsComplete() { - up.moveToSealed(memChunk, logicChunkIndex) - } -} - -func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) { - atomic.AddInt32(&up.uploaderCount, 1) - glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount) - - up.sealedChunksLock.Lock() - - if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found { - oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex)) - } - sealedChunk := &SealedChunk{ - chunk: memChunk, - referenceCounter: 1, // default 1 is for uploading process - } - up.sealedChunks[logicChunkIndex] = sealedChunk - delete(up.writableChunks, logicChunkIndex) - - up.sealedChunksLock.Unlock() - - up.uploaders.Execute(func() { - // first add to the file chunks - sealedChunk.chunk.SaveContent(up.saveToStorageFn) - - // notify waiting process - atomic.AddInt32(&up.uploaderCount, -1) - glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount) - // Lock and Unlock are not required, - // but it may signal multiple times during one wakeup, - // and the waiting goroutine may miss some of them! - up.uploaderCountCond.L.Lock() - up.uploaderCountCond.Broadcast() - up.uploaderCountCond.L.Unlock() - - // wait for readers - for up.IsLocked(logicChunkIndex) { - time.Sleep(59 * time.Millisecond) - } - - // then remove from sealed chunks - up.sealedChunksLock.Lock() - defer up.sealedChunksLock.Unlock() - delete(up.sealedChunks, logicChunkIndex) - sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex)) - - }) -} - -func (up *UploadPipeline) Shutdown() { -} diff --git a/weed/filesys/page_writer/upload_pipeline_lock.go b/weed/filesys/page_writer/upload_pipeline_lock.go deleted file mode 100644 index 47a40ba37..000000000 --- a/weed/filesys/page_writer/upload_pipeline_lock.go +++ /dev/null @@ -1,63 +0,0 @@ -package page_writer - -import ( - "sync/atomic" -) - -func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) { - startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize) - stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize) - if stopOffset%up.ChunkSize > 0 { - stopLogicChunkIndex += 1 - } - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() - for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { - if count, found := up.activeReadChunks[i]; found { - up.activeReadChunks[i] = count + 1 - } else { - up.activeReadChunks[i] = 1 - } - } -} - -func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) { - startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize) - stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize) - if stopOffset%up.ChunkSize > 0 { - stopLogicChunkIndex += 1 - } - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() - for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ { - if count, found := up.activeReadChunks[i]; found { - if count == 1 { - delete(up.activeReadChunks, i) - } else { - up.activeReadChunks[i] = count - 1 - } - } - } -} - -func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool { - up.activeReadChunksLock.Lock() - defer up.activeReadChunksLock.Unlock() - if count, found := up.activeReadChunks[logicChunkIndex]; found { - return count > 0 - } - return false -} - -func (up *UploadPipeline) waitForCurrentWritersToComplete() { - up.uploaderCountCond.L.Lock() - t := int32(100) - for { - t = atomic.LoadInt32(&up.uploaderCount) - if t <= 0 { - break - } - up.uploaderCountCond.Wait() - } - up.uploaderCountCond.L.Unlock() -} diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go deleted file mode 100644 index 816fb228b..000000000 --- a/weed/filesys/page_writer/upload_pipeline_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package page_writer - -import ( - "github.com/chrislusf/seaweedfs/weed/util" - "testing" -) - -func TestUploadPipeline(t *testing.T) { - - uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16) - - writeRange(uploadPipeline, 0, 131072) - writeRange(uploadPipeline, 131072, 262144) - writeRange(uploadPipeline, 262144, 1025536) - - confirmRange(t, uploadPipeline, 0, 1025536) - - writeRange(uploadPipeline, 1025536, 1296896) - - confirmRange(t, uploadPipeline, 1025536, 1296896) - - writeRange(uploadPipeline, 1296896, 2162688) - - confirmRange(t, uploadPipeline, 1296896, 2162688) - - confirmRange(t, uploadPipeline, 1296896, 2162688) -} - -// startOff and stopOff must be divided by 4 -func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) { - p := make([]byte, 4) - for i := startOff / 4; i < stopOff/4; i += 4 { - util.Uint32toBytes(p, uint32(i)) - uploadPipeline.SaveDataAt(p, i) - } -} - -func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) { - p := make([]byte, 4) - for i := startOff; i < stopOff/4; i += 4 { - uploadPipeline.MaybeReadDataAt(p, i) - x := util.BytesToUint32(p) - if x != uint32(i) { - t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4) - } - } -} |
