-rw-r--r--  weed/filesys/dirty_pages_continuous.go                   2
-rw-r--r--  weed/filesys/dirty_pages_temp_file.go                   96
-rw-r--r--  weed/filesys/filehandle.go                               1
-rw-r--r--  weed/filesys/page_writer.go                             28
-rw-r--r--  weed/filesys/page_writer/chunked_file_writer.go        152
-rw-r--r--  weed/filesys/page_writer/chunked_file_writer_test.go    60
-rw-r--r--  weed/filesys/page_writer/dirty_pages.go                 20
-rw-r--r--  weed/filesys/page_writer/dirty_pages_temp_interval.go  302
-rw-r--r--  weed/filesys/page_writer/page_chunk_interval_list.go     4
-rw-r--r--  weed/filesys/page_writer_pattern.go                      9
10 files changed, 294 insertions(+), 380 deletions(-)
diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go
index 88b50ce41..19401b94e 100644
--- a/weed/filesys/dirty_pages_continuous.go
+++ b/weed/filesys/dirty_pages_continuous.go
@@ -147,3 +147,5 @@ func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int6
func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication
}
+func (pages *ContinuousDirtyPages) Destroy() {
+}
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index 6a22889dc..cc6f4dc29 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -12,22 +12,21 @@ import (
)
type TempFileDirtyPages struct {
- f *File
- tf *os.File
- writtenIntervals *page_writer.WrittenContinuousIntervals
- writeWaitGroup sync.WaitGroup
- pageAddLock sync.Mutex
- chunkAddLock sync.Mutex
- lastErr error
- collection string
- replication string
+ f *File
+ writeWaitGroup sync.WaitGroup
+ pageAddLock sync.Mutex
+ chunkAddLock sync.Mutex
+ lastErr error
+ collection string
+ replication string
+ chunkedFile *page_writer.ChunkedFileWriter
}
-func newTempFileDirtyPages(file *File) *TempFileDirtyPages {
+func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages {
tempFile := &TempFileDirtyPages{
- f: file,
- writtenIntervals: &page_writer.WrittenContinuousIntervals{},
+ f: file,
+ chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize),
}
return tempFile
@@ -38,84 +37,46 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
pages.pageAddLock.Lock()
defer pages.pageAddLock.Unlock()
- if pages.tf == nil {
- tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "")
- if err != nil {
- glog.Errorf("create temp file: %v", err)
- pages.lastErr = err
- return
- }
- pages.tf = tf
- pages.writtenIntervals.TempFile = tf
- pages.writtenIntervals.LastOffset = 0
- }
-
- writtenOffset := pages.writtenIntervals.LastOffset
- dataSize := int64(len(data))
-
- // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
-
- if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
+ if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil {
pages.lastErr = err
- } else {
- pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
- pages.writtenIntervals.LastOffset += dataSize
}
- // pages.writtenIntervals.debug()
-
return
}
func (pages *TempFileDirtyPages) FlushData() error {
-
- pages.saveExistingPagesToStorage()
+ pages.saveChunkedFileToStorage()
pages.writeWaitGroup.Wait()
if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr)
}
- pages.pageAddLock.Lock()
- defer pages.pageAddLock.Unlock()
- if pages.tf != nil {
-
- pages.writtenIntervals.TempFile = nil
- pages.writtenIntervals.Lists = nil
-
- pages.tf.Close()
- os.Remove(pages.tf.Name())
- pages.tf = nil
- }
return nil
}
-func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
+func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+ return pages.chunkedFile.ReadDataAt(data, startOffset)
+}
- pageSize := pages.f.wfs.option.ChunkSizeLimit
+func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
+ return pages.collection, pages.replication
+}
- // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
+func (pages *TempFileDirtyPages) saveChunkedFileToStorage() {
- for _, list := range pages.writtenIntervals.Lists {
- listStopOffset := list.Offset() + list.Size()
- for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
- start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
- if start >= stop {
- continue
- }
- // glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
- pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
- }
- }
+ pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex int, interval *page_writer.PageChunkWrittenInterval) {
+ reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval)
+ pages.saveChunkedFileIntervalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize, interval.Size())
+ })
}
-func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
+func (pages *TempFileDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64) {
mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1)
writer := func() {
defer pages.writeWaitGroup.Done()
- reader = io.LimitReader(reader, size)
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
@@ -135,12 +96,9 @@ func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, s
} else {
go writer()
}
-}
-func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
- return pages.writtenIntervals.ReadDataAt(data, startOffset)
}
-func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
- return pages.collection, pages.replication
+func (pages *TempFileDirtyPages) Destroy() {
+ pages.chunkedFile.Destroy()
}
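
The flush path now folds into ChunkedFileWriter: every written interval in the temp file becomes one chunk upload. A condensed sketch of that flow (in package filesys, imports elided), with a hypothetical upload callback standing in for wfs.saveDataAsChunk:

func flushChunkedFile(cw *page_writer.ChunkedFileWriter, upload func(r io.Reader, offset, size int64)) {
	cw.ProcessEachInterval(func(file *os.File, logicChunkIndex int, interval *page_writer.PageChunkWrittenInterval) {
		// each interval is read back through a FileIntervalReader and uploaded
		// at its logical chunk's base offset, mirroring saveChunkedFileIntervalToStorage above
		reader := page_writer.NewFileIntervalReader(cw, logicChunkIndex, interval)
		upload(reader, int64(logicChunkIndex)*cw.ChunkSize, interval.Size())
	})
}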
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index d92b17b5b..a551e6e10 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -222,6 +222,7 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
fh.reader = nil
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
+ fh.dirtyPages.Destroy()
}
if fh.f.isOpen < 0 {
diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go
index 9c9e38968..8e52f9f67 100644
--- a/weed/filesys/page_writer.go
+++ b/weed/filesys/page_writer.go
@@ -24,7 +24,7 @@ func newPageWriter(file *File, chunkSize int64) *PageWriter {
pw := &PageWriter{
f: file,
chunkSize: chunkSize,
- randomWriter: newTempFileDirtyPages(file),
+ randomWriter: newTempFileDirtyPages(file, chunkSize),
streamWriter: newContinuousDirtyPages(file),
writerPattern: NewWriterPattern(file.Name, chunkSize),
}
@@ -63,11 +63,23 @@ func (pw *PageWriter) FlushData() error {
return pw.randomWriter.FlushData()
}
-func (pw *PageWriter) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
- glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), startOffset, startOffset+int64(len(data)))
- m1 := pw.streamWriter.ReadDirtyDataAt(data, startOffset)
- m2 := pw.randomWriter.ReadDirtyDataAt(data, startOffset)
- return max(m1, m2)
+func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
+ glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), offset, offset+int64(len(data)))
+
+ chunkIndex := offset / pw.chunkSize
+ for i := chunkIndex; len(data) > 0; i++ {
+ readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
+
+ m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset)
+ m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
+
+ maxStop = max(maxStop, max(m1, m2))
+
+ offset += readSize
+ data = data[readSize:]
+ }
+
+ return
}
func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
@@ -76,3 +88,7 @@ func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
}
return pw.randomWriter.GetStorageOptions()
}
+
+func (pw *PageWriter) Destroy() {
+ pw.randomWriter.Destroy()
+}
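
ReadDirtyDataAt now cuts a read at chunk boundaries so each sub-read stays within a single chunk of either writer. A minimal sketch of just that boundary arithmetic, using a hypothetical splitRead helper:

func splitRead(offset, length, chunkSize int64) (ranges [][2]int64) {
	for i := offset / chunkSize; length > 0; i++ {
		readSize := (i+1)*chunkSize - offset // distance to the next chunk boundary
		if readSize > length {
			readSize = length
		}
		ranges = append(ranges, [2]int64{offset, offset + readSize})
		offset += readSize
		length -= readSize
	}
	return
}

For example, splitRead(12, 10, 16) yields [12,16) and [16,22): the 10-byte read is cut once, at the chunk boundary 16.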
diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go
new file mode 100644
index 000000000..180a2039d
--- /dev/null
+++ b/weed/filesys/page_writer/chunked_file_writer.go
@@ -0,0 +1,152 @@
+package page_writer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "io"
+ "os"
+ "sync"
+)
+
+// ChunkedFileWriter assumes each write request stays within a single chunk
+type ChunkedFileWriter struct {
+ dir string
+ file *os.File
+ logicToActualChunkIndex map[int]int
+ chunkUsages []*PageChunkWrittenIntervalList
+ ChunkSize int64
+ sync.Mutex
+}
+
+var _ = io.WriterAt(&ChunkedFileWriter{})
+
+func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
+ return &ChunkedFileWriter{
+ dir: dir,
+ file: nil,
+ logicToActualChunkIndex: make(map[int]int),
+ ChunkSize: chunkSize,
+ }
+}
+
+func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
+ cw.Lock()
+ defer cw.Unlock()
+
+ if cw.file == nil {
+ cw.file, err = os.CreateTemp(cw.dir, "")
+ if err != nil {
+ glog.Errorf("create temp file: %v", err)
+ return
+ }
+ }
+
+ actualOffset, chunkUsage := cw.toActualWriteOffset(off)
+ n, err = cw.file.WriteAt(p, actualOffset)
+ if err == nil {
+ startOffset := off % cw.ChunkSize
+ chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
+ }
+ return
+}
+
+func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
+ cw.Lock()
+ defer cw.Unlock()
+
+ if cw.file == nil {
+ return
+ }
+
+ logicChunkIndex := off / cw.ChunkSize
+ actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
+ if chunkUsage != nil {
+ for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
+ logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.startOffset)
+ logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
+ if logicStart < logicStop {
+ actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
+ _, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
+ if err != nil {
+ glog.Errorf("reading temp file: %v", err)
+ break
+ }
+ maxStop = max(maxStop, logicStop)
+ }
+ }
+ }
+ return
+}
+
+func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *PageChunkWrittenIntervalList) {
+ logicChunkIndex := int(logicOffset / cw.ChunkSize)
+ offsetRemainder := logicOffset % cw.ChunkSize
+ existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+ if found {
+ return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
+ }
+ cw.logicToActualChunkIndex[logicChunkIndex] = len(cw.chunkUsages)
+ chunkUsage = newPageChunkWrittenIntervalList()
+ cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
+ return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
+}
+
+func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex int, chunkUsage *PageChunkWrittenIntervalList) {
+ logicChunkIndex := int(logicOffset / cw.ChunkSize)
+ existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+ if found {
+ return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
+ }
+ return 0, nil
+}
+
+func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex int, interval *PageChunkWrittenInterval)) {
+ for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
+ chunkUsage := cw.chunkUsages[actualChunkIndex]
+ for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
+ process(cw.file, logicChunkIndex, t)
+ }
+ }
+}
+func (cw *ChunkedFileWriter) Destroy() {
+ if cw.file != nil {
+ cw.file.Close()
+ os.Remove(cw.file.Name())
+ }
+}
+
+type FileIntervalReader struct {
+ f *os.File
+ startOffset int64
+ stopOffset int64
+ position int64
+}
+
+var _ = io.Reader(&FileIntervalReader{})
+
+func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex int, interval *PageChunkWrittenInterval) *FileIntervalReader {
+ actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+ if !found {
+ // this should never happen
+ return nil
+ }
+ return &FileIntervalReader{
+ f: cw.file,
+ startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.startOffset,
+ stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
+ position: 0,
+ }
+}
+
+func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
+ readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
+ n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
+ if err == nil || err == io.EOF {
+ fr.position += int64(n)
+ if fr.stopOffset-fr.startOffset-fr.position == 0 {
+ // report io.EOF together with the final bytes to save the caller one extra Read call
+ err = io.EOF
+ return
+ }
+ }
+ return
+}
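
A small usage sketch of the new writer, using only the exported API above. Logical chunks are materialized in the temp file in first-write order, which is what the logicToActualChunkIndex map tracks:

package main

import (
	"fmt"
	"os"

	"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
)

func main() {
	cw := page_writer.NewChunkedFileWriter(os.TempDir(), 1024)
	defer cw.Destroy() // closes and removes the backing temp file

	cw.WriteAt([]byte("world"), 4096) // logical chunk 4 becomes actual chunk 0 in the temp file
	cw.WriteAt([]byte("hello"), 0)    // logical chunk 0 becomes actual chunk 1

	buf := make([]byte, 5)
	if maxStop := cw.ReadDataAt(buf, 4096); maxStop == 4101 {
		fmt.Printf("%s\n", buf) // prints "world"
	}
}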
diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go
new file mode 100644
index 000000000..1c72c77d4
--- /dev/null
+++ b/weed/filesys/page_writer/chunked_file_writer_test.go
@@ -0,0 +1,60 @@
+package page_writer
+
+import (
+ "github.com/stretchr/testify/assert"
+ "os"
+ "testing"
+)
+
+func TestChunkedFileWriter_toActualOffset(t *testing.T) {
+ cw := NewChunkedFileWriter("", 16)
+
+ writeToFile(cw, 50, 60)
+ writeToFile(cw, 60, 64)
+
+ writeToFile(cw, 32, 40)
+ writeToFile(cw, 42, 48)
+
+ writeToFile(cw, 48, 50)
+
+ assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered")
+ assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals")
+
+}
+
+func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {
+
+ _, chunkUsage := cw.toActualWriteOffset(startOffset)
+
+ // skip doing actual writing
+
+ innerOffset := startOffset % cw.ChunkSize
+ chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset)
+
+}
+
+func TestWriteChunkedFile(t *testing.T) {
+ x := NewChunkedFileWriter(os.TempDir(), 20)
+ defer x.Destroy()
+ y := NewChunkedFileWriter(os.TempDir(), 12)
+ defer y.Destroy()
+
+ batchSize := 4
+ buf := make([]byte, batchSize)
+ for i := 0; i < 256; i++ {
+ for j := 0; j < batchSize; j++ {
+ buf[j] = byte(i)
+ }
+ x.WriteAt(buf, int64(i*batchSize))
+ y.WriteAt(buf, int64((255-i)*batchSize))
+ }
+
+ a := make([]byte, 1)
+ b := make([]byte, 1)
+ for i := 0; i < 256*batchSize; i++ {
+ x.ReadDataAt(a, int64(i))
+ y.ReadDataAt(b, int64(256*batchSize-1-i))
+ assert.Equal(t, a[0], b[0], "same read")
+ }
+
+}
diff --git a/weed/filesys/page_writer/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go
index c18f847b7..955627d67 100644
--- a/weed/filesys/page_writer/dirty_pages.go
+++ b/weed/filesys/page_writer/dirty_pages.go
@@ -5,4 +5,24 @@ type DirtyPages interface {
FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
GetStorageOptions() (collection, replication string)
+ Destroy()
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+func min(x, y int64) int64 {
+ if x < y {
+ return x
+ }
+ return y
+}
+func minInt(x, y int) int {
+ if x < y {
+ return x
+ }
+ return y
}
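
With Destroy() on the interface, FileHandle.Release can reclaim temp-file space from either writer without a type switch. A compile-checked sketch of the smallest conforming type, assuming the interface also declares AddPage (outside this hunk) as both implementations do:

type noopDirtyPages struct{}

func (noopDirtyPages) AddPage(offset int64, data []byte)                              {}
func (noopDirtyPages) FlushData() error                                               { return nil }
func (noopDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { return 0 }
func (noopDirtyPages) GetStorageOptions() (collection, replication string)            { return "", "" }
func (noopDirtyPages) Destroy()                                                       {}

// compile-time check that noopDirtyPages satisfies DirtyPages
var _ DirtyPages = noopDirtyPages{}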
diff --git a/weed/filesys/page_writer/dirty_pages_temp_interval.go b/weed/filesys/page_writer/dirty_pages_temp_interval.go
deleted file mode 100644
index aeaf0ec6f..000000000
--- a/weed/filesys/page_writer/dirty_pages_temp_interval.go
+++ /dev/null
@@ -1,302 +0,0 @@
-package page_writer
-
-import (
- "io"
- "log"
- "os"
-)
-
-type WrittenIntervalNode struct {
- DataOffset int64
- TempOffset int64
- Size int64
- Next *WrittenIntervalNode
-}
-
-type WrittenIntervalLinkedList struct {
- tempFile *os.File
- Head *WrittenIntervalNode
- Tail *WrittenIntervalNode
-}
-
-type WrittenContinuousIntervals struct {
- TempFile *os.File
- LastOffset int64
- Lists []*WrittenIntervalLinkedList
-}
-
-func (list *WrittenIntervalLinkedList) Offset() int64 {
- return list.Head.DataOffset
-}
-func (list *WrittenIntervalLinkedList) Size() int64 {
- return list.Tail.DataOffset + list.Tail.Size - list.Head.DataOffset
-}
-func (list *WrittenIntervalLinkedList) addNodeToTail(node *WrittenIntervalNode) {
- // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
- if list.Tail.TempOffset+list.Tail.Size == node.TempOffset {
- // already connected
- list.Tail.Size += node.Size
- } else {
- list.Tail.Next = node
- list.Tail = node
- }
-}
-func (list *WrittenIntervalLinkedList) addNodeToHead(node *WrittenIntervalNode) {
- // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
- node.Next = list.Head
- list.Head = node
-}
-
-func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) {
- t := list.Head
- for {
-
- nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
- if nodeStart < nodeStop {
- // glog.V(4).Infof("copying start=%d stop=%d t=[%d,%d) => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.DataOffset, t.DataOffset+t.Size, len(buf), nodeStart, nodeStop)
- list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset+nodeStart-t.DataOffset)
- }
-
- if t.Next == nil {
- break
- }
- t = t.Next
- }
-}
-
-func (c *WrittenContinuousIntervals) TotalSize() (total int64) {
- for _, list := range c.Lists {
- total += list.Size()
- }
- return
-}
-
-func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenIntervalLinkedList {
- var nodes []*WrittenIntervalNode
- for t := list.Head; t != nil; t = t.Next {
- nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
- if nodeStart >= nodeStop {
- // skip non overlapping WrittenIntervalNode
- continue
- }
- nodes = append(nodes, &WrittenIntervalNode{
- TempOffset: t.TempOffset + nodeStart - t.DataOffset,
- DataOffset: nodeStart,
- Size: nodeStop - nodeStart,
- Next: nil,
- })
- }
- for i := 1; i < len(nodes); i++ {
- nodes[i-1].Next = nodes[i]
- }
- return &WrittenIntervalLinkedList{
- tempFile: list.tempFile,
- Head: nodes[0],
- Tail: nodes[len(nodes)-1],
- }
-}
-
-func (c *WrittenContinuousIntervals) debug() {
- log.Printf("++")
- for _, l := range c.Lists {
- log.Printf("++++")
- for t := l.Head; ; t = t.Next {
- log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
- if t.Next == nil {
- break
- }
- }
- log.Printf("----")
- }
- log.Printf("--")
-}
-
-func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) {
-
- interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
-
- // append to the tail and return
- if len(c.Lists) == 1 {
- lastSpan := c.Lists[0]
- if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset {
- lastSpan.addNodeToTail(interval)
- return
- }
- }
-
- var newLists []*WrittenIntervalLinkedList
- for _, list := range c.Lists {
- // if list is to the left of new interval, add to the new list
- if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset {
- newLists = append(newLists, list)
- }
- // if list is to the right of new interval, add to the new list
- if interval.DataOffset+interval.Size <= list.Head.DataOffset {
- newLists = append(newLists, list)
- }
- // if new interval overwrite the right part of the list
- if list.Head.DataOffset < interval.DataOffset && interval.DataOffset < list.Tail.DataOffset+list.Tail.Size {
- // create a new list of the left part of existing list
- newLists = append(newLists, list.subList(list.Offset(), interval.DataOffset))
- }
- // if new interval overwrite the left part of the list
- if list.Head.DataOffset < interval.DataOffset+interval.Size && interval.DataOffset+interval.Size < list.Tail.DataOffset+list.Tail.Size {
- // create a new list of the right part of existing list
- newLists = append(newLists, list.subList(interval.DataOffset+interval.Size, list.Tail.DataOffset+list.Tail.Size))
- }
- // skip anything that is fully overwritten by the new interval
- }
-
- c.Lists = newLists
- // add the new interval to the lists, connecting neighbor lists
- var prevList, nextList *WrittenIntervalLinkedList
-
- for _, list := range c.Lists {
- if list.Head.DataOffset == interval.DataOffset+interval.Size {
- nextList = list
- break
- }
- }
-
- for _, list := range c.Lists {
- if list.Head.DataOffset+list.Size() == dataOffset {
- list.addNodeToTail(interval)
- prevList = list
- break
- }
- }
-
- if prevList != nil && nextList != nil {
- // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size)
- prevList.Tail.Next = nextList.Head
- prevList.Tail = nextList.Tail
- c.removeList(nextList)
- } else if nextList != nil {
- // add to head was not done when checking
- nextList.addNodeToHead(interval)
- }
- if prevList == nil && nextList == nil {
- c.Lists = append(c.Lists, &WrittenIntervalLinkedList{
- tempFile: c.TempFile,
- Head: interval,
- Tail: interval,
- })
- }
-
- return
-}
-
-func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList {
- var maxSize int64
- maxIndex := -1
- for k, list := range c.Lists {
- if maxSize <= list.Size() {
- maxSize = list.Size()
- maxIndex = k
- }
- }
- if maxSize <= 0 {
- return nil
- }
-
- t := c.Lists[maxIndex]
- t.tempFile = c.TempFile
- c.Lists = append(c.Lists[0:maxIndex], c.Lists[maxIndex+1:]...)
- return t
-
-}
-
-func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) {
- index := -1
- for k, list := range c.Lists {
- if list.Offset() == target.Offset() {
- index = k
- }
- }
- if index < 0 {
- return
- }
-
- c.Lists = append(c.Lists[0:index], c.Lists[index+1:]...)
-
-}
-
-func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
- for _, list := range c.Lists {
- start := max(startOffset, list.Offset())
- stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
- if start < stop {
- list.ReadData(data[start-startOffset:], start, stop)
- maxStop = max(maxStop, stop)
- }
- }
- return
-}
-
-func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader {
- // TODO: optimize this to avoid another loop
- var readers []io.Reader
- for t := l.Head; ; t = t.Next {
- startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop)
- if startOffset < stopOffset {
- // glog.V(4).Infof("ToReader read [%d,%d) from [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
- readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset))
- }
- if t.Next == nil {
- break
- }
- }
- if len(readers) == 1 {
- return readers[0]
- }
- return io.MultiReader(readers...)
-}
-
-type FileSectionReader struct {
- file *os.File
- tempStartOffset int64
- Offset int64
- dataStart int64
- dataStop int64
-}
-
-var _ = io.Reader(&FileSectionReader{})
-
-func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader {
- return &FileSectionReader{
- file: tempfile,
- tempStartOffset: offset,
- Offset: offset,
- dataStart: dataOffset,
- dataStop: dataOffset + size,
- }
-}
-
-func (f *FileSectionReader) Read(p []byte) (n int, err error) {
- remaining := (f.dataStop - f.dataStart) - (f.Offset - f.tempStartOffset)
- if remaining <= 0 {
- return 0, io.EOF
- }
- dataLen := min(remaining, int64(len(p)))
- // glog.V(4).Infof("reading [%d,%d) from %v [%d,%d)/[%d,%d) %d", f.Offset-f.tempStartOffset+f.dataStart, f.Offset-f.tempStartOffset+f.dataStart+dataLen, f.file.Name(), f.Offset, f.Offset+dataLen, f.tempStartOffset, f.tempStartOffset+f.dataStop-f.dataStart, f.dataStop-f.dataStart)
- n, err = f.file.ReadAt(p[:dataLen], f.Offset)
- if n > 0 {
- f.Offset += int64(n)
- } else {
- err = io.EOF
- }
- return
-}
-
-func max(x, y int64) int64 {
- if x > y {
- return x
- }
- return y
-}
-func min(x, y int64) int64 {
- if x < y {
- return x
- }
- return y
-}
diff --git a/weed/filesys/page_writer/page_chunk_interval_list.go b/weed/filesys/page_writer/page_chunk_interval_list.go
index e626b2a7f..09fc45bfb 100644
--- a/weed/filesys/page_writer/page_chunk_interval_list.go
+++ b/weed/filesys/page_writer/page_chunk_interval_list.go
@@ -10,6 +10,10 @@ type PageChunkWrittenInterval struct {
next *PageChunkWrittenInterval
}
+func (interval *PageChunkWrittenInterval) Size() int64 {
+ return interval.stopOffset - interval.startOffset
+}
+
// PageChunkWrittenIntervalList mark written intervals within one page chunk
type PageChunkWrittenIntervalList struct {
head *PageChunkWrittenInterval
diff --git a/weed/filesys/page_writer_pattern.go b/weed/filesys/page_writer_pattern.go
index 42ca3d969..44b69cda7 100644
--- a/weed/filesys/page_writer_pattern.go
+++ b/weed/filesys/page_writer_pattern.go
@@ -14,18 +14,21 @@ type WriterPattern struct {
func NewWriterPattern(fileName string, chunkSize int64) *WriterPattern {
return &WriterPattern{
isStreaming: true,
- lastWriteOffset: 0,
+ lastWriteOffset: -1,
chunkSize: chunkSize,
fileName: fileName,
}
}
func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) {
- if rp.lastWriteOffset == 0 {
- }
if rp.lastWriteOffset > offset {
rp.isStreaming = false
}
+ if rp.lastWriteOffset == -1 {
+ if offset != 0 {
+ rp.isStreaming = false
+ }
+ }
rp.lastWriteOffset = offset
}