aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mount/dirty_pages_chunked.go4
-rw-r--r--weed/mount/page_writer.go24
-rw-r--r--weed/mount/page_writer/dirty_pages.go2
-rw-r--r--weed/mount/page_writer/page_chunk_swapfile.go10
-rw-r--r--weed/mount/page_writer/upload_pipeline.go17
-rw-r--r--weed/mount/page_writer/upload_pipeline_test.go2
-rw-r--r--weed/mount/page_writer_pattern.go44
-rw-r--r--weed/mount/weedfs_file_write.go4
8 files changed, 82 insertions, 25 deletions
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go
index a76c6dd61..e0d764070 100644
--- a/weed/mount/dirty_pages_chunked.go
+++ b/weed/mount/dirty_pages_chunked.go
@@ -38,11 +38,11 @@ func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
return dirtyPages
}
-func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte) {
+func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool) {
pages.hasWrites = true
glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data)))
- pages.uploadPipeline.SaveDataAt(data, offset)
+ pages.uploadPipeline.SaveDataAt(data, offset, isSequential)
return
}
diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go
index ffcaa6398..016c4841a 100644
--- a/weed/mount/page_writer.go
+++ b/weed/mount/page_writer.go
@@ -6,10 +6,11 @@ import (
)
type PageWriter struct {
- fh *FileHandle
- collection string
- replication string
- chunkSize int64
+ fh *FileHandle
+ collection string
+ replication string
+ chunkSize int64
+ writerPattern *WriterPattern
randomWriter page_writer.DirtyPages
}
@@ -20,28 +21,29 @@ var (
func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
pw := &PageWriter{
- fh: fh,
- chunkSize: chunkSize,
- randomWriter: newMemoryChunkPages(fh, chunkSize),
+ fh: fh,
+ chunkSize: chunkSize,
+ writerPattern: NewWriterPattern(chunkSize),
+ randomWriter: newMemoryChunkPages(fh, chunkSize),
}
return pw
}
-func (pw *PageWriter) AddPage(offset int64, data []byte) {
+func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) {
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
- pw.addToOneChunk(i, offset, data[:writeSize])
+ pw.addToOneChunk(i, offset, data[:writeSize], isSequential)
offset += writeSize
data = data[writeSize:]
}
}
-func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) {
- pw.randomWriter.AddPage(offset, data)
+func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte, isSequential bool) {
+ pw.randomWriter.AddPage(offset, data, isSequential)
}
func (pw *PageWriter) FlushData() error {
diff --git a/weed/mount/page_writer/dirty_pages.go b/weed/mount/page_writer/dirty_pages.go
index 25b747fad..c16cee47a 100644
--- a/weed/mount/page_writer/dirty_pages.go
+++ b/weed/mount/page_writer/dirty_pages.go
@@ -1,7 +1,7 @@
package page_writer
type DirtyPages interface {
- AddPage(offset int64, data []byte)
+ AddPage(offset int64, data []byte, isSequential bool)
FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
GetStorageOptions() (collection, replication string)
diff --git a/weed/mount/page_writer/page_chunk_swapfile.go b/weed/mount/page_writer/page_chunk_swapfile.go
index 486557629..c70f8c5a1 100644
--- a/weed/mount/page_writer/page_chunk_swapfile.go
+++ b/weed/mount/page_writer/page_chunk_swapfile.go
@@ -18,6 +18,7 @@ type SwapFile struct {
file *os.File
logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
chunkSize int64
+ freeActualChunkList []ActualChunkIndex
}
type SwapFileChunk struct {
@@ -53,7 +54,12 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF
}
actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
if !found {
- actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
+ if len(sf.freeActualChunkList) > 0 {
+ actualChunkIndex = sf.freeActualChunkList[0]
+ sf.freeActualChunkList = sf.freeActualChunkList[1:]
+ } else {
+ actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
+ }
sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
}
@@ -66,6 +72,8 @@ func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapF
}
func (sc *SwapFileChunk) FreeResource() {
+ sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
+ delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
}
func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
diff --git a/weed/mount/page_writer/upload_pipeline.go b/weed/mount/page_writer/upload_pipeline.go
index 190076a2b..2286cdf00 100644
--- a/weed/mount/page_writer/upload_pipeline.go
+++ b/weed/mount/page_writer/upload_pipeline.go
@@ -55,7 +55,7 @@ func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64,
}
}
-func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
+func (up *UploadPipeline) SaveDataAt(p []byte, off int64, isSequential bool) (n int) {
up.writableChunksLock.Lock()
defer up.writableChunksLock.Unlock()
@@ -63,13 +63,8 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
pageChunk, found := up.writableChunks[logicChunkIndex]
if !found {
- if atomic.LoadInt64(&memChunkCounter) > 4*int64(up.bufferChunkLimit) {
- // if total number of chunks is over 4 times of per file buffer count limit
- pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
- } else if len(up.writableChunks) < up.bufferChunkLimit {
- // if current file chunks is still under the per file buffer count limit
- pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
- } else {
+ if len(up.writableChunks) > up.bufferChunkLimit {
+ // if the current file's chunk count is over the per-file buffer count limit
fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
for lci, mc := range up.writableChunks {
chunkFullness := mc.WrittenSize()
@@ -81,7 +76,13 @@ func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
delete(up.writableChunks, fullestChunkIndex)
// fmt.Printf("flush chunk %d with %d bytes written\n", logicChunkIndex, fullness)
+ }
+ if isSequential &&
+ len(up.writableChunks) < up.bufferChunkLimit &&
+ atomic.LoadInt64(&memChunkCounter) < 4*int64(up.bufferChunkLimit) {
pageChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
+ } else {
+ pageChunk = up.swapFile.NewTempFileChunk(logicChunkIndex)
}
up.writableChunks[logicChunkIndex] = pageChunk
}
diff --git a/weed/mount/page_writer/upload_pipeline_test.go b/weed/mount/page_writer/upload_pipeline_test.go
index 63b60faaf..f130c97c1 100644
--- a/weed/mount/page_writer/upload_pipeline_test.go
+++ b/weed/mount/page_writer/upload_pipeline_test.go
@@ -31,7 +31,7 @@ func writeRange(uploadPipeline *UploadPipeline, startOff, stopOff int64) {
p := make([]byte, 4)
for i := startOff / 4; i < stopOff/4; i += 4 {
util.Uint32toBytes(p, uint32(i))
- uploadPipeline.SaveDataAt(p, i)
+ uploadPipeline.SaveDataAt(p, i, false)
}
}
diff --git a/weed/mount/page_writer_pattern.go b/weed/mount/page_writer_pattern.go
new file mode 100644
index 000000000..665056b36
--- /dev/null
+++ b/weed/mount/page_writer_pattern.go
@@ -0,0 +1,44 @@
+package mount
+
+type WriterPattern struct {
+ isStreaming bool
+ lastWriteOffset int64
+ chunkSize int64
+}
+
+// For streaming write: only cache the first chunk
+// For random write: fall back to temp file approach
+// writes can only change from streaming mode to non-streaming mode
+
+func NewWriterPattern(chunkSize int64) *WriterPattern {
+ return &WriterPattern{
+ isStreaming: true,
+ lastWriteOffset: -1,
+ chunkSize: chunkSize,
+ }
+}
+
+func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) {
+ if rp.lastWriteOffset > offset {
+ rp.isStreaming = false
+ }
+ if rp.lastWriteOffset == -1 {
+ if offset != 0 {
+ rp.isStreaming = false
+ }
+ }
+ rp.lastWriteOffset = offset
+}
+
+func (rp *WriterPattern) IsStreamingMode() bool {
+ return rp.isStreaming
+}
+
+func (rp *WriterPattern) IsRandomMode() bool {
+ return !rp.isStreaming
+}
+
+func (rp *WriterPattern) Reset() {
+ rp.isStreaming = true
+ rp.lastWriteOffset = -1
+}
diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go
index f71e27335..d14680752 100644
--- a/weed/mount/weedfs_file_write.go
+++ b/weed/mount/weedfs_file_write.go
@@ -43,6 +43,8 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
return 0, fuse.ENOENT
}
+ fh.dirtyPages.writerPattern.MonitorWriteAt(int64(in.Offset), int(in.Size))
+
fh.Lock()
defer fh.Unlock()
@@ -56,7 +58,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
- fh.dirtyPages.AddPage(offset, data)
+ fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsStreamingMode())
written = uint32(len(data))