aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/page_writer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mount/page_writer')
-rw-r--r--weed/mount/page_writer/chunk_interval_list.go115
-rw-r--r--weed/mount/page_writer/chunk_interval_list_test.go49
-rw-r--r--weed/mount/page_writer/dirty_pages.go30
-rw-r--r--weed/mount/page_writer/page_chunk.go16
-rw-r--r--weed/mount/page_writer/page_chunk_mem.go69
-rw-r--r--weed/mount/page_writer/page_chunk_swapfile.go121
-rw-r--r--weed/mount/page_writer/upload_pipeline.go182
-rw-r--r--weed/mount/page_writer/upload_pipeline_lock.go63
-rw-r--r--weed/mount/page_writer/upload_pipeline_test.go47
9 files changed, 692 insertions, 0 deletions
diff --git a/weed/mount/page_writer/chunk_interval_list.go b/weed/mount/page_writer/chunk_interval_list.go
new file mode 100644
index 000000000..e6dc5d1f5
--- /dev/null
+++ b/weed/mount/page_writer/chunk_interval_list.go
@@ -0,0 +1,115 @@
+package page_writer
+
+import "math"
+
+// ChunkWrittenInterval mark one written interval within one page chunk
+type ChunkWrittenInterval struct {
+ StartOffset int64
+ stopOffset int64
+ prev *ChunkWrittenInterval
+ next *ChunkWrittenInterval
+}
+
+func (interval *ChunkWrittenInterval) Size() int64 {
+ return interval.stopOffset - interval.StartOffset
+}
+
+func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool {
+ return interval.stopOffset-interval.StartOffset == chunkSize
+}
+
+// ChunkWrittenIntervalList mark written intervals within one page chunk
+type ChunkWrittenIntervalList struct {
+ head *ChunkWrittenInterval
+ tail *ChunkWrittenInterval
+}
+
+func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
+ list := &ChunkWrittenIntervalList{
+ head: &ChunkWrittenInterval{
+ StartOffset: -1,
+ stopOffset: -1,
+ },
+ tail: &ChunkWrittenInterval{
+ StartOffset: math.MaxInt64,
+ stopOffset: math.MaxInt64,
+ },
+ }
+ list.head.next = list.tail
+ list.tail.prev = list.head
+ return list
+}
+
+func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
+ interval := &ChunkWrittenInterval{
+ StartOffset: startOffset,
+ stopOffset: stopOffset,
+ }
+ list.addInterval(interval)
+}
+
+func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool {
+ return list.size() == 1 && list.head.next.isComplete(chunkSize)
+}
+func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
+ for t := list.head; t != nil; t = t.next {
+ writtenByteCount += t.Size()
+ }
+ return
+}
+
+func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
+
+ p := list.head
+ for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
+ }
+ q := list.tail
+ for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
+ }
+
+ if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
+ // merge p and q together
+ p.stopOffset = q.stopOffset
+ unlinkNodesBetween(p, q.next)
+ return
+ }
+ if interval.StartOffset <= p.stopOffset {
+ // merge new interval into p
+ p.stopOffset = interval.stopOffset
+ unlinkNodesBetween(p, q)
+ return
+ }
+ if q.StartOffset <= interval.stopOffset {
+ // merge new interval into q
+ q.StartOffset = interval.StartOffset
+ unlinkNodesBetween(p, q)
+ return
+ }
+
+ // add the new interval between p and q
+ unlinkNodesBetween(p, q)
+ p.next = interval
+ interval.prev = p
+ q.prev = interval
+ interval.next = q
+
+}
+
+// unlinkNodesBetween remove all nodes after start and before stop, exclusive
+func unlinkNodesBetween(start *ChunkWrittenInterval, stop *ChunkWrittenInterval) {
+ if start.next == stop {
+ return
+ }
+ start.next.prev = nil
+ start.next = stop
+ stop.prev.next = nil
+ stop.prev = start
+}
+
+func (list *ChunkWrittenIntervalList) size() int {
+ var count int
+ for t := list.head; t != nil; t = t.next {
+ count++
+ }
+ return count - 2
+}
diff --git a/weed/mount/page_writer/chunk_interval_list_test.go b/weed/mount/page_writer/chunk_interval_list_test.go
new file mode 100644
index 000000000..b22f5eb5d
--- /dev/null
+++ b/weed/mount/page_writer/chunk_interval_list_test.go
@@ -0,0 +1,49 @@
+package page_writer
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_PageChunkWrittenIntervalList(t *testing.T) {
+ list := newChunkWrittenIntervalList()
+
+ assert.Equal(t, 0, list.size(), "empty list")
+
+ list.MarkWritten(0, 5)
+ assert.Equal(t, 1, list.size(), "one interval")
+
+ list.MarkWritten(0, 5)
+ assert.Equal(t, 1, list.size(), "duplicated interval2")
+
+ list.MarkWritten(95, 100)
+ assert.Equal(t, 2, list.size(), "two intervals")
+
+ list.MarkWritten(50, 60)
+ assert.Equal(t, 3, list.size(), "three intervals")
+
+ list.MarkWritten(50, 55)
+ assert.Equal(t, 3, list.size(), "three intervals merge")
+
+ list.MarkWritten(40, 50)
+ assert.Equal(t, 3, list.size(), "three intervals grow forward")
+
+ list.MarkWritten(50, 65)
+ assert.Equal(t, 3, list.size(), "three intervals grow backward")
+
+ list.MarkWritten(70, 80)
+ assert.Equal(t, 4, list.size(), "four intervals")
+
+ list.MarkWritten(60, 70)
+ assert.Equal(t, 3, list.size(), "three intervals merged")
+
+ list.MarkWritten(59, 71)
+ assert.Equal(t, 3, list.size(), "covered three intervals")
+
+ list.MarkWritten(5, 59)
+ assert.Equal(t, 2, list.size(), "covered two intervals")
+
+ list.MarkWritten(70, 99)
+ assert.Equal(t, 1, list.size(), "covered one intervals")
+
+}
diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go
new file mode 100644
index 000000000..25b747fad
--- /dev/null
+++ b/weed/mount/page_writer/dirty_pages.go
@@ -0,0 +1,30 @@
+package page_writer
+
+type DirtyPages interface {
+ AddPage(offset int64, data []byte)
+ FlushData() error
+ ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
+ GetStorageOptions() (collection, replication string)
+ Destroy()
+ LockForRead(startOffset, stopOffset int64)
+ UnlockForRead(startOffset, stopOffset int64)
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+func min(x, y int64) int64 {
+ if x < y {
+ return x
+ }
+ return y
+}
+func minInt(x, y int) int {
+ if x < y {
+ return x
+ }
+ return y
+}
diff --git a/weed/mount/page_writer/page_chunk.go b/weed/mount/page_writer/page_chunk.go
new file mode 100644
index 000000000..4e8f31425
--- /dev/null
+++ b/weed/mount/page_writer/page_chunk.go
@@ -0,0 +1,16 @@
+package page_writer
+
+import (
+ "io"
+)
+
+type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
+
+type PageChunk interface {
+ FreeResource()
+ WriteDataAt(src []byte, offset int64) (n int)
+ ReadDataAt(p []byte, off int64) (maxStop int64)
+ IsComplete() bool
+ WrittenSize() int64
+ SaveContent(saveFn SaveToStorageFunc)
+}
diff --git a/weed/mount/page_writer/page_chunk_mem.go b/weed/mount/page_writer/page_chunk_mem.go
new file mode 100644
index 000000000..dfd54c19e
--- /dev/null
+++ b/weed/mount/page_writer/page_chunk_mem.go
@@ -0,0 +1,69 @@
+package page_writer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
+)
+
+var (
+ _ = PageChunk(&MemChunk{})
+)
+
+type MemChunk struct {
+ buf []byte
+ usage *ChunkWrittenIntervalList
+ chunkSize int64
+ logicChunkIndex LogicChunkIndex
+}
+
+func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
+ return &MemChunk{
+ logicChunkIndex: logicChunkIndex,
+ chunkSize: chunkSize,
+ buf: mem.Allocate(int(chunkSize)),
+ usage: newChunkWrittenIntervalList(),
+ }
+}
+
+func (mc *MemChunk) FreeResource() {
+ mem.Free(mc.buf)
+}
+
+func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
+ innerOffset := offset % mc.chunkSize
+ n = copy(mc.buf[innerOffset:], src)
+ mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
+ return
+}
+
+func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+ memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
+ for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
+ logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
+ logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
+ if logicStart < logicStop {
+ copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
+ maxStop = max(maxStop, logicStop)
+ }
+ }
+ return
+}
+
+func (mc *MemChunk) IsComplete() bool {
+ return mc.usage.IsComplete(mc.chunkSize)
+}
+
+func (mc *MemChunk) WrittenSize() int64 {
+ return mc.usage.WrittenSize()
+}
+
+func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
+ if saveFn == nil {
+ return
+ }
+ for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
+ reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
+ saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
+ })
+ }
+}
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
new file mode 100644
index 000000000..486557629
--- /dev/null
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -0,0 +1,121 @@
+package page_writer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
+ "os"
+)
+
+var (
+ _ = PageChunk(&SwapFileChunk{})
+)
+
+type ActualChunkIndex int
+
+type SwapFile struct {
+ dir string
+ file *os.File
+ logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
+ chunkSize int64
+}
+
+type SwapFileChunk struct {
+ swapfile *SwapFile
+ usage *ChunkWrittenIntervalList
+ logicChunkIndex LogicChunkIndex
+ actualChunkIndex ActualChunkIndex
+}
+
+func NewSwapFile(dir string, chunkSize int64) *SwapFile {
+ return &SwapFile{
+ dir: dir,
+ file: nil,
+ logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
+ chunkSize: chunkSize,
+ }
+}
+func (sf *SwapFile) FreeResource() {
+ if sf.file != nil {
+ sf.file.Close()
+ os.Remove(sf.file.Name())
+ }
+}
+
+func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
+ if sf.file == nil {
+ var err error
+ sf.file, err = os.CreateTemp(sf.dir, "")
+ if err != nil {
+ glog.Errorf("create swap file: %v", err)
+ return nil
+ }
+ }
+ actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
+ if !found {
+ actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
+ sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
+ }
+
+ return &SwapFileChunk{
+ swapfile: sf,
+ usage: newChunkWrittenIntervalList(),
+ logicChunkIndex: logicChunkIndex,
+ actualChunkIndex: actualChunkIndex,
+ }
+}
+
+func (sc *SwapFileChunk) FreeResource() {
+}
+
+func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
+ innerOffset := offset % sc.swapfile.chunkSize
+ var err error
+ n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
+ if err == nil {
+ sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
+ } else {
+ glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
+ }
+ return
+}
+
+func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
+ chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
+ for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
+ logicStart := max(off, chunkStartOffset+t.StartOffset)
+ logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
+ if logicStart < logicStop {
+ actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
+ if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
+ glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
+ break
+ }
+ maxStop = max(maxStop, logicStop)
+ }
+ }
+ return
+}
+
+func (sc *SwapFileChunk) IsComplete() bool {
+ return sc.usage.IsComplete(sc.swapfile.chunkSize)
+}
+
+func (sc *SwapFileChunk) WrittenSize() int64 {
+ return sc.usage.WrittenSize()
+}
+
+func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
+ if saveFn == nil {
+ return
+ }
+ for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
+ data := mem.Allocate(int(t.Size()))
+ sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
+ reader := util.NewBytesReader(data)
+ saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
+ })
+ mem.Free(data)
+ }
+ sc.usage = newChunkWrittenIntervalList()
+}
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
new file mode 100644
index 000000000..53641e66d
--- /dev/null
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -0,0 +1,182 @@
+package page_writer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type LogicChunkIndex int
+
+type UploadPipeline struct {
+ filepath util.FullPath
+ ChunkSize int64
+ writableChunks map[LogicChunkIndex]PageChunk
+ writableChunksLock sync.Mutex
+ sealedChunks map[LogicChunkIndex]*SealedChunk
+ sealedChunksLock sync.Mutex
+ uploaders *util.LimitedConcurrentExecutor
+ uploaderCount int32
+ uploaderCountCond *sync.Cond
+ saveToStorageFn SaveToStorageFunc
+ activeReadChunks map[LogicChunkIndex]int
+ activeReadChunksLock sync.Mutex
+ bufferChunkLimit int
+}
+
+type SealedChunk struct {
+ chunk PageChunk
+ referenceCounter int // track uploading or reading processes
+}
+
+func (sc *SealedChunk) FreeReference(messageOnFree string) {
+ sc.referenceCounter--
+ if sc.referenceCounter == 0 {
+ glog.V(4).Infof("Free sealed chunk: %s", messageOnFree)
+ sc.chunk.FreeResource()
+ }
+}
+
+func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
+ return &UploadPipeline{
+ ChunkSize: chunkSize,
+ writableChunks: make(map[LogicChunkIndex]PageChunk),
+ sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
+ uploaders: writers,
+ uploaderCountCond: sync.NewCond(&sync.Mutex{}),
+ saveToStorageFn: saveToStorageFn,
+ activeReadChunks: make(map[LogicChunkIndex]int),
+ bufferChunkLimit: bufferChunkLimit,
+ }
+}
+
+func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
+ up.writableChunksLock.Lock()
+ defer up.writableChunksLock.Unlock()
+
+ logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
+
+ memChunk, found := up.writableChunks[logicChunkIndex]
+ if !found {
+ if len(up.writableChunks) < up.bufferChunkLimit {
+ memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+ } else {
+ fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
+ for lci, mc := range up.writableChunks {
+ chunkFullness := mc.WrittenSize()
+ if fullness < chunkFullness {
+ fullestChunkIndex = lci
+ fullness = chunkFullness
+ }
+ }
+ up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
+ delete(up.writableChunks, fullestChunkIndex)
+ fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness)
+ memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+ }
+ up.writableChunks[logicChunkIndex] = memChunk
+ }
+ n = memChunk.WriteDataAt(p, off)
+ up.maybeMoveToSealed(memChunk, logicChunkIndex)
+
+ return
+}
+
+func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
+ logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
+
+ // read from sealed chunks first
+ up.sealedChunksLock.Lock()
+ sealedChunk, found := up.sealedChunks[logicChunkIndex]
+ if found {
+ sealedChunk.referenceCounter++
+ }
+ up.sealedChunksLock.Unlock()
+ if found {
+ maxStop = sealedChunk.chunk.ReadDataAt(p, off)
+ glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
+ sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
+ }
+
+ // read from writable chunks last
+ up.writableChunksLock.Lock()
+ defer up.writableChunksLock.Unlock()
+ writableChunk, found := up.writableChunks[logicChunkIndex]
+ if !found {
+ return
+ }
+ writableMaxStop := writableChunk.ReadDataAt(p, off)
+ glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
+ maxStop = max(maxStop, writableMaxStop)
+
+ return
+}
+
+func (up *UploadPipeline) FlushAll() {
+ up.writableChunksLock.Lock()
+ defer up.writableChunksLock.Unlock()
+
+ for logicChunkIndex, memChunk := range up.writableChunks {
+ up.moveToSealed(memChunk, logicChunkIndex)
+ }
+
+ up.waitForCurrentWritersToComplete()
+}
+
+func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
+ if memChunk.IsComplete() {
+ up.moveToSealed(memChunk, logicChunkIndex)
+ }
+}
+
+func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
+ atomic.AddInt32(&up.uploaderCount, 1)
+ glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
+
+ up.sealedChunksLock.Lock()
+
+ if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
+ oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
+ }
+ sealedChunk := &SealedChunk{
+ chunk: memChunk,
+ referenceCounter: 1, // default 1 is for uploading process
+ }
+ up.sealedChunks[logicChunkIndex] = sealedChunk
+ delete(up.writableChunks, logicChunkIndex)
+
+ up.sealedChunksLock.Unlock()
+
+ up.uploaders.Execute(func() {
+ // first add to the file chunks
+ sealedChunk.chunk.SaveContent(up.saveToStorageFn)
+
+ // notify waiting process
+ atomic.AddInt32(&up.uploaderCount, -1)
+ glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
+ // Lock and Unlock are not required,
+ // but it may signal multiple times during one wakeup,
+ // and the waiting goroutine may miss some of them!
+ up.uploaderCountCond.L.Lock()
+ up.uploaderCountCond.Broadcast()
+ up.uploaderCountCond.L.Unlock()
+
+ // wait for readers
+ for up.IsLocked(logicChunkIndex) {
+ time.Sleep(59 * time.Millisecond)
+ }
+
+ // then remove from sealed chunks
+ up.sealedChunksLock.Lock()
+ defer up.sealedChunksLock.Unlock()
+ delete(up.sealedChunks, logicChunkIndex)
+ sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
+
+ })
+}
+
+func (up *UploadPipeline) Shutdown() {
+}
diff --git a/weed/mount/page_writer/upload_pipeline_lock.go b/weed/mount/page_writer/upload_pipeline_lock.go
new file mode 100644
index 000000000..47a40ba37
--- /dev/null
+++ b/weed/mount/page_writer/upload_pipeline_lock.go
@@ -0,0 +1,63 @@
+package page_writer
+
+import (
+ "sync/atomic"
+)
+
+func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
+ startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
+ stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
+ if stopOffset%up.ChunkSize > 0 {
+ stopLogicChunkIndex += 1
+ }
+ up.activeReadChunksLock.Lock()
+ defer up.activeReadChunksLock.Unlock()
+ for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
+ if count, found := up.activeReadChunks[i]; found {
+ up.activeReadChunks[i] = count + 1
+ } else {
+ up.activeReadChunks[i] = 1
+ }
+ }
+}
+
+func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
+ startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
+ stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
+ if stopOffset%up.ChunkSize > 0 {
+ stopLogicChunkIndex += 1
+ }
+ up.activeReadChunksLock.Lock()
+ defer up.activeReadChunksLock.Unlock()
+ for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
+ if count, found := up.activeReadChunks[i]; found {
+ if count == 1 {
+ delete(up.activeReadChunks, i)
+ } else {
+ up.activeReadChunks[i] = count - 1
+ }
+ }
+ }
+}
+
+func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
+ up.activeReadChunksLock.Lock()
+ defer up.activeReadChunksLock.Unlock()
+ if count, found := up.activeReadChunks[logicChunkIndex]; found {
+ return count > 0
+ }
+ return false
+}
+
+func (up *UploadPipeline) waitForCurrentWritersToComplete() {
+ up.uploaderCountCond.L.Lock()
+ t := int32(100)
+ for {
+ t = atomic.LoadInt32(&up.uploaderCount)
+ if t <= 0 {
+ break
+ }
+ up.uploaderCountCond.Wait()
+ }
+ up.uploaderCountCond.L.Unlock()
+}
diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go
new file mode 100644
index 000000000..816fb228b
--- /dev/null
+++ b/weed/mount/page_writer/upload_pipeline_test.go
@@ -0,0 +1,47 @@
+package page_writer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "testing"
+)
+
+func TestUploadPipeline(t *testing.T) {
+
+ uploadPipeline := NewUploadPipeline(nil, 2*1024*1024, nil, 16)
+
+ writeRange(uploadPipeline, 0, 131072)
+ writeRange(uploadPipeline, 131072, 262144)
+ writeRange(uploadPipeline, 262144, 1025536)
+
+ confirmRange(t, uploadPipeline, 0, 1025536)
+
+ writeRange(uploadPipeline, 1025536, 1296896)
+
+ confirmRange(t, uploadPipeline, 1025536, 1296896)
+
+ writeRange(uploadPipeline, 1296896, 2162688)
+
+ confirmRange(t, uploadPipeline, 1296896, 2162688)
+
+ confirmRange(t, uploadPipeline, 1296896, 2162688)
+}
+
+// startOff and stopOff must be divided by 4
+func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
+ p := make([]byte, 4)
+ for i := startOff / 4; i < stopOff/4; i += 4 {
+ util.Uint32toBytes(p, uint32(i))
+ uploadPipeline.SaveDataAt(p, i)
+ }
+}
+
+func confirmRange(t *testing.T, uploadPipeline *UploadPipeline, startOff, stopOff int64) {
+ p := make([]byte, 4)
+ for i := startOff; i < stopOff/4; i += 4 {
+ uploadPipeline.MaybeReadDataAt(p, i)
+ x := util.BytesToUint32(p)
+ if x != uint32(i) {
+ t.Errorf("expecting %d found %d at offset [%d,%d)", i, x, i, i+4)
+ }
+ }
+}