 weed/filesys/dirty_pages_continuous.go                |   2
 weed/filesys/dirty_pages_temp_file.go                 |  96
 weed/filesys/filehandle.go                            |   1
 weed/filesys/page_writer.go                           |  28
 weed/filesys/page_writer/chunked_file_writer.go       | 152
 weed/filesys/page_writer/chunked_file_writer_test.go  |  60
 weed/filesys/page_writer/dirty_pages.go               |  20
 weed/filesys/page_writer/dirty_pages_temp_interval.go | 302
 weed/filesys/page_writer/page_chunk_interval_list.go  |   4
 weed/filesys/page_writer_pattern.go                   |   9
 10 files changed, 294 insertions(+), 380 deletions(-)
diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go
index 88b50ce41..19401b94e 100644
--- a/weed/filesys/dirty_pages_continuous.go
+++ b/weed/filesys/dirty_pages_continuous.go
@@ -147,3 +147,5 @@ func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int6
 func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) {
 	return pages.collection, pages.replication
 }
+func (pages ContinuousDirtyPages) Destroy() {
+}
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index 6a22889dc..cc6f4dc29 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -12,22 +12,21 @@ import (
 )
 
 type TempFileDirtyPages struct {
-	f                *File
-	tf               *os.File
-	writtenIntervals *page_writer.WrittenContinuousIntervals
-	writeWaitGroup   sync.WaitGroup
-	pageAddLock      sync.Mutex
-	chunkAddLock     sync.Mutex
-	lastErr          error
-	collection       string
-	replication      string
+	f              *File
+	writeWaitGroup sync.WaitGroup
+	pageAddLock    sync.Mutex
+	chunkAddLock   sync.Mutex
+	lastErr        error
+	collection     string
+	replication    string
+	chunkedFile    *page_writer.ChunkedFileWriter
 }
 
-func newTempFileDirtyPages(file *File) *TempFileDirtyPages {
+func newTempFileDirtyPages(file *File, chunkSize int64) *TempFileDirtyPages {
 
 	tempFile := &TempFileDirtyPages{
-		f:                file,
-		writtenIntervals: &page_writer.WrittenContinuousIntervals{},
+		f:           file,
+		chunkedFile: page_writer.NewChunkedFileWriter(file.wfs.option.getTempFilePageDir(), chunkSize),
 	}
 
 	return tempFile
@@ -38,84 +37,46 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
 	pages.pageAddLock.Lock()
 	defer pages.pageAddLock.Unlock()
 
-	if pages.tf == nil {
-		tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "")
-		if err != nil {
-			glog.Errorf("create temp file: %v", err)
-			pages.lastErr = err
-			return
-		}
-		pages.tf = tf
-		pages.writtenIntervals.TempFile = tf
-		pages.writtenIntervals.LastOffset = 0
-	}
-
-	writtenOffset := pages.writtenIntervals.LastOffset
-	dataSize := int64(len(data))
-
-	// glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
-
-	if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
+	if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil {
 		pages.lastErr = err
-	} else {
-		pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
-		pages.writtenIntervals.LastOffset += dataSize
 	}
 
-	// pages.writtenIntervals.debug()
-
 	return
 }
 
 func (pages *TempFileDirtyPages) FlushData() error {
-
-	pages.saveExistingPagesToStorage()
+	pages.saveChunkedFileToStorage()
 	pages.writeWaitGroup.Wait()
 	if pages.lastErr != nil {
 		return fmt.Errorf("flush data: %v", pages.lastErr)
 	}
-	pages.pageAddLock.Lock()
-	defer pages.pageAddLock.Unlock()
-	if pages.tf != nil {
-
-		pages.writtenIntervals.TempFile = nil
-		pages.writtenIntervals.Lists = nil
-
-		pages.tf.Close()
-		os.Remove(pages.tf.Name())
-		pages.tf = nil
-	}
 	return nil
 }
 
-func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
+func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+	return pages.chunkedFile.ReadDataAt(data, startOffset)
+}
 
-	pageSize := pages.f.wfs.option.ChunkSizeLimit
+func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
+	return pages.collection, pages.replication
+}
 
-	// glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
+func (pages *TempFileDirtyPages) saveChunkedFileToStorage() {
 
-	for _, list := range pages.writtenIntervals.Lists {
-		listStopOffset := list.Offset() + list.Size()
-		for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
-			start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
-			if start >= stop {
-				continue
-			}
-			// glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
-			pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
-		}
-	}
+	pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex int, interval *page_writer.PageChunkWrittenInterval) {
+		reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval)
+		pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize, interval.Size())
+	})
 }
 
-func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
+func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64) {
 
 	mtime := time.Now().UnixNano()
 	pages.writeWaitGroup.Add(1)
 	writer := func() {
 		defer pages.writeWaitGroup.Done()
 
-		reader = io.LimitReader(reader, size)
 		chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
 		if err != nil {
 			glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
@@ -135,12 +96,9 @@ func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, s
 	} else {
 		go writer()
 	}
-}
 
-func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
-	return pages.writtenIntervals.ReadDataAt(data, startOffset)
 }
 
-func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
-	return pages.collection, pages.replication
+func (pages TempFileDirtyPages) Destroy() {
+	pages.chunkedFile.Destroy()
 }
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index d92b17b5b..a551e6e10 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -222,6 +222,7 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
 		fh.reader = nil
 
 		fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
+		fh.dirtyPages.Destroy()
 	}
 
 	if fh.f.isOpen < 0 {
diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go
index 9c9e38968..8e52f9f67 100644
--- a/weed/filesys/page_writer.go
+++ b/weed/filesys/page_writer.go
@@ -24,7 +24,7 @@ func newPageWriter(file *File, chunkSize int64) *PageWriter {
 	pw := &PageWriter{
 		f:             file,
 		chunkSize:     chunkSize,
-		randomWriter:  newTempFileDirtyPages(file),
+		randomWriter:  newTempFileDirtyPages(file, chunkSize),
 		streamWriter:  newContinuousDirtyPages(file),
 		writerPattern: NewWriterPattern(file.Name, chunkSize),
 	}
@@ -63,11 +63,23 @@ func (pw *PageWriter) FlushData() error {
 	return pw.randomWriter.FlushData()
 }
 
-func (pw *PageWriter) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
-	glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), startOffset, startOffset+int64(len(data)))
-	m1 := pw.streamWriter.ReadDirtyDataAt(data, startOffset)
-	m2 := pw.randomWriter.ReadDirtyDataAt(data, startOffset)
-	return max(m1, m2)
+func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) {
+	glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), offset, offset+int64(len(data)))
+
+	chunkIndex := offset / pw.chunkSize
+	for i := chunkIndex; len(data) > 0; i++ {
+		readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
+
+		m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset)
+		m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
+
+		maxStop = max(maxStop, max(m1, m2))
+
+		offset += readSize
+		data = data[readSize:]
+	}
+
+	return
 }
 
 func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
@@ -76,3 +88,7 @@ func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
 	}
 	return pw.randomWriter.GetStorageOptions()
 }
+
+func (pw *PageWriter) Destroy() {
+	pw.randomWriter.Destroy()
+}
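
The new ReadDirtyDataAt walks the request one chunk at a time, so each sub-read stays inside a single chunk of the underlying writers. For illustration, a minimal standalone sketch of just that splitting arithmetic; splitByChunk is a hypothetical name, not part of the commit:

package main

import "fmt"

// splitByChunk cuts the request [offset, offset+size) at every chunk
// boundary, mirroring the loop in the new PageWriter.ReadDirtyDataAt.
func splitByChunk(offset, size, chunkSize int64) (pieces [][2]int64) {
	for size > 0 {
		chunkIndex := offset / chunkSize
		readSize := size
		if toBoundary := (chunkIndex+1)*chunkSize - offset; toBoundary < readSize {
			readSize = toBoundary // stop at the end of the current chunk
		}
		pieces = append(pieces, [2]int64{offset, offset + readSize})
		offset += readSize
		size -= readSize
	}
	return
}

func main() {
	// A 10-byte read at offset 28 with 16-byte chunks touches chunks 1 and 2:
	fmt.Println(splitByChunk(28, 10, 16)) // [[28 32] [32 38]]
}
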
diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go
new file mode 100644
index 000000000..180a2039d
--- /dev/null
+++ b/weed/filesys/page_writer/chunked_file_writer.go
@@ -0,0 +1,152 @@
+package page_writer
+
+import (
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"io"
+	"os"
+	"sync"
+)
+
+// ChunkedFileWriter assumes the write requests will come in within chunks
+type ChunkedFileWriter struct {
+	dir                     string
+	file                    *os.File
+	logicToActualChunkIndex map[int]int
+	chunkUsages             []*PageChunkWrittenIntervalList
+	ChunkSize               int64
+	sync.Mutex
+}
+
+var _ = io.WriterAt(&ChunkedFileWriter{})
+
+func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
+	return &ChunkedFileWriter{
+		dir:                     dir,
+		file:                    nil,
+		logicToActualChunkIndex: make(map[int]int),
+		ChunkSize:               chunkSize,
+	}
+}
+
+func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
+	cw.Lock()
+	defer cw.Unlock()
+
+	if cw.file == nil {
+		cw.file, err = os.CreateTemp(cw.dir, "")
+		if err != nil {
+			glog.Errorf("create temp file: %v", err)
+			return
+		}
+	}
+
+	actualOffset, chunkUsage := cw.toActualWriteOffset(off)
+	n, err = cw.file.WriteAt(p, actualOffset)
+	if err == nil {
+		startOffset := off % cw.ChunkSize
+		chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
+	}
+	return
+}
+
+func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
+	cw.Lock()
+	defer cw.Unlock()
+
+	if cw.file == nil {
+		return
+	}
+
+	logicChunkIndex := off / cw.ChunkSize
+	actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
+	if chunkUsage != nil {
+		for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
+			logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.startOffset)
+			logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
+			if logicStart < logicStop {
+				actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
+				_, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
+				if err != nil {
+					glog.Errorf("reading temp file: %v", err)
+					break
+				}
+				maxStop = max(maxStop, logicStop)
+			}
+		}
+	}
+	return
+}
+
+func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *PageChunkWrittenIntervalList) {
+	logicChunkIndex := int(logicOffset / cw.ChunkSize)
+	offsetRemainder := logicOffset % cw.ChunkSize
+	existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+	if found {
+		return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
+	}
+	cw.logicToActualChunkIndex[logicChunkIndex] = len(cw.chunkUsages)
+	chunkUsage = newPageChunkWrittenIntervalList()
+	cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
+	return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
+}
+
+func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex int, chunkUsage *PageChunkWrittenIntervalList) {
+	logicChunkIndex := int(logicOffset / cw.ChunkSize)
+	existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+	if found {
+		return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
+	}
+	return 0, nil
+}
+
+func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex int, interval *PageChunkWrittenInterval)) {
+	for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
+		chunkUsage := cw.chunkUsages[actualChunkIndex]
+		for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
+			process(cw.file, logicChunkIndex, t)
+		}
+	}
+}
+func (cw *ChunkedFileWriter) Destroy() {
+	if cw.file != nil {
+		cw.file.Close()
+		os.Remove(cw.file.Name())
+	}
+}
+
+type FileIntervalReader struct {
+	f           *os.File
+	startOffset int64
+	stopOffset  int64
+	position    int64
+}
+
+var _ = io.Reader(&FileIntervalReader{})
+
+func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex int, interval *PageChunkWrittenInterval) *FileIntervalReader {
+	actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
+	if !found {
+		// this should never happen
+		return nil
+	}
+	return &FileIntervalReader{
+		f:           cw.file,
+		startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.startOffset,
+		stopOffset:  int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
+		position:    0,
+	}
+}
+
+func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
+	readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
+	n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
+	if err == nil || err == io.EOF {
+		fr.position += int64(n)
+		if fr.stopOffset-fr.startOffset-fr.position == 0 {
+			// return a tiny bit faster
+			err = io.EOF
+			return
+		}
+	}
+	return
+}
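
ChunkedFileWriter packs a sparse logical file densely into one temp file: the first logical chunk ever touched gets physical slot 0, the next new one gets slot 1, and so on, regardless of logical position; toActualWriteOffset above implements this. A minimal sketch of just that mapping, with hypothetical names (chunkMap, actualOffset) that are not part of the commit:

package main

import "fmt"

// chunkMap packs sparse logical chunk indexes into dense physical slots,
// the same way ChunkedFileWriter.toActualWriteOffset allocates them.
type chunkMap struct {
	logicToActual map[int64]int64
	chunkSize     int64
}

func (m *chunkMap) actualOffset(logicOffset int64) int64 {
	logicChunk := logicOffset / m.chunkSize
	actualChunk, found := m.logicToActual[logicChunk]
	if !found {
		actualChunk = int64(len(m.logicToActual)) // next free slot in the temp file
		m.logicToActual[logicChunk] = actualChunk
	}
	return actualChunk*m.chunkSize + logicOffset%m.chunkSize
}

func main() {
	m := &chunkMap{logicToActual: map[int64]int64{}, chunkSize: 16}
	fmt.Println(m.actualOffset(50)) // logical chunk 3 -> slot 0: prints 2
	fmt.Println(m.actualOffset(32)) // logical chunk 2 -> slot 1: prints 16
	fmt.Println(m.actualOffset(60)) // chunk 3 again -> still slot 0: prints 12
}
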
diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go
new file mode 100644
index 000000000..1c72c77d4
--- /dev/null
+++ b/weed/filesys/page_writer/chunked_file_writer_test.go
@@ -0,0 +1,60 @@
+package page_writer
+
+import (
+	"github.com/stretchr/testify/assert"
+	"os"
+	"testing"
+)
+
+func TestChunkedFileWriter_toActualOffset(t *testing.T) {
+	cw := NewChunkedFileWriter("", 16)
+
+	writeToFile(cw, 50, 60)
+	writeToFile(cw, 60, 64)
+
+	writeToFile(cw, 32, 40)
+	writeToFile(cw, 42, 48)
+
+	writeToFile(cw, 48, 50)
+
+	assert.Equal(t, 1, cw.chunkUsages[0].size(), "fully covered")
+	assert.Equal(t, 2, cw.chunkUsages[1].size(), "2 intervals")
+
+}
+
+func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {
+
+	_, chunkUsage := cw.toActualWriteOffset(startOffset)
+
+	// skip doing actual writing
+
+	innerOffset := startOffset % cw.ChunkSize
+	chunkUsage.MarkWritten(innerOffset, innerOffset+stopOffset-startOffset)
+
+}
+
+func TestWriteChunkedFile(t *testing.T) {
+	x := NewChunkedFileWriter(os.TempDir(), 20)
+	defer x.Destroy()
+	y := NewChunkedFileWriter(os.TempDir(), 12)
+	defer y.Destroy()
+
+	batchSize := 4
+	buf := make([]byte, batchSize)
+	for i := 0; i < 256; i++ {
+		for x := 0; x < batchSize; x++ {
+			buf[x] = byte(i)
+		}
+		x.WriteAt(buf, int64(i*batchSize))
+		y.WriteAt(buf, int64((255-i)*batchSize))
+	}
+
+	a := make([]byte, 1)
+	b := make([]byte, 1)
+	for i := 0; i < 256*batchSize; i++ {
+		x.ReadDataAt(a, int64(i))
+		y.ReadDataAt(b, int64(256*batchSize-1-i))
+		assert.Equal(t, a[0], b[0], "same read")
+	}
+
+}
diff --git a/weed/filesys/page_writer/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go
index c18f847b7..955627d67 100644
--- a/weed/filesys/page_writer/dirty_pages.go
+++ b/weed/filesys/page_writer/dirty_pages.go
@@ -5,4 +5,24 @@ type DirtyPages interface {
 	FlushData() error
 	ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
 	GetStorageOptions() (collection, replication string)
+	Destroy()
+}
+
+func max(x, y int64) int64 {
+	if x > y {
+		return x
+	}
+	return y
+}
+func min(x, y int64) int64 {
+	if x < y {
+		return x
+	}
+	return y
+}
+func minInt(x, y int) int {
+	if x < y {
+		return x
+	}
+	return y
 }
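
The interface gains Destroy so callers can release the backing temp file once the file handle closes (see the filehandle.go hunk above). A minimal sketch of the implied lifecycle; the local interface is a trimmed stand-in for page_writer.DirtyPages, the AddPage signature is assumed (it sits above this hunk in the unchanged part of the file), and writeAndRelease is hypothetical:

package main

// dirtyPages is a trimmed stand-in for page_writer.DirtyPages.
type dirtyPages interface {
	AddPage(offset int64, data []byte) // assumed member, declared above the hunk
	FlushData() error
	Destroy() // new in this commit
}

// writeAndRelease mirrors the order FileHandle uses across Write, Flush,
// and Release: buffer, upload, then delete the backing temp file.
func writeAndRelease(dp dirtyPages, data []byte, offset int64) error {
	dp.AddPage(offset, data)
	if err := dp.FlushData(); err != nil {
		return err
	}
	dp.Destroy()
	return nil
}
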
diff --git a/weed/filesys/page_writer/dirty_pages_temp_interval.go b/weed/filesys/page_writer/dirty_pages_temp_interval.go
deleted file mode 100644
index aeaf0ec6f..000000000
--- a/weed/filesys/page_writer/dirty_pages_temp_interval.go
+++ /dev/null
@@ -1,302 +0,0 @@
-package page_writer
-
-import (
-	"io"
-	"log"
-	"os"
-)
-
-type WrittenIntervalNode struct {
-	DataOffset int64
-	TempOffset int64
-	Size       int64
-	Next       *WrittenIntervalNode
-}
-
-type WrittenIntervalLinkedList struct {
-	tempFile *os.File
-	Head     *WrittenIntervalNode
-	Tail     *WrittenIntervalNode
-}
-
-type WrittenContinuousIntervals struct {
-	TempFile   *os.File
-	LastOffset int64
-	Lists      []*WrittenIntervalLinkedList
-}
-
-func (list *WrittenIntervalLinkedList) Offset() int64 {
-	return list.Head.DataOffset
-}
-func (list *WrittenIntervalLinkedList) Size() int64 {
-	return list.Tail.DataOffset + list.Tail.Size - list.Head.DataOffset
-}
-func (list *WrittenIntervalLinkedList) addNodeToTail(node *WrittenIntervalNode) {
-	// glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
-	if list.Tail.TempOffset+list.Tail.Size == node.TempOffset {
-		// already connected
-		list.Tail.Size += node.Size
-	} else {
-		list.Tail.Next = node
-		list.Tail = node
-	}
-}
-func (list *WrittenIntervalLinkedList) addNodeToHead(node *WrittenIntervalNode) {
-	// glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
-	node.Next = list.Head
-	list.Head = node
-}
-
-func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) {
-	t := list.Head
-	for {
-
-		nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
-		if nodeStart < nodeStop {
-			// glog.V(4).Infof("copying start=%d stop=%d t=[%d,%d) => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.DataOffset, t.DataOffset+t.Size, len(buf), nodeStart, nodeStop)
-			list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset+nodeStart-t.DataOffset)
-		}
-
-		if t.Next == nil {
-			break
-		}
-		t = t.Next
-	}
-}
-
-func (c *WrittenContinuousIntervals) TotalSize() (total int64) {
-	for _, list := range c.Lists {
-		total += list.Size()
-	}
-	return
-}
-
-func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenIntervalLinkedList {
-	var nodes []*WrittenIntervalNode
-	for t := list.Head; t != nil; t = t.Next {
-		nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
-		if nodeStart >= nodeStop {
-			// skip non overlapping WrittenIntervalNode
-			continue
-		}
-		nodes = append(nodes, &WrittenIntervalNode{
-			TempOffset: t.TempOffset + nodeStart - t.DataOffset,
-			DataOffset: nodeStart,
-			Size:       nodeStop - nodeStart,
-			Next:       nil,
-		})
-	}
-	for i := 1; i < len(nodes); i++ {
-		nodes[i-1].Next = nodes[i]
-	}
-	return &WrittenIntervalLinkedList{
-		tempFile: list.tempFile,
-		Head:     nodes[0],
-		Tail:     nodes[len(nodes)-1],
-	}
-}
-
-func (c *WrittenContinuousIntervals) debug() {
-	log.Printf("++")
-	for _, l := range c.Lists {
-		log.Printf("++++")
-		for t := l.Head; ; t = t.Next {
-			log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
-			if t.Next == nil {
-				break
-			}
-		}
-		log.Printf("----")
-	}
-	log.Printf("--")
-}
-
-func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) {
-
-	interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
-
-	// append to the tail and return
-	if len(c.Lists) == 1 {
-		lastSpan := c.Lists[0]
-		if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset {
-			lastSpan.addNodeToTail(interval)
-			return
-		}
-	}
-
-	var newLists []*WrittenIntervalLinkedList
-	for _, list := range c.Lists {
-		// if list is to the left of new interval, add to the new list
-		if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset {
-			newLists = append(newLists, list)
-		}
-		// if list is to the right of new interval, add to the new list
-		if interval.DataOffset+interval.Size <= list.Head.DataOffset {
-			newLists = append(newLists, list)
-		}
-		// if new interval overwrite the right part of the list
-		if list.Head.DataOffset < interval.DataOffset && interval.DataOffset < list.Tail.DataOffset+list.Tail.Size {
-			// create a new list of the left part of existing list
-			newLists = append(newLists, list.subList(list.Offset(), interval.DataOffset))
-		}
-		// if new interval overwrite the left part of the list
-		if list.Head.DataOffset < interval.DataOffset+interval.Size && interval.DataOffset+interval.Size < list.Tail.DataOffset+list.Tail.Size {
-			// create a new list of the right part of existing list
-			newLists = append(newLists, list.subList(interval.DataOffset+interval.Size, list.Tail.DataOffset+list.Tail.Size))
-		}
-		// skip anything that is fully overwritten by the new interval
-	}
-
-	c.Lists = newLists
-	// add the new interval to the lists, connecting neighbor lists
-	var prevList, nextList *WrittenIntervalLinkedList
-
-	for _, list := range c.Lists {
-		if list.Head.DataOffset == interval.DataOffset+interval.Size {
-			nextList = list
-			break
-		}
-	}
-
-	for _, list := range c.Lists {
-		if list.Head.DataOffset+list.Size() == dataOffset {
-			list.addNodeToTail(interval)
-			prevList = list
-			break
-		}
-	}
-
-	if prevList != nil && nextList != nil {
-		// glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size)
-		prevList.Tail.Next = nextList.Head
-		prevList.Tail = nextList.Tail
-		c.removeList(nextList)
-	} else if nextList != nil {
-		// add to head was not done when checking
-		nextList.addNodeToHead(interval)
-	}
-	if prevList == nil && nextList == nil {
-		c.Lists = append(c.Lists, &WrittenIntervalLinkedList{
-			tempFile: c.TempFile,
-			Head:     interval,
-			Tail:     interval,
-		})
-	}
-
-	return
-}
-
-func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList {
-	var maxSize int64
-	maxIndex := -1
-	for k, list := range c.Lists {
-		if maxSize <= list.Size() {
-			maxSize = list.Size()
-			maxIndex = k
-		}
-	}
-	if maxSize <= 0 {
-		return nil
-	}
-
-	t := c.Lists[maxIndex]
-	t.tempFile = c.TempFile
-	c.Lists = append(c.Lists[0:maxIndex], c.Lists[maxIndex+1:]...)
-	return t
-
-}
-
-func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) {
-	index := -1
-	for k, list := range c.Lists {
-		if list.Offset() == target.Offset() {
-			index = k
-		}
-	}
-	if index < 0 {
-		return
-	}
-
-	c.Lists = append(c.Lists[0:index], c.Lists[index+1:]...)
-
-}
-
-func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
-	for _, list := range c.Lists {
-		start := max(startOffset, list.Offset())
-		stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
-		if start < stop {
-			list.ReadData(data[start-startOffset:], start, stop)
-			maxStop = max(maxStop, stop)
-		}
-	}
-	return
-}
-
-func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader {
-	// TODO: optimize this to avoid another loop
-	var readers []io.Reader
-	for t := l.Head; ; t = t.Next {
-		startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop)
-		if startOffset < stopOffset {
-			// glog.V(4).Infof("ToReader read [%d,%d) from [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
-			readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset))
-		}
-		if t.Next == nil {
-			break
-		}
-	}
-	if len(readers) == 1 {
-		return readers[0]
-	}
-	return io.MultiReader(readers...)
-}
-
-type FileSectionReader struct {
-	file            *os.File
-	tempStartOffset int64
-	Offset          int64
-	dataStart       int64
-	dataStop        int64
-}
-
-var _ = io.Reader(&FileSectionReader{})
-
-func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader {
-	return &FileSectionReader{
-		file:            tempfile,
-		tempStartOffset: offset,
-		Offset:          offset,
-		dataStart:       dataOffset,
-		dataStop:        dataOffset + size,
-	}
-}
-
-func (f *FileSectionReader) Read(p []byte) (n int, err error) {
-	remaining := (f.dataStop - f.dataStart) - (f.Offset - f.tempStartOffset)
-	if remaining <= 0 {
-		return 0, io.EOF
-	}
-	dataLen := min(remaining, int64(len(p)))
-	// glog.V(4).Infof("reading [%d,%d) from %v [%d,%d)/[%d,%d) %d", f.Offset-f.tempStartOffset+f.dataStart, f.Offset-f.tempStartOffset+f.dataStart+dataLen, f.file.Name(), f.Offset, f.Offset+dataLen, f.tempStartOffset, f.tempStartOffset+f.dataStop-f.dataStart, f.dataStop-f.dataStart)
-	n, err = f.file.ReadAt(p[:dataLen], f.Offset)
-	if n > 0 {
-		f.Offset += int64(n)
-	} else {
-		err = io.EOF
-	}
-	return
-}
-
-func max(x, y int64) int64 {
-	if x > y {
-		return x
-	}
-	return y
-}
-func min(x, y int64) int64 {
-	if x < y {
-		return x
-	}
-	return y
-}
diff --git a/weed/filesys/page_writer/page_chunk_interval_list.go b/weed/filesys/page_writer/page_chunk_interval_list.go
index e626b2a7f..09fc45bfb 100644
--- a/weed/filesys/page_writer/page_chunk_interval_list.go
+++ b/weed/filesys/page_writer/page_chunk_interval_list.go
@@ -10,6 +10,10 @@ type PageChunkWrittenInterval struct {
 	next *PageChunkWrittenInterval
 }
 
+func (interval *PageChunkWrittenInterval) Size() int64 {
+	return interval.stopOffset - interval.startOffset
+}
+
 // PageChunkWrittenIntervalList mark written intervals within one page chunk
 type PageChunkWrittenIntervalList struct {
 	head *PageChunkWrittenInterval
diff --git a/weed/filesys/page_writer_pattern.go b/weed/filesys/page_writer_pattern.go
index 42ca3d969..44b69cda7 100644
--- a/weed/filesys/page_writer_pattern.go
+++ b/weed/filesys/page_writer_pattern.go
@@ -14,18 +14,21 @@ type WriterPattern struct {
 func NewWriterPattern(fileName string, chunkSize int64) *WriterPattern {
 	return &WriterPattern{
 		isStreaming:     true,
-		lastWriteOffset: 0,
+		lastWriteOffset: -1,
 		chunkSize:       chunkSize,
 		fileName:        fileName,
 	}
 }
 
 func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) {
-	if rp.lastWriteOffset == 0 {
-	}
 	if rp.lastWriteOffset > offset {
 		rp.isStreaming = false
 	}
+	if rp.lastWriteOffset == -1 {
+		if offset != 0 {
+			rp.isStreaming = false
+		}
+	}
 	rp.lastWriteOffset = offset
 }
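
With lastWriteOffset starting at -1 instead of 0, a first write that does not land at offset 0 is now classified as random; previously such a write passed the (empty) zero-offset check and kept isStreaming true. For illustration, a standalone replay of the detection rules as they stand after this change; isStreamingAfter is a hypothetical name, not part of the commit:

package main

import "fmt"

// isStreamingAfter replays a sequence of write offsets through the same
// rules as WriterPattern.MonitorWriteAt: the first write must land at
// offset 0, and no later write may move backwards. Once the pattern is
// flagged as random it stays random, just like in the real code.
func isStreamingAfter(offsets []int64) bool {
	streaming := true
	last := int64(-1) // sentinel: no write seen yet
	for _, off := range offsets {
		if last > off {
			streaming = false // went backwards: random write pattern
		}
		if last == -1 && off != 0 {
			streaming = false // first write not at the start of the file
		}
		last = off
	}
	return streaming
}

func main() {
	fmt.Println(isStreamingAfter([]int64{0, 4, 8}))  // true: sequential from 0
	fmt.Println(isStreamingAfter([]int64{4, 8, 12})) // false: first write at 4
	fmt.Println(isStreamingAfter([]int64{0, 8, 4}))  // false: went backwards
}
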
