Diffstat (limited to 'weed/filesys/page_writer')
-rw-r--r--  weed/filesys/page_writer/chunk_interval_list.go        115
-rw-r--r--  weed/filesys/page_writer/chunk_interval_list_test.go    49
-rw-r--r--  weed/filesys/page_writer/dirty_pages.go                 30
-rw-r--r--  weed/filesys/page_writer/page_chunk.go                  16
-rw-r--r--  weed/filesys/page_writer/page_chunk_mem.go              69
-rw-r--r--  weed/filesys/page_writer/page_chunk_swapfile.go        121
-rw-r--r--  weed/filesys/page_writer/upload_pipeline.go            182
-rw-r--r--  weed/filesys/page_writer/upload_pipeline_lock.go        63
-rw-r--r--  weed/filesys/page_writer/upload_pipeline_test.go        47
9 files changed, 0 insertions, 692 deletions
diff --git a/weed/filesys/page_writer/chunk_interval_list.go b/weed/filesys/page_writer/chunk_interval_list.go
deleted file mode 100644
index e6dc5d1f5..000000000
--- a/weed/filesys/page_writer/chunk_interval_list.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package page_writer
-
-import "math"
-
-// ChunkWrittenInterval marks one written interval within one page chunk
-type ChunkWrittenInterval struct {
- StartOffset int64
- stopOffset int64
- prev *ChunkWrittenInterval
- next *ChunkWrittenInterval
-}
-
-func (interval *ChunkWrittenInterval) Size() int64 {
- return interval.stopOffset - interval.StartOffset
-}
-
-func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool {
- return interval.stopOffset-interval.StartOffset == chunkSize
-}
-
-// ChunkWrittenIntervalList marks the written intervals within one page chunk
-type ChunkWrittenIntervalList struct {
- head *ChunkWrittenInterval
- tail *ChunkWrittenInterval
-}
-
-func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
- list := &ChunkWrittenIntervalList{
- head: &ChunkWrittenInterval{
- StartOffset: -1,
- stopOffset: -1,
- },
- tail: &ChunkWrittenInterval{
- StartOffset: math.MaxInt64,
- stopOffset: math.MaxInt64,
- },
- }
- list.head.next = list.tail
- list.tail.prev = list.head
- return list
-}
-
-func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
- interval := &ChunkWrittenInterval{
- StartOffset: startOffset,
- stopOffset: stopOffset,
- }
- list.addInterval(interval)
-}
-
-func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool {
- return list.size() == 1 && list.head.next.isComplete(chunkSize)
-}
-func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
- for t := list.head; t != nil; t = t.next {
- writtenByteCount += t.Size()
- }
- return
-}
-
-func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
-
- p := list.head
- for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
- }
- q := list.tail
- for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
- }
-
- if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
- // merge p and q together
- p.stopOffset = q.stopOffset
- unlinkNodesBetween(p, q.next)
- return
- }
- if interval.StartOffset <= p.stopOffset {
- // merge new interval into p
- p.stopOffset = interval.stopOffset
- unlinkNodesBetween(p, q)
- return
- }
- if q.StartOffset <= interval.stopOffset {
- // merge new interval into q
- q.StartOffset = interval.StartOffset
- unlinkNodesBetween(p, q)
- return
- }
-
- // add the new interval between p and q
- unlinkNodesBetween(p, q)
- p.next = interval
- interval.prev = p
- q.prev = interval
- interval.next = q
-
-}
-
-// unlinkNodesBetween removes all nodes after start and before stop, exclusive
-func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) {
- if start.next == stop {
- return
- }
- start.next.prev = nil
- start.next = stop
- stop.prev.next = nil
- stop.prev = start
-}
-
-func (list *ChunkWrittenIntervalList) size() int {
- var count int
- for t := list.head; t != nil; t = t.next {
- count++
- }
- return count - 2
-}
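
The interval list above coalesces overlapping writes into maximal disjoint runs. A minimal sketch of that behavior, written as an in-package test since the type and its constructor are unexported (the offsets are illustrative):

    package page_writer

    import "testing"

    func TestIntervalCoalescing(t *testing.T) {
        list := newChunkWrittenIntervalList()
        list.MarkWritten(0, 10) // [0,10)
        list.MarkWritten(5, 20) // overlaps, merges into [0,20)
        if list.size() != 1 {
            t.Fatalf("expected 1 interval, got %d", list.size())
        }
        if list.WrittenSize() != 20 {
            t.Fatalf("expected 20 written bytes, got %d", list.WrittenSize())
        }
    }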
diff --git a/weed/filesys/page_writer/chunk_interval_list_test.go b/weed/filesys/page_writer/chunk_interval_list_test.go
deleted file mode 100644
index b22f5eb5d..000000000
--- a/weed/filesys/page_writer/chunk_interval_list_test.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package page_writer
-
-import (
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func Test_PageChunkWrittenIntervalList(t *testing.T) {
- list := newChunkWrittenIntervalList()
-
- assert.Equal(t, 0, list.size(), "empty list")
-
- list.MarkWritten(0, 5)
- assert.Equal(t, 1, list.size(), "one interval")
-
- list.MarkWritten(0, 5)
- assert.Equal(t, 1, list.size(), "duplicated interval2")
-
- list.MarkWritten(95, 100)
- assert.Equal(t, 2, list.size(), "two intervals")
-
- list.MarkWritten(50, 60)
- assert.Equal(t, 3, list.size(), "three intervals")
-
- list.MarkWritten(50, 55)
- assert.Equal(t, 3, list.size(), "three intervals merge")
-
- list.MarkWritten(40, 50)
- assert.Equal(t, 3, list.size(), "three intervals grow forward")
-
- list.MarkWritten(50, 65)
- assert.Equal(t, 3, list.size(), "three intervals grow backward")
-
- list.MarkWritten(70, 80)
- assert.Equal(t, 4, list.size(), "four intervals")
-
- list.MarkWritten(60, 70)
- assert.Equal(t, 3, list.size(), "three intervals merged")
-
- list.MarkWritten(59, 71)
- assert.Equal(t, 3, list.size(), "covered three intervals")
-
- list.MarkWritten(5, 59)
- assert.Equal(t, 2, list.size(), "covered two intervals")
-
- list.MarkWritten(70, 99)
- assert.Equal(t, 1, list.size(), "covered one intervals")
-
-}
diff --git a/weed/filesys/page_writer/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go
deleted file mode 100644
index 25b747fad..000000000
--- a/weed/filesys/page_writer/dirty_pages.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package page_writer
-
-type DirtyPages interface {
- AddPage(offset int64, data []byte)
- FlushData() error
- ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
- GetStorageOptions() (collection, replication string)
- Destroy()
- LockForRead(startOffset, stopOffset int64)
- UnlockForRead(startOffset, stopOffset int64)
-}
-
-func max(x, y int64) int64 {
- if x > y {
- return x
- }
- return y
-}
-func min(x, y int64) int64 {
- if x < y {
- return x
- }
- return y
-}
-func minInt(x, y int) int {
- if x < y {
- return x
- }
- return y
-}
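
The three helpers above exist only because this code predates Go generics; on Go 1.18+ a single generic pair could replace them in this package (a sketch, not part of the original; Go 1.21 later made even this redundant with built-in min and max):

    // minOf/maxOf cover both the int and int64 variants above.
    func minOf[T ~int | ~int64](x, y T) T {
        if x < y {
            return x
        }
        return y
    }

    func maxOf[T ~int | ~int64](x, y T) T {
        if x > y {
            return x
        }
        return y
    }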
diff --git a/weed/filesys/page_writer/page_chunk.go b/weed/filesys/page_writer/page_chunk.go
deleted file mode 100644
index 4e8f31425..000000000
--- a/weed/filesys/page_writer/page_chunk.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package page_writer
-
-import (
- "io"
-)
-
-type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
-
-type PageChunk interface {
- FreeResource()
- WriteDataAt(src []byte, offset int64) (n int)
- ReadDataAt(p []byte, off int64) (maxStop int64)
- IsComplete() bool
- WrittenSize() int64
- SaveContent(saveFn SaveToStorageFunc)
-}
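
SaveToStorageFunc is the seam between buffering and persistence: it receives one contiguous written interval per call and is expected to invoke cleanupFn once it is done with the data. A hedged stand-in implementation for testing; the helper name is hypothetical, not part of SeaweedFS:

    package page_writer

    import (
        "io"
        "log"
    )

    // newLoggingSaveFn drains each interval and logs what would be
    // uploaded, instead of writing to a volume server.
    func newLoggingSaveFn() SaveToStorageFunc {
        return func(reader io.Reader, offset int64, size int64, cleanupFn func()) {
            defer cleanupFn()
            n, err := io.Copy(io.Discard, reader)
            if err != nil {
                log.Printf("draining chunk data: %v", err)
                return
            }
            log.Printf("would upload %d bytes at file offset %d (declared size %d)", n, offset, size)
        }
    }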
diff --git a/weed/filesys/page_writer/page_chunk_mem.go b/weed/filesys/page_writer/page_chunk_mem.go
deleted file mode 100644
index dfd54c19e..000000000
--- a/weed/filesys/page_writer/page_chunk_mem.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package page_writer
-
-import (
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/mem"
-)
-
-var (
- _ = PageChunk(&MemChunk{})
-)
-
-type MemChunk struct {
- buf []byte
- usage *ChunkWrittenIntervalList
- chunkSize int64
- logicChunkIndex LogicChunkIndex
-}
-
-func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
- return &MemChunk{
- logicChunkIndex: logicChunkIndex,
- chunkSize: chunkSize,
- buf: mem.Allocate(int(chunkSize)),
- usage: newChunkWrittenIntervalList(),
- }
-}
-
-func (mc *MemChunk) FreeResource() {
- mem.Free(mc.buf)
-}
-
-func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
- innerOffset := offset % mc.chunkSize
- n = copy(mc.buf[innerOffset:], src)
- mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
- return
-}
-
-func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
- memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
- for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
- logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
- logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
- if logicStart < logicStop {
- copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
- maxStop = max(maxStop, logicStop)
- }
- }
- return
-}
-
-func (mc *MemChunk) IsComplete() bool {
- return mc.usage.IsComplete(mc.chunkSize)
-}
-
-func (mc *MemChunk) WrittenSize() int64 {
- return mc.usage.WrittenSize()
-}
-
-func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
- if saveFn == nil {
- return
- }
- for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
- reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
- saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
- })
- }
-}
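
To make the offset translation in ReadDataAt concrete (illustrative numbers): with chunkSize = 1024 and logicChunkIndex = 2, the chunk covers logical file offsets [2048, 3072). A written interval [100, 300) inside the chunk maps to logical [2148, 2348); a 64-byte read at off = 2200 intersects it as [2200, 2264) and is served from buf[152:216].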
diff --git a/weed/filesys/page_writer/page_chunk_swapfile.go b/weed/filesys/page_writer/page_chunk_swapfile.go
deleted file mode 100644
index 486557629..000000000
--- a/weed/filesys/page_writer/page_chunk_swapfile.go
+++ /dev/null
@@ -1,121 +0,0 @@
-package page_writer
-
-import (
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/mem"
- "os"
-)
-
-var (
- _ = PageChunk(&SwapFileChunk{})
-)
-
-type ActualChunkIndex int
-
-type SwapFile struct {
- dir string
- file *os.File
- logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
- chunkSize int64
-}
-
-type SwapFileChunk struct {
- swapfile *SwapFile
- usage *ChunkWrittenIntervalList
- logicChunkIndex LogicChunkIndex
- actualChunkIndex ActualChunkIndex
-}
-
-func NewSwapFile(dir string, chunkSize int64) *SwapFile {
- return &SwapFile{
- dir: dir,
- file: nil,
- logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
- chunkSize: chunkSize,
- }
-}
-func (sf *SwapFile) FreeResource() {
- if sf.file != nil {
- sf.file.Close()
- os.Remove(sf.file.Name())
- }
-}
-
-func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
- if sf.file == nil {
- var err error
- sf.file, err = os.CreateTemp(sf.dir, "")
- if err != nil {
- glog.Errorf("create swap file: %v", err)
- return nil
- }
- }
- actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
- if !found {
- actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
- sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
- }
-
- return &SwapFileChunk{
- swapfile: sf,
- usage: newChunkWrittenIntervalList(),
- logicChunkIndex: logicChunkIndex,
- actualChunkIndex: actualChunkIndex,
- }
-}
-
-func (sc *SwapFileChunk) FreeResource() {
-}
-
-func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
- innerOffset := offset % sc.swapfile.chunkSize
- var err error
- n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
- if err == nil {
- sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
- } else {
- glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
- }
- return
-}
-
-func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
- chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
- for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
- logicStart := max(off, chunkStartOffset+t.StartOffset)
- logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
- if logicStart < logicStop {
- actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
- if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
- glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
- break
- }
- maxStop = max(maxStop, logicStop)
- }
- }
- return
-}
-
-func (sc *SwapFileChunk) IsComplete() bool {
- return sc.usage.IsComplete(sc.swapfile.chunkSize)
-}
-
-func (sc *SwapFileChunk) WrittenSize() int64 {
- return sc.usage.WrittenSize()
-}
-
-func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
- if saveFn == nil {
- return
- }
- for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
- data := mem.Allocate(int(t.Size()))
- if _, err := sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize); err != nil {
- glog.Errorf("failed to read swap file %s: %v", sc.swapfile.file.Name(), err)
- mem.Free(data)
- continue
- }
- reader := util.NewBytesReader(data)
- saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
- })
- mem.Free(data)
- }
- sc.usage = newChunkWrittenIntervalList()
-}
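
Note the indirection in the swap file layout: chunks are packed densely in spill order, so logicToActualChunkIndex decouples a chunk's position in the logical file from its position in the swap file. For example (illustrative numbers), with chunkSize = 4 MiB, if logical chunk 7 is the first one spilled it gets actualChunkIndex 0 and occupies swap-file bytes [0, 4 MiB), even though it represents logical file range [28 MiB, 32 MiB).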
diff --git a/weed/filesys/page_writer/upload_pipeline.go b/weed/filesys/page_writer/upload_pipeline.go
deleted file mode 100644
index 53641e66d..000000000
--- a/weed/filesys/page_writer/upload_pipeline.go
+++ /dev/null
@@ -1,182 +0,0 @@
-package page_writer
-
-import (
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
- "sync"
- "sync/atomic"
- "time"
-)
-
-type LogicChunkIndex int
-
-type UploadPipeline struct {
- filepath util.FullPath
- ChunkSize int64
- writableChunks map[LogicChunkIndex]PageChunk
- writableChunksLock sync.Mutex
- sealedChunks map[LogicChunkIndex]*SealedChunk
- sealedChunksLock sync.Mutex
- uploaders *util.LimitedConcurrentExecutor
- uploaderCount int32
- uploaderCountCond *sync.Cond
- saveToStorageFn SaveToStorageFunc
- activeReadChunks map[LogicChunkIndex]int
- activeReadChunksLock sync.Mutex
- bufferChunkLimit int
-}
-
-type SealedChunk struct {
- chunk PageChunk
- referenceCounter int // track uploading or reading processes
-}
-
-func (sc *SealedChunk) FreeReference(messageOnFree string) {
- sc.referenceCounter--
- if sc.referenceCounter == 0 {
- glog.V(4).Infof("Free sealed chunk: %s", messageOnFree)
- sc.chunk.FreeResource()
- }
-}
-
-func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
- return &UploadPipeline{
- ChunkSize: chunkSize,
- writableChunks: make(map[LogicChunkIndex]PageChunk),
- sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
- uploaders: writers,
- uploaderCountCond: sync.NewCond(&sync.Mutex{}),
- saveToStorageFn: saveToStorageFn,
- activeReadChunks: make(map[LogicChunkIndex]int),
- bufferChunkLimit: bufferChunkLimit,
- }
-}
-
-func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
- up.writableChunksLock.Lock()
- defer up.writableChunksLock.Unlock()
-
- logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
-
- memChunk, found := up.writableChunks[logicChunkIndex]
- if !found {
- if len(up.writableChunks) < up.bufferChunkLimit {
- memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
- } else {
- fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
- for lci, mc := range up.writableChunks {
- chunkFullness := mc.WrittenSize()
- if fullness < chunkFullness {
- fullestChunkIndex = lci
- fullness = chunkFullness
- }
- }
- up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
- delete(up.writableChunks, fullestChunkIndex)
- fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness)
- memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
- }
- up.writableChunks[logicChunkIndex] = memChunk
- }
- n = memChunk.WriteDataAt(p, off)
- up.maybeMoveToSealed(memChunk, logicChunkIndex)
-
- return
-}
-
-func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
- logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
-
- // read from sealed chunks first
- up.sealedChunksLock.Lock()
- sealedChunk, found := up.sealedChunks[logicChunkIndex]
- if found {
- sealedChunk.referenceCounter++
- }
- up.sealedChunksLock.Unlock()
- if found {
- maxStop = sealedChunk.chunk.ReadDataAt(p, off)
- glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
- sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
- }
-
- // read from writable chunks last
- up.writableChunksLock.Lock()
- defer up.writableChunksLock.Unlock()
- writableChunk, found := up.writableChunks[logicChunkIndex]
- if !found {
- return
- }
- writableMaxStop := writableChunk.ReadDataAt(p, off)
- glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
- maxStop = max(maxStop, writableMaxStop)
-
- return
-}
-
-func (up *UploadPipeline) FlushAll() {
- up.writableChunksLock.Lock()
- defer up.writableChunksLock.Unlock()
-
- for logicChunkIndex, memChunk := range up.writableChunks {
- up.moveToSealed(memChunk, logicChunkIndex)
- }
-
- up.waitForCurrentWritersToComplete()
-}
-
-func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
- if memChunk.IsComplete() {
- up.moveToSealed(memChunk, logicChunkIndex)
- }
-}
-
-func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
- atomic.AddInt32(&up.uploaderCount, 1)
- glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
-
- up.sealedChunksLock.Lock()
-
- if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
- oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
- }
- sealedChunk := &SealedChunk{
- chunk: memChunk,
- referenceCounter: 1, // default 1 is for uploading process
- }
- up.sealedChunks[logicChunkIndex] = sealedChunk
- delete(up.writableChunks, logicChunkIndex)
-
- up.sealedChunksLock.Unlock()
-
- up.uploaders.Execute(func() {
- // first add to the file chunks
- sealedChunk.chunk.SaveContent(up.saveToStorageFn)
-
- // notify waiting process
- atomic.AddInt32(&up.uploaderCount, -1)
- glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
- // Lock and Unlock around Broadcast so the signal cannot fire
- // between the waiter checking uploaderCount and calling Wait,
- // which would leave the waiter sleeping past this wakeup.
- up.uploaderCountCond.L.Lock()
- up.uploaderCountCond.Broadcast()
- up.uploaderCountCond.L.Unlock()
-
- // wait for readers
- for up.IsLocked(logicChunkIndex) {
- time.Sleep(59 * time.Millisecond)
- }
-
- // then remove from sealed chunks
- up.sealedChunksLock.Lock()
- defer up.sealedChunksLock.Unlock()
- delete(up.sealedChunks, logicChunkIndex)
- sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
-
- })
-}
-
-func (up *UploadPipeline) Shutdown() {
-}
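
A minimal end-to-end usage sketch of the pipeline, assuming the hypothetical newLoggingSaveFn from above and an executor of 4 workers (parameters are illustrative):

    package page_writer

    import "github.com/chrislusf/seaweedfs/weed/util"

    func examplePipeline() {
        executor := util.NewLimitedConcurrentExecutor(4)
        pipeline := NewUploadPipeline(executor, 2*1024*1024, newLoggingSaveFn(), 16)

        // buffer a write, then read it back before it is sealed
        pipeline.SaveDataAt([]byte("hello"), 0)
        buf := make([]byte, 5)
        pipeline.MaybeReadDataAt(buf, 0)

        // seal all writable chunks and wait for uploads to finish
        pipeline.FlushAll()
    }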
diff --git a/weed/filesys/page_writer/upload_pipeline_lock.go b/weed/filesys/page_writer/upload_pipeline_lock.go
deleted file mode 100644
index 47a40ba37..000000000
--- a/weed/filesys/page_writer/upload_pipeline_lock.go
+++ /dev/null
@@ -1,63 +0,0 @@
-package page_writer
-
-import (
- "sync/atomic"
-)
-
-func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
- startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
- stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
- if stopOffset%up.ChunkSize > 0 {
- stopLogicChunkIndex += 1
- }
- up.activeReadChunksLock.Lock()
- defer up.activeReadChunksLock.Unlock()
- for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
- if count, found := up.activeReadChunks[i]; found {
- up.activeReadChunks[i] = count + 1
- } else {
- up.activeReadChunks[i] = 1
- }
- }
-}
-
-func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
- startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
- stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
- if stopOffset%up.ChunkSize > 0 {
- stopLogicChunkIndex += 1
- }
- up.activeReadChunksLock.Lock()
- defer up.activeReadChunksLock.Unlock()
- for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
- if count, found := up.activeReadChunks[i]; found {
- if count == 1 {
- delete(up.activeReadChunks, i)
- } else {
- up.activeReadChunks[i] = count - 1
- }
- }
- }
-}
-
-func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
- up.activeReadChunksLock.Lock()
- defer up.activeReadChunksLock.Unlock()
- if count, found := up.activeReadChunks[logicChunkIndex]; found {
- return count > 0
- }
- return false
-}
-
-func (up *UploadPipeline) waitForCurrentWritersToComplete() {
- up.uploaderCountCond.L.Lock()
- defer up.uploaderCountCond.L.Unlock()
- for atomic.LoadInt32(&up.uploaderCount) > 0 {
- up.uploaderCountCond.Wait()
- }
-}
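
The stop index in LockForRead/UnlockForRead rounds up so a partially covered final chunk is still pinned: with ChunkSize = 2 MiB, LockForRead(1 MiB, 5 MiB) computes start index 0 and stop index 5 MiB / 2 MiB = 2, bumped to 3 because 5 MiB is not chunk-aligned, so chunks 0, 1, and 2 are all held against removal while the read is in flight (illustrative numbers).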
diff --git a/weed/filesys/page_writer/upload_pipeline_test.go b/weed/filesys/page_writer/upload_pipeline_test.go
deleted file mode 100644
index 816fb228b..000000000
--- a/weed/filesys/page_writer/upload_pipeline_test.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package page_writer
-
-import (
- "github.com/chrislusf/seaweedfs/weed/util"
- "testing"
-)
-
-func TestUploadPipeline(t *testing.T) {
-
- uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16)
-
- writeRange(uploadPipeline, 0, 131072)
- writeRange(uploadPipeline, 131072, 262144)
- writeRange(uploadPipeline, 262144, 1025536)
-
- confirmRange(t, uploadPipeline, 0, 1025536)
-
- writeRange(uploadPipeline, 1025536, 1296896)
-
- confirmRange(t, uploadPipeline, 1025536, 1296896)
-
- writeRange(uploadPipeline, 1296896, 2162688)
-
- confirmRange(t, uploadPipeline, 1296896, 2162688)
-
- confirmRange(t, uploadPipeline, 1296896, 2162688)
-}
-
-// startOff and stopOff must be divisible by 4
-func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
- p := make([]byte, 4)
- for i := startOff / 4; i < stopOff/4; i += 4 {
- util.Uint32toBytes(p, uint32(i))
- uploadPipeline.SaveDataAt(p, i)
- }
-}
-
-func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
- p := make([]byte, 4)
- for i := startOff / 4; i < stopOff/4; i += 4 {
- uploadPipeline.MaybeReadDataAt(p, i)
- x := util.BytesToUint32(p)
- if x != uint32(i) {
- t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)
- }
- }
-}