aboutsummaryrefslogtreecommitdiff
path: root/weed/util/buffered_writer/max_latency_writer.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-02-19 19:06:25 -0800
committerChris Lu <chris.lu@gmail.com>2021-02-19 19:06:25 -0800
commit1d5fc8df4309a3456224f5d6304f8d95e5e11c09 (patch)
treeeb85b60e3c1cd6effd0ae902db08da9ce1d0b9d0 /weed/util/buffered_writer/max_latency_writer.go
parent565f7a6e724cf2dca401bd699a65f5814606ebfe (diff)
downloadseaweedfs-origin/volume_buffered_writes.tar.xz
seaweedfs-origin/volume_buffered_writes.zip
working delayed buffered writesorigin/volume_buffered_writes
but no obvious perf improvements
Diffstat (limited to 'weed/util/buffered_writer/max_latency_writer.go')
-rw-r--r--weed/util/buffered_writer/max_latency_writer.go167
1 files changed, 167 insertions, 0 deletions
diff --git a/weed/util/buffered_writer/max_latency_writer.go b/weed/util/buffered_writer/max_latency_writer.go
new file mode 100644
index 000000000..d95d26ffd
--- /dev/null
+++ b/weed/util/buffered_writer/max_latency_writer.go
@@ -0,0 +1,167 @@
+package buffered_writer
+
+import (
+ "io"
+ "sync"
+ "time"
+)
+
+type TimedWriteBuffer struct {
+ maxLatencyWriterAt *maxLatencyWriterAt
+ bufWriterAt *bufferedWriterAt
+}
+
+func (t *TimedWriteBuffer) ReadAt(p []byte, off int64) (n int, err error) {
+ bufStart := t.bufWriterAt.nextOffset - int64(t.bufWriterAt.dataSize)
+ start := max(bufStart, off)
+ stop := min(t.bufWriterAt.nextOffset, off+int64(len(p)))
+ if start <= stop {
+ n = copy(p, t.bufWriterAt.data[start-bufStart:stop-bufStart])
+ }
+ return
+}
+func (t *TimedWriteBuffer) WriteAt(p []byte, offset int64) (n int, err error) {
+ return t.maxLatencyWriterAt.WriteAt(p, offset)
+}
+func (t *TimedWriteBuffer) Flush() {
+ t.maxLatencyWriterAt.Flush()
+}
+func (t *TimedWriteBuffer) Close() {
+ t.maxLatencyWriterAt.Close()
+}
+
+func NewTimedWriteBuffer(writerAt io.WriterAt, size int, latency time.Duration, currentOffset int64) *TimedWriteBuffer {
+ bufWriterAt := newBufferedWriterAt(writerAt, size, currentOffset)
+ maxLatencyWriterAt := newMaxLatencyWriterAt(bufWriterAt, latency)
+ return &TimedWriteBuffer{
+ bufWriterAt: bufWriterAt,
+ maxLatencyWriterAt: maxLatencyWriterAt,
+ }
+}
+
+type bufferedWriterAt struct {
+ data []byte
+ dataSize int
+ nextOffset int64
+ writerAt io.WriterAt
+ counter int
+}
+
+func newBufferedWriterAt(writerAt io.WriterAt, bufferSize int, currentOffset int64) *bufferedWriterAt {
+ return &bufferedWriterAt{
+ data: make([]byte, bufferSize),
+ nextOffset: currentOffset,
+ dataSize: 0,
+ writerAt: writerAt,
+ }
+}
+
+func (b *bufferedWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
+ if b.nextOffset != offset {
+ println("nextOffset", b.nextOffset, "bufSize", b.dataSize, "offset", offset, "data", len(p))
+ }
+ if b.nextOffset != offset || len(p)+b.dataSize > len(b.data) {
+ if err = b.Flush(); err != nil {
+ return 0, err
+ }
+ }
+ if len(p)+b.dataSize > len(b.data) {
+ n, err = b.writerAt.WriteAt(p, offset)
+ if err == nil {
+ b.nextOffset = offset + int64(n)
+ }
+ } else {
+ n = copy(b.data[b.dataSize:len(p)+b.dataSize], p)
+ b.dataSize += n
+ b.nextOffset += int64(n)
+ b.counter++
+ }
+ return
+}
+
+func (b *bufferedWriterAt) Flush() (err error) {
+ if b.dataSize == 0 {
+ return nil
+ }
+ // println("flush", b.counter)
+ b.counter = 0
+ _, err = b.writerAt.WriteAt(b.data[0:b.dataSize], b.nextOffset-int64(b.dataSize))
+ if err == nil {
+ b.dataSize = 0
+ }
+ return
+}
+
+// adapted from https://golang.org/src/net/http/httputil/reverseproxy.go
+
+type writeFlusher interface {
+ io.WriterAt
+ Flush() error
+}
+
+type maxLatencyWriterAt struct {
+ dst writeFlusher
+ latency time.Duration // non-zero; negative means to flush immediately
+ mu sync.Mutex // protects t, flushPending, and dst.Flush
+ t *time.Timer
+ flushPending bool
+}
+
+func newMaxLatencyWriterAt(dst writeFlusher, latency time.Duration) *maxLatencyWriterAt {
+ return &maxLatencyWriterAt{
+ dst: dst,
+ latency: latency,
+ }
+}
+
+func (m *maxLatencyWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ n, err = m.dst.WriteAt(p, offset)
+ if m.latency < 0 {
+ m.dst.Flush()
+ return
+ }
+ if m.flushPending {
+ return
+ }
+ if m.t == nil {
+ m.t = time.AfterFunc(m.latency, m.Flush)
+ } else {
+ m.t.Reset(m.latency)
+ }
+ m.flushPending = true
+ return
+}
+
+func (m *maxLatencyWriterAt) Flush() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
+ return
+ }
+ m.dst.Flush()
+ m.flushPending = false
+}
+
+func (m *maxLatencyWriterAt) Close() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.flushPending = false
+ if m.t != nil {
+ m.t.Stop()
+ }
+}
+
+func min(x, y int64) int64 {
+ if x <= y {
+ return x
+ }
+ return y
+}
+func max(x, y int64) int64 {
+ if x <= y {
+ return y
+ }
+ return x
+}