aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/page_writer/chunked_stream_writer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys/page_writer/chunked_stream_writer.go')
-rw-r--r--weed/filesys/page_writer/chunked_stream_writer.go107
1 files changed, 0 insertions, 107 deletions
diff --git a/weed/filesys/page_writer/chunked_stream_writer.go b/weed/filesys/page_writer/chunked_stream_writer.go
index b4314e78f..2f869ddb8 100644
--- a/weed/filesys/page_writer/chunked_stream_writer.go
+++ b/weed/filesys/page_writer/chunked_stream_writer.go
@@ -1,119 +1,12 @@
package page_writer
import (
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
- "sync"
- "sync/atomic"
)
type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func())
-// ChunkedStreamWriter assumes the write requests will come in within chunks and in streaming mode
-type ChunkedStreamWriter struct {
- activeChunks map[LogicChunkIndex]*MemChunk
- activeChunksLock sync.Mutex
- ChunkSize int64
- saveToStorageFn SaveToStorageFunc
- sync.Mutex
-}
-
type MemChunk struct {
buf []byte
usage *ChunkWrittenIntervalList
}
-
-var _ = io.WriterAt(&ChunkedStreamWriter{})
-
-func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter {
- return &ChunkedStreamWriter{
- ChunkSize: chunkSize,
- activeChunks: make(map[LogicChunkIndex]*MemChunk),
- }
-}
-
-func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) {
- cw.saveToStorageFn = saveToStorageFn
-}
-
-func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) {
- cw.Lock()
- defer cw.Unlock()
-
- logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
- offsetRemainder := off % cw.ChunkSize
-
- memChunk, found := cw.activeChunks[logicChunkIndex]
- if !found {
- memChunk = &MemChunk{
- buf: mem.Allocate(int(cw.ChunkSize)),
- usage: newChunkWrittenIntervalList(),
- }
- cw.activeChunks[logicChunkIndex] = memChunk
- }
- n = copy(memChunk.buf[offsetRemainder:], p)
- memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n))
- if memChunk.usage.IsComplete(cw.ChunkSize) {
- if cw.saveToStorageFn != nil {
- cw.saveOneChunk(memChunk, logicChunkIndex)
- delete(cw.activeChunks, logicChunkIndex)
- }
- }
-
- return
-}
-
-func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
- cw.Lock()
- defer cw.Unlock()
-
- logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize)
- memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize
- memChunk, found := cw.activeChunks[logicChunkIndex]
- if !found {
- return
- }
-
- for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
- logicStart := max(off, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset)
- logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
- if logicStart < logicStop {
- copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
- maxStop = max(maxStop, logicStop)
- }
- }
- return
-}
-
-func (cw *ChunkedStreamWriter) FlushAll() {
- cw.Lock()
- defer cw.Unlock()
- for logicChunkIndex, memChunk := range cw.activeChunks {
- if cw.saveToStorageFn != nil {
- cw.saveOneChunk(memChunk, logicChunkIndex)
- delete(cw.activeChunks, logicChunkIndex)
- }
- }
-}
-
-func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) {
- var referenceCounter = int32(memChunk.usage.size())
- for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next {
- reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset])
- cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() {
- atomic.AddInt32(&referenceCounter, -1)
- if atomic.LoadInt32(&referenceCounter) == 0 {
- mem.Free(memChunk.buf)
- }
- })
- }
-}
-
-// Reset releases used resources
-func (cw *ChunkedStreamWriter) Reset() {
- for t, memChunk := range cw.activeChunks {
- mem.Free(memChunk.buf)
- delete(cw.activeChunks, t)
- }
-}