author    chrislu <chris.lu@gmail.com>  2021-12-24 22:38:22 -0800
committer chrislu <chris.lu@gmail.com>  2021-12-24 22:38:22 -0800
commit    083d8e9ecede84ea359962136c28ebda0ba1323b (patch)
tree      85899215a275d5edbf20c806bf6123f2c78b991b
parent    255a1c7dcd009524c34cb8c3d6fce59c6d9a03cb (diff)
download  seaweedfs-083d8e9ecede84ea359962136c28ebda0ba1323b.tar.xz
          seaweedfs-083d8e9ecede84ea359962136c28ebda0ba1323b.zip
add stream writer
This should improve streaming write performance; sequential writes are common in many workloads, e.g., copying large files. It comes in addition to the earlier improvements to random read/write operations: https://github.com/chrislusf/seaweedfs/wiki/FUSE-Mount/_compare/3e69d193805c79802f4f8f6cc63269b7a9a911f3...19084d87918f297cac15e2471c19306176e0771f
-rw-r--r--  weed/filesys/dirty_pages_stream.go                       107
-rw-r--r--  weed/filesys/dirty_pages_temp_file.go                      6
-rw-r--r--  weed/filesys/filehandle.go                                16
-rw-r--r--  weed/filesys/page_writer.go                               32
-rw-r--r--  weed/filesys/page_writer/chunk_interval_list.go           29
-rw-r--r--  weed/filesys/page_writer/chunked_file_writer.go           11
-rw-r--r--  weed/filesys/page_writer/chunked_file_writer_test.go       4
-rw-r--r--  weed/filesys/page_writer/chunked_stream_writer.go        119
-rw-r--r--  weed/filesys/page_writer/chunked_stream_writer_test.go    33
-rw-r--r--  weed/filesys/page_writer_pattern.go                        9
10 files changed, 331 insertions, 35 deletions
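
The new streaming path keeps whole chunks in memory and hands each completed chunk to a save callback. A minimal usage sketch of the new ChunkedStreamWriter; the printing callback below is a hypothetical stand-in for the real saveDataAsChunk upload path:

    package main

    import (
    	"fmt"
    	"io"

    	"github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
    )

    func main() {
    	// 2 MiB chunks: sequential writes accumulate in memory until a chunk
    	// is fully covered, then the save callback streams it out immediately.
    	cw := page_writer.NewChunkedStreamWriter(2 * 1024 * 1024)
    	cw.SetSaveToStorageFunction(func(reader io.Reader, offset, size int64, cleanupFn func()) {
    		defer cleanupFn() // releases the chunk buffer
    		data, _ := io.ReadAll(reader)
    		fmt.Printf("would upload [%d, %d): %d bytes\n", offset, offset+size, len(data))
    	})

    	buf := make([]byte, 64*1024)
    	for off := int64(0); off < 4*1024*1024; off += int64(len(buf)) {
    		cw.WriteAt(buf, off) // sequential writes complete chunks in order
    	}
    	cw.FlushAll() // pushes any partially written trailing chunk (none here)
    }
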
diff --git a/weed/filesys/dirty_pages_stream.go b/weed/filesys/dirty_pages_stream.go
new file mode 100644
index 000000000..2d57ee0bc
--- /dev/null
+++ b/weed/filesys/dirty_pages_stream.go
@@ -0,0 +1,107 @@
+package filesys
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "io"
+ "sync"
+ "time"
+)
+
+type StreamDirtyPages struct {
+ f *File
+ writeWaitGroup sync.WaitGroup
+ pageAddLock sync.Mutex
+ chunkAddLock sync.Mutex
+ lastErr error
+ collection string
+ replication string
+ chunkedStream *page_writer.ChunkedStreamWriter
+}
+
+func newStreamDirtyPages(file *File, chunkSize int64) *StreamDirtyPages {
+
+ dirtyPages := &StreamDirtyPages{
+ f: file,
+ chunkedStream: page_writer.NewChunkedStreamWriter(chunkSize),
+ }
+
+ dirtyPages.chunkedStream.SetSaveToStorageFunction(dirtyPages.saveChunkedFileIntevalToStorage)
+
+ return dirtyPages
+}
+
+func (pages *StreamDirtyPages) AddPage(offset int64, data []byte) {
+
+ pages.pageAddLock.Lock()
+ defer pages.pageAddLock.Unlock()
+
+ glog.V(4).Infof("%v stream AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data)))
+ if _, err := pages.chunkedStream.WriteAt(data, offset); err != nil {
+ pages.lastErr = err
+ }
+
+ return
+}
+
+func (pages *StreamDirtyPages) FlushData() error {
+ pages.saveChunkedFileToStorage()
+ pages.writeWaitGroup.Wait()
+ if pages.lastErr != nil {
+ return fmt.Errorf("flush data: %v", pages.lastErr)
+ }
+ pages.chunkedStream.Reset()
+ return nil
+}
+
+func (pages *StreamDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+ return pages.chunkedStream.ReadDataAt(data, startOffset)
+}
+
+func (pages *StreamDirtyPages) GetStorageOptions() (collection, replication string) {
+ return pages.collection, pages.replication
+}
+
+func (pages *StreamDirtyPages) saveChunkedFileToStorage() {
+
+ pages.chunkedStream.FlushAll()
+
+}
+
+func (pages *StreamDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader, offset int64, size int64, cleanupFn func()) {
+
+ mtime := time.Now().UnixNano()
+ pages.writeWaitGroup.Add(1)
+ writer := func() {
+ defer pages.writeWaitGroup.Done()
+ defer cleanupFn()
+
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
+ if err != nil {
+ glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
+ pages.lastErr = err
+ return
+ }
+ chunk.Mtime = mtime
+ pages.collection, pages.replication = collection, replication
+ pages.chunkAddLock.Lock()
+ pages.f.addChunks([]*filer_pb.FileChunk{chunk})
+ glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
+ pages.chunkAddLock.Unlock()
+ }
+
+ if pages.f.wfs.concurrentWriters != nil {
+ pages.f.wfs.concurrentWriters.Execute(writer)
+ } else {
+ go writer()
+ }
+
+}
+
+func (pages *StreamDirtyPages) Destroy() {
+ pages.chunkedStream.Reset()
+}
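
FlushData above relies on writeWaitGroup: every interval save registers with Add(1) before the worker is scheduled, so Wait() cannot return while an upload is still in flight. The same pattern in isolation, as a hedged sketch independent of the SeaweedFS types:

    package main

    import (
    	"fmt"
    	"sync"
    )

    // flushGroup mirrors the writeWaitGroup usage in StreamDirtyPages.
    type flushGroup struct {
    	wg      sync.WaitGroup
    	lastErr error
    }

    func (g *flushGroup) save(offset int64, data []byte) {
    	g.wg.Add(1) // register before scheduling, never inside the goroutine
    	go func() {
    		defer g.wg.Done()
    		// stand-in for the real chunk upload; a failure would be recorded
    		// in lastErr for Flush to report, as in StreamDirtyPages
    		fmt.Printf("uploading [%d, %d)\n", offset, offset+int64(len(data)))
    	}()
    }

    func (g *flushGroup) Flush() error {
    	g.wg.Wait() // blocks until every scheduled upload has finished
    	return g.lastErr
    }

    func main() {
    	var g flushGroup
    	g.save(0, make([]byte, 1024))
    	g.save(1024, make([]byte, 1024))
    	fmt.Println("flush err:", g.Flush())
    }
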
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index a207eeb38..e0c3a91de 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -37,6 +37,7 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
pages.pageAddLock.Lock()
defer pages.pageAddLock.Unlock()
+ glog.V(4).Infof("%v tempfile AddPage [%d, %d)", pages.f.fullpath(), offset, offset+int64(len(data)))
if _, err := pages.chunkedFile.WriteAt(data, offset); err != nil {
pages.lastErr = err
}
@@ -50,6 +51,7 @@ func (pages *TempFileDirtyPages) FlushData() error {
if pages.lastErr != nil {
return fmt.Errorf("flush data: %v", pages.lastErr)
}
+ pages.chunkedFile.Reset()
return nil
}
@@ -65,7 +67,7 @@ func (pages *TempFileDirtyPages) saveChunkedFileToStorage() {
pages.chunkedFile.ProcessEachInterval(func(file *os.File, logicChunkIndex page_writer.LogicChunkIndex, interval *page_writer.ChunkWrittenInterval) {
reader := page_writer.NewFileIntervalReader(pages.chunkedFile, logicChunkIndex, interval)
- pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize, interval.Size())
+ pages.saveChunkedFileIntevalToStorage(reader, int64(logicChunkIndex)*pages.chunkedFile.ChunkSize+interval.StartOffset, interval.Size())
})
}
@@ -100,5 +102,5 @@ func (pages *TempFileDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reade
}
func (pages TempFileDirtyPages) Destroy() {
- pages.chunkedFile.Destroy()
+ pages.chunkedFile.Reset()
}
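
The one-line change to saveChunkedFileToStorage fixes the destination offset for intervals that do not start at a chunk boundary: previously the interval's in-chunk StartOffset was dropped. A worked example of the corrected arithmetic (values are illustrative):

    package main

    import "fmt"

    func main() {
    	const chunkSize = 8 << 20 // 8 MiB logical chunks
    	logicChunkIndex, intervalStart := int64(3), int64(1<<20)

    	wrong := logicChunkIndex * chunkSize               // 24 MiB: drops the in-chunk offset
    	fixed := logicChunkIndex*chunkSize + intervalStart // 25 MiB: where the bytes actually belong
    	fmt.Println(wrong, fixed)
    }
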
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index a551e6e10..e25437fd3 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -27,6 +27,7 @@ type FileHandle struct {
contentType string
handle uint64
sync.Mutex
+ sync.WaitGroup
f *File
RequestId fuse.RequestID // unique ID for request
@@ -41,7 +42,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
fh := &FileHandle{
f: file,
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
- dirtyPages: newPageWriter(file, 2*1024*1024),
+ dirtyPages: newPageWriter(file, file.wfs.option.CacheSizeMB*1024*1024),
Uid: uid,
Gid: gid,
}
@@ -63,6 +64,9 @@ var _ = fs.HandleReleaser(&FileHandle{})
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
+ fh.Add(1)
+ defer fh.Done()
+
fh.Lock()
defer fh.Unlock()
@@ -170,6 +174,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
// Write to the file handle
func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
+ fh.Add(1)
+ defer fh.Done()
+
fh.Lock()
defer fh.Unlock()
@@ -209,8 +216,7 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
glog.V(4).Infof("Release %v fh %d open=%d", fh.f.fullpath(), fh.handle, fh.f.isOpen)
- fh.Lock()
- defer fh.Unlock()
+ fh.Wait()
fh.f.wfs.handlesLock.Lock()
fh.f.isOpen--
@@ -243,6 +249,9 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
return nil
}
+ fh.Add(1)
+ defer fh.Done()
+
fh.Lock()
defer fh.Unlock()
@@ -251,7 +260,6 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
return err
}
- glog.V(4).Infof("Flush %v fh %d success", fh.f.fullpath(), fh.handle)
return nil
}
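
The FileHandle changes swap Release from taking the mutex to draining a WaitGroup: Read, Write, and Flush each register with Add(1)/Done(), and Release waits for them to finish instead of serializing behind the lock. A hedged sketch of that lifecycle, simplified and assuming no new operations arrive once Release begins (as the original also assumes):

    package main

    import (
    	"fmt"
    	"sync"
    )

    // handle sketches the FileHandle pattern: operations enter via Add/Done,
    // and Release waits for in-flight operations instead of holding the
    // mutex for the whole teardown.
    type handle struct {
    	sync.Mutex
    	sync.WaitGroup
    }

    func (h *handle) Write(p []byte) {
    	h.Add(1)
    	defer h.Done()
    	h.Lock()
    	defer h.Unlock()
    	// ... apply the write to dirty pages ...
    }

    func (h *handle) Release() {
    	h.Wait() // drain outstanding Read/Write/Flush calls first
    	fmt.Println("released")
    }

    func main() {
    	h := &handle{}
    	h.Write([]byte("x"))
    	h.Release()
    }
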
diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go
index 560ae052d..bdcbc0fbc 100644
--- a/weed/filesys/page_writer.go
+++ b/weed/filesys/page_writer.go
@@ -24,19 +24,21 @@ func newPageWriter(file *File, chunkSize int64) *PageWriter {
pw := &PageWriter{
f: file,
chunkSize: chunkSize,
+ writerPattern: NewWriterPattern(chunkSize),
randomWriter: newTempFileDirtyPages(file, chunkSize),
- streamWriter: newContinuousDirtyPages(file),
- writerPattern: NewWriterPattern(file.Name, chunkSize),
+ streamWriter: newStreamDirtyPages(file, chunkSize),
+ //streamWriter: newContinuousDirtyPages(file),
+ //streamWriter: nil,
}
return pw
}
func (pw *PageWriter) AddPage(offset int64, data []byte) {
- glog.V(4).Infof("AddPage %v [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode())
-
pw.writerPattern.MonitorWriteAt(offset, len(data))
+ glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode())
+
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
@@ -48,7 +50,7 @@ func (pw *PageWriter) AddPage(offset int64, data []byte) {
func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) {
if chunkIndex > 0 {
- if pw.writerPattern.IsStreamingMode() {
+ if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil {
pw.streamWriter.AddPage(offset, data)
return
}
@@ -57,8 +59,11 @@ func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) {
}
func (pw *PageWriter) FlushData() error {
- if err := pw.streamWriter.FlushData(); err != nil {
- return err
+ pw.writerPattern.Reset()
+ if pw.streamWriter != nil {
+ if err := pw.streamWriter.FlushData(); err != nil {
+ return err
+ }
}
return pw.randomWriter.FlushData()
}
@@ -70,10 +75,12 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64)
for i := chunkIndex; len(data) > 0; i++ {
readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
- m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset)
+ if pw.streamWriter != nil {
+ m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset)
+ maxStop = max(maxStop, m1)
+ }
m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset)
-
- maxStop = max(maxStop, max(m1, m2))
+ maxStop = max(maxStop, m2)
offset += readSize
data = data[readSize:]
@@ -83,13 +90,16 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64)
}
func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
- if pw.writerPattern.IsStreamingMode() {
+ if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil {
return pw.streamWriter.GetStorageOptions()
}
return pw.randomWriter.GetStorageOptions()
}
func (pw *PageWriter) Destroy() {
+ if pw.streamWriter != nil {
+ pw.streamWriter.Destroy()
+ }
pw.randomWriter.Destroy()
}
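
After this change PageWriter routes each chunk-sized slice of a write by pattern: chunk 0 always goes through the temp-file random writer, and later chunks use the in-memory stream writer only while the pattern monitor still reports streaming mode. A hedged sketch of the dispatch rule as reconstructed from addToOneChunk (illustration only, not code from the commit):

    package main

    import "fmt"

    // dispatch restates PageWriter.addToOneChunk's routing rule.
    func dispatch(chunkIndex int64, streamingMode, haveStreamWriter bool) string {
    	if chunkIndex > 0 && streamingMode && haveStreamWriter {
    		return "streamWriter (in-memory chunk)"
    	}
    	return "randomWriter (temp-file backed)"
    }

    func main() {
    	fmt.Println(dispatch(0, true, true))  // first chunk: randomWriter
    	fmt.Println(dispatch(3, true, true))  // sequential tail: streamWriter
    	fmt.Println(dispatch(3, false, true)) // pattern went random: randomWriter
    }
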
diff --git a/weed/filesys/page_writer/chunk_interval_list.go b/weed/filesys/page_writer/chunk_interval_list.go
index 9c518192f..dca9a1740 100644
--- a/weed/filesys/page_writer/chunk_interval_list.go
+++ b/weed/filesys/page_writer/chunk_interval_list.go
@@ -4,14 +4,18 @@ import "math"
// ChunkWrittenInterval mark one written interval within one page chunk
type ChunkWrittenInterval struct {
- startOffset int64
+ StartOffset int64
stopOffset int64
prev *ChunkWrittenInterval
next *ChunkWrittenInterval
}
func (interval *ChunkWrittenInterval) Size() int64 {
- return interval.stopOffset - interval.startOffset
+ return interval.stopOffset - interval.StartOffset
+}
+
+func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool {
+ return interval.stopOffset-interval.StartOffset == chunkSize
}
// ChunkWrittenIntervalList mark written intervals within one page chunk
@@ -23,11 +27,11 @@ type ChunkWrittenIntervalList struct {
func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
list := &ChunkWrittenIntervalList{
head: &ChunkWrittenInterval{
- startOffset: -1,
+ StartOffset: -1,
stopOffset: -1,
},
tail: &ChunkWrittenInterval{
- startOffset: math.MaxInt64,
+ StartOffset: math.MaxInt64,
stopOffset: math.MaxInt64,
},
}
@@ -38,35 +42,40 @@ func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
interval := &ChunkWrittenInterval{
- startOffset: startOffset,
+ StartOffset: startOffset,
stopOffset: stopOffset,
}
list.addInterval(interval)
}
+
+func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool {
+ return list.size() == 1 && list.head.next.isComplete(chunkSize)
+}
+
func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
p := list.head
- for ; p.next != nil && p.next.startOffset <= interval.startOffset; p = p.next {
+ for ; p.next != nil && p.next.StartOffset <= interval.StartOffset; p = p.next {
}
q := list.tail
for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
}
- if interval.startOffset <= p.stopOffset && q.startOffset <= interval.stopOffset {
+ if interval.StartOffset <= p.stopOffset && q.StartOffset <= interval.stopOffset {
// merge p and q together
p.stopOffset = q.stopOffset
unlinkNodesBetween(p, q.next)
return
}
- if interval.startOffset <= p.stopOffset {
+ if interval.StartOffset <= p.stopOffset {
// merge new interval into p
p.stopOffset = interval.stopOffset
unlinkNodesBetween(p, q)
return
}
- if q.startOffset <= interval.stopOffset {
+ if q.StartOffset <= interval.stopOffset {
// merge new interval into q
- q.startOffset = interval.startOffset
+ q.StartOffset = interval.StartOffset
unlinkNodesBetween(p, q)
return
}
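
The new IsComplete check only succeeds once addInterval has merged every written range into a single interval spanning the whole chunk. A small illustrative test (not part of the commit) showing out-of-order writes collapsing into one complete interval:

    package page_writer

    import "testing"

    // TestIsCompleteAfterMerges illustrates interval merging; it assumes the
    // package-internal newChunkWrittenIntervalList shown above.
    func TestIsCompleteAfterMerges(t *testing.T) {
    	list := newChunkWrittenIntervalList()
    	list.MarkWritten(4, 8) // second half first
    	list.MarkWritten(0, 4) // touches [4,8), merges into [0,8)
    	if !list.IsComplete(8) {
    		t.Fatal("expected the merged interval [0,8) to cover the 8-byte chunk")
    	}
    }
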
diff --git a/weed/filesys/page_writer/chunked_file_writer.go b/weed/filesys/page_writer/chunked_file_writer.go
index 14c034900..b0e1c2844 100644
--- a/weed/filesys/page_writer/chunked_file_writer.go
+++ b/weed/filesys/page_writer/chunked_file_writer.go
@@ -64,7 +64,7 @@ func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
if chunkUsage != nil {
for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
- logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.startOffset)
+ logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset)
logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
if logicStart < logicStop {
actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
@@ -110,11 +110,16 @@ func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, log
}
}
}
-func (cw *ChunkedFileWriter) Destroy() {
+
+// Reset releases used resources
+func (cw *ChunkedFileWriter) Reset() {
if cw.file != nil {
cw.file.Close()
os.Remove(cw.file.Name())
+ cw.file = nil
}
+ cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex)
+ cw.chunkUsages = cw.chunkUsages[:0]
}
type FileIntervalReader struct {
@@ -134,7 +139,7 @@ func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkInde
}
return &FileIntervalReader{
f: cw.file,
- startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.startOffset,
+ startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset,
stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
position: 0,
}
diff --git a/weed/filesys/page_writer/chunked_file_writer_test.go b/weed/filesys/page_writer/chunked_file_writer_test.go
index 1c72c77d4..244ed62c3 100644
--- a/weed/filesys/page_writer/chunked_file_writer_test.go
+++ b/weed/filesys/page_writer/chunked_file_writer_test.go
@@ -35,9 +35,9 @@ func writeToFile(cw *ChunkedFileWriter, startOffset int64, stopOffset int64) {
func TestWriteChunkedFile(t *testing.T) {
x := NewChunkedFileWriter(os.TempDir(), 20)
- defer x.Destroy()
+ defer x.Reset()
y := NewChunkedFileWriter(os.TempDir(), 12)
- defer y.Destroy()
+ defer y.Reset()
batchSize := 4
buf := make([]byte, batchSize)
diff --git a/weed/filesys/page_writer/chunked_stream_writer.go b/weed/filesys/page_writer/chunked_stream_writer.go
new file mode 100644
index 000000000..b4314e78f
--- /dev/null
+++ b/weed/filesys/page_writer/chunked_stream_writer.go
@@ -0,0 +1,119 @@
+package page_writer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
+ "io"
+ "sync"
+ "sync/atomic"
+)
+
+type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
+
+// ChunkedStreamWriter assumes writes mostly arrive one chunk at a time, in streaming (sequential) order
+type ChunkedStreamWriter struct {
+ activeChunks map[LogicChunkIndex]*MemChunk
+ activeChunksLock sync.Mutex
+ ChunkSize int64
+ saveToStorageFn SaveToStorageFunc
+ sync.Mutex
+}
+
+type MemChunk struct {
+ buf []byte
+ usage *ChunkWrittenIntervalList
+}
+
+var _ = io.WriterAt(&ChunkedStreamWriter{})
+
+func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter {
+ return &ChunkedStreamWriter{
+ ChunkSize: chunkSize,
+ activeChunks: make(map[LogicChunkIndex]*MemChunk),
+ }
+}
+
+func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) {
+ cw.saveToStorageFn = saveToStorageFn
+}
+
+func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) {
+ cw.Lock()
+ defer cw.Unlock()
+
+ logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
+ offsetRemainder := off % cw.ChunkSize
+
+ memChunk, found := cw.activeChunks[logicChunkIndex]
+ if !found {
+ memChunk = &MemChunk{
+ buf: mem.Allocate(int(cw.ChunkSize)),
+ usage: newChunkWrittenIntervalList(),
+ }
+ cw.activeChunks[logicChunkIndex] = memChunk
+ }
+ n = copy(memChunk.buf[offsetRemainder:], p)
+ memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n))
+ if memChunk.usage.IsComplete(cw.ChunkSize) {
+ if cw.saveToStorageFn != nil {
+ cw.saveOneChunk(memChunk, logicChunkIndex)
+ delete(cw.activeChunks, logicChunkIndex)
+ }
+ }
+
+ return
+}
+
+func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
+ cw.Lock()
+ defer cw.Unlock()
+
+ logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
+ memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize
+ memChunk, found := cw.activeChunks[logicChunkIndex]
+ if !found {
+ return
+ }
+
+ for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
+		logicStart := max(off, memChunkBaseOffset+t.StartOffset)
+ logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
+ if logicStart < logicStop {
+ copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
+ maxStop = max(maxStop, logicStop)
+ }
+ }
+ return
+}
+
+func (cw *ChunkedStreamWriter) FlushAll() {
+ cw.Lock()
+ defer cw.Unlock()
+ for logicChunkIndex, memChunk := range cw.activeChunks {
+ if cw.saveToStorageFn != nil {
+ cw.saveOneChunk(memChunk, logicChunkIndex)
+ delete(cw.activeChunks, logicChunkIndex)
+ }
+ }
+}
+
+func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
+ var referenceCounter = int32(memChunk.usage.size())
+ for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
+ reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
+		cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {
+			// decrement and test in one atomic step: with a separate Load,
+			// two concurrent cleanups could both observe zero and double-free
+			if atomic.AddInt32(&referenceCounter, -1) == 0 {
+				mem.Free(memChunk.buf)
+			}
+		})
+ }
+}
+
+// Reset releases used resources
+func (cw *ChunkedStreamWriter) Reset() {
+	cw.Lock()
+	defer cw.Unlock()
+	for t, memChunk := range cw.activeChunks {
+		mem.Free(memChunk.buf)
+		delete(cw.activeChunks, t)
+	}
+}
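
The key behavioral difference from the temp-file writer: a chunk is handed to the save callback as soon as its written intervals cover the whole chunk, so a long sequential write streams out without waiting for an explicit flush. An illustrative test of that eager-save behavior (not part of the commit):

    package page_writer

    import (
    	"io"
    	"testing"
    )

    func TestSaveOnCompletion(t *testing.T) {
    	saved := 0
    	cw := NewChunkedStreamWriter(8)
    	cw.SetSaveToStorageFunction(func(reader io.Reader, offset, size int64, cleanupFn func()) {
    		saved++
    		cleanupFn()
    	})
    	cw.WriteAt([]byte("abcd"), 0)
    	cw.WriteAt([]byte("efgh"), 4) // completes the 8-byte chunk
    	if saved != 1 {
    		t.Fatalf("expected the completed chunk to be saved eagerly, got %d saves", saved)
    	}
    }
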
diff --git a/weed/filesys/page_writer/chunked_stream_writer_test.go b/weed/filesys/page_writer/chunked_stream_writer_test.go
new file mode 100644
index 000000000..3c55a91ad
--- /dev/null
+++ b/weed/filesys/page_writer/chunked_stream_writer_test.go
@@ -0,0 +1,33 @@
+package page_writer
+
+import (
+ "github.com/stretchr/testify/assert"
+ "os"
+ "testing"
+)
+
+func TestWriteChunkedStream(t *testing.T) {
+ x := NewChunkedStreamWriter(20)
+ defer x.Reset()
+ y := NewChunkedFileWriter(os.TempDir(), 12)
+ defer y.Reset()
+
+ batchSize := 4
+ buf := make([]byte, batchSize)
+ for i := 0; i < 256; i++ {
+		for j := 0; j < batchSize; j++ { // j rather than x, to avoid shadowing the writer above
+			buf[j] = byte(i)
+		}
+ x.WriteAt(buf, int64(i*batchSize))
+ y.WriteAt(buf, int64((255-i)*batchSize))
+ }
+
+ a := make([]byte, 1)
+ b := make([]byte, 1)
+ for i := 0; i < 256*batchSize; i++ {
+ x.ReadDataAt(a, int64(i))
+ y.ReadDataAt(b, int64(256*batchSize-1-i))
+ assert.Equal(t, a[0], b[0], "same read")
+ }
+
+}
diff --git a/weed/filesys/page_writer_pattern.go b/weed/filesys/page_writer_pattern.go
index 44b69cda7..51c63d472 100644
--- a/weed/filesys/page_writer_pattern.go
+++ b/weed/filesys/page_writer_pattern.go
@@ -4,19 +4,17 @@ type WriterPattern struct {
isStreaming bool
lastWriteOffset int64
chunkSize int64
- fileName string
}
// For streaming write: only cache the first chunk
// For random write: fall back to temp file approach
// writes can only change from streaming mode to non-streaming mode
-func NewWriterPattern(fileName string, chunkSize int64) *WriterPattern {
+func NewWriterPattern(chunkSize int64) *WriterPattern {
return &WriterPattern{
isStreaming: true,
lastWriteOffset: -1,
chunkSize: chunkSize,
- fileName: fileName,
}
}
@@ -39,3 +37,8 @@ func (rp *WriterPattern) IsStreamingMode() bool {
func (rp *WriterPattern) IsRandomMode() bool {
return !rp.isStreaming
}
+
+func (rp *WriterPattern) Reset() {
+ rp.isStreaming = true
+ rp.lastWriteOffset = -1
+}
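
MonitorWriteAt itself is not shown in this diff; given the fields above (isStreaming, lastWriteOffset) and the one-way streaming-to-random transition described in the comment, one plausible implementation would look like the following hypothetical sketch (the real method may differ):

    package main

    import "fmt"

    // pattern mirrors WriterPattern's fields; monitor is a hypothetical
    // reconstruction of MonitorWriteAt, which this diff does not show.
    type pattern struct {
    	isStreaming     bool
    	lastWriteOffset int64
    }

    func (rp *pattern) monitor(offset int64, size int) {
    	if rp.lastWriteOffset >= 0 && offset != rp.lastWriteOffset {
    		// a write that does not continue where the previous one ended
    		// permanently switches the handle to random mode
    		rp.isStreaming = false
    	}
    	rp.lastWriteOffset = offset + int64(size)
    }

    func main() {
    	p := &pattern{isStreaming: true, lastWriteOffset: -1}
    	p.monitor(0, 4096)
    	p.monitor(4096, 4096)
    	fmt.Println("sequential so far:", p.isStreaming) // true
    	p.monitor(0, 4096)                               // seek back and rewrite
    	fmt.Println("after a seek-back:", p.isStreaming) // false
    }
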