aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-02-19 19:06:25 -0800
committerChris Lu <chris.lu@gmail.com>2021-02-19 19:06:25 -0800
commit1d5fc8df4309a3456224f5d6304f8d95e5e11c09 (patch)
treeeb85b60e3c1cd6effd0ae902db08da9ce1d0b9d0
parent565f7a6e724cf2dca401bd699a65f5814606ebfe (diff)
downloadseaweedfs-origin/volume_buffered_writes.tar.xz
seaweedfs-origin/volume_buffered_writes.zip
working delayed buffered writesorigin/volume_buffered_writes
but no obvious perf improvements
-rw-r--r--weed/storage/backend/disk_file.go40
-rw-r--r--weed/util/buffered_writer/max_latency_writer.go167
2 files changed, 205 insertions, 2 deletions
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 2b04c8df2..aee02e7be 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -1,6 +1,8 @@
package backend
import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
"os"
"time"
)
@@ -12,32 +14,64 @@ var (
type DiskFile struct {
File *os.File
fullFilePath string
+ bufWriterAt *buffered_writer.TimedWriteBuffer
+ fileSize int64
+ modTime time.Time
}
func NewDiskFile(f *os.File) *DiskFile {
+ stat, e := f.Stat()
+ if e != nil {
+ glog.Fatalf("stat file %s: %v", f.Name(), e)
+ }
+
return &DiskFile{
fullFilePath: f.Name(),
File: f,
+ bufWriterAt: buffered_writer.NewTimedWriteBuffer(f, 1*1024*1024, 200*time.Millisecond, stat.Size()),
+ fileSize: stat.Size(),
+ modTime: stat.ModTime(),
}
}
func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) {
+ n, _ = df.bufWriterAt.ReadAt(p, off)
+ if len(p) == n {
+ return
+ }
return df.File.ReadAt(p, off)
}
func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
- return df.File.WriteAt(p, off)
+ n, err = df.bufWriterAt.WriteAt(p, off)
+ if err == nil {
+ waterMark := off + int64(n)
+ if waterMark > df.fileSize {
+ df.fileSize = waterMark
+ df.modTime = time.Now()
+ }
+ }
+ return
}
func (df *DiskFile) Truncate(off int64) error {
- return df.File.Truncate(off)
+ err := df.File.Truncate(off)
+ if err == nil {
+ df.fileSize = off
+ df.modTime = time.Now()
+ }
+ return err
}
func (df *DiskFile) Close() error {
+ df.bufWriterAt.Close()
return df.File.Close()
}
func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
+ if df.fileSize != 0 {
+ return df.fileSize, df.modTime, nil
+ }
stat, e := df.File.Stat()
if e == nil {
return stat.Size(), stat.ModTime(), nil
@@ -50,5 +84,7 @@ func (df *DiskFile) Name() string {
}
func (df *DiskFile) Sync() error {
+ df.fileSize = 0
+ df.bufWriterAt.Flush()
return df.File.Sync()
}
diff --git a/weed/util/buffered_writer/max_latency_writer.go b/weed/util/buffered_writer/max_latency_writer.go
new file mode 100644
index 000000000..d95d26ffd
--- /dev/null
+++ b/weed/util/buffered_writer/max_latency_writer.go
@@ -0,0 +1,167 @@
+package buffered_writer
+
+import (
+ "io"
+ "sync"
+ "time"
+)
+
+type TimedWriteBuffer struct {
+ maxLatencyWriterAt *maxLatencyWriterAt
+ bufWriterAt *bufferedWriterAt
+}
+
+func (t *TimedWriteBuffer) ReadAt(p []byte, off int64) (n int, err error) {
+ bufStart := t.bufWriterAt.nextOffset - int64(t.bufWriterAt.dataSize)
+ start := max(bufStart, off)
+ stop := min(t.bufWriterAt.nextOffset, off+int64(len(p)))
+ if start <= stop {
+ n = copy(p, t.bufWriterAt.data[start-bufStart:stop-bufStart])
+ }
+ return
+}
+func (t *TimedWriteBuffer) WriteAt(p []byte, offset int64) (n int, err error) {
+ return t.maxLatencyWriterAt.WriteAt(p, offset)
+}
+func (t *TimedWriteBuffer) Flush() {
+ t.maxLatencyWriterAt.Flush()
+}
+func (t *TimedWriteBuffer) Close() {
+ t.maxLatencyWriterAt.Close()
+}
+
+func NewTimedWriteBuffer(writerAt io.WriterAt, size int, latency time.Duration, currentOffset int64) *TimedWriteBuffer {
+ bufWriterAt := newBufferedWriterAt(writerAt, size, currentOffset)
+ maxLatencyWriterAt := newMaxLatencyWriterAt(bufWriterAt, latency)
+ return &TimedWriteBuffer{
+ bufWriterAt: bufWriterAt,
+ maxLatencyWriterAt: maxLatencyWriterAt,
+ }
+}
+
+type bufferedWriterAt struct {
+ data []byte
+ dataSize int
+ nextOffset int64
+ writerAt io.WriterAt
+ counter int
+}
+
+func newBufferedWriterAt(writerAt io.WriterAt, bufferSize int, currentOffset int64) *bufferedWriterAt {
+ return &bufferedWriterAt{
+ data: make([]byte, bufferSize),
+ nextOffset: currentOffset,
+ dataSize: 0,
+ writerAt: writerAt,
+ }
+}
+
+func (b *bufferedWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
+ if b.nextOffset != offset {
+ println("nextOffset", b.nextOffset, "bufSize", b.dataSize, "offset", offset, "data", len(p))
+ }
+ if b.nextOffset != offset || len(p)+b.dataSize > len(b.data) {
+ if err = b.Flush(); err != nil {
+ return 0, err
+ }
+ }
+ if len(p)+b.dataSize > len(b.data) {
+ n, err = b.writerAt.WriteAt(p, offset)
+ if err == nil {
+ b.nextOffset = offset + int64(n)
+ }
+ } else {
+ n = copy(b.data[b.dataSize:len(p)+b.dataSize], p)
+ b.dataSize += n
+ b.nextOffset += int64(n)
+ b.counter++
+ }
+ return
+}
+
+func (b *bufferedWriterAt) Flush() (err error) {
+ if b.dataSize == 0 {
+ return nil
+ }
+ // println("flush", b.counter)
+ b.counter = 0
+ _, err = b.writerAt.WriteAt(b.data[0:b.dataSize], b.nextOffset-int64(b.dataSize))
+ if err == nil {
+ b.dataSize = 0
+ }
+ return
+}
+
+// adapted from https://golang.org/src/net/http/httputil/reverseproxy.go
+
+type writeFlusher interface {
+ io.WriterAt
+ Flush() error
+}
+
+type maxLatencyWriterAt struct {
+ dst writeFlusher
+ latency time.Duration // non-zero; negative means to flush immediately
+ mu sync.Mutex // protects t, flushPending, and dst.Flush
+ t *time.Timer
+ flushPending bool
+}
+
+func newMaxLatencyWriterAt(dst writeFlusher, latency time.Duration) *maxLatencyWriterAt {
+ return &maxLatencyWriterAt{
+ dst: dst,
+ latency: latency,
+ }
+}
+
+func (m *maxLatencyWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ n, err = m.dst.WriteAt(p, offset)
+ if m.latency < 0 {
+ m.dst.Flush()
+ return
+ }
+ if m.flushPending {
+ return
+ }
+ if m.t == nil {
+ m.t = time.AfterFunc(m.latency, m.Flush)
+ } else {
+ m.t.Reset(m.latency)
+ }
+ m.flushPending = true
+ return
+}
+
+func (m *maxLatencyWriterAt) Flush() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
+ return
+ }
+ m.dst.Flush()
+ m.flushPending = false
+}
+
+func (m *maxLatencyWriterAt) Close() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.flushPending = false
+ if m.t != nil {
+ m.t.Stop()
+ }
+}
+
+func min(x, y int64) int64 {
+ if x <= y {
+ return x
+ }
+ return y
+}
+func max(x, y int64) int64 {
+ if x <= y {
+ return y
+ }
+ return x
+}