diff options
Diffstat (limited to 'weed/filesys/page_writer/chunked_stream_writer.go')
| -rw-r--r-- | weed/filesys/page_writer/chunked_stream_writer.go | 107 |
1 files changed, 0 insertions, 107 deletions
diff --git a/weed/filesys/page_writer/chunked_stream_writer.go b/weed/filesys/page_writer/chunked_stream_writer.go index b4314e78f..2f869ddb8 100644 --- a/weed/filesys/page_writer/chunked_stream_writer.go +++ b/weed/filesys/page_writer/chunked_stream_writer.go @@ -1,119 +1,12 @@ package page_writer import ( - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/mem" "io" - "sync" - "sync/atomic" ) type SaveToStorageFunc func(reader io.Reader, offset int64, size int64, cleanupFn func()) -// ChunkedStreamWriter assumes the write requests will come in within chunks and in streaming mode -type ChunkedStreamWriter struct { - activeChunks map[LogicChunkIndex]*MemChunk - activeChunksLock sync.Mutex - ChunkSize int64 - saveToStorageFn SaveToStorageFunc - sync.Mutex -} - type MemChunk struct { buf []byte usage *ChunkWrittenIntervalList } - -var _ = io.WriterAt(&ChunkedStreamWriter{}) - -func NewChunkedStreamWriter(chunkSize int64) *ChunkedStreamWriter { - return &ChunkedStreamWriter{ - ChunkSize: chunkSize, - activeChunks: make(map[LogicChunkIndex]*MemChunk), - } -} - -func (cw *ChunkedStreamWriter) SetSaveToStorageFunction(saveToStorageFn SaveToStorageFunc) { - cw.saveToStorageFn = saveToStorageFn -} - -func (cw *ChunkedStreamWriter) WriteAt(p []byte, off int64) (n int, err error) { - cw.Lock() - defer cw.Unlock() - - logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) - offsetRemainder := off % cw.ChunkSize - - memChunk, found := cw.activeChunks[logicChunkIndex] - if !found { - memChunk = &MemChunk{ - buf: mem.Allocate(int(cw.ChunkSize)), - usage: newChunkWrittenIntervalList(), - } - cw.activeChunks[logicChunkIndex] = memChunk - } - n = copy(memChunk.buf[offsetRemainder:], p) - memChunk.usage.MarkWritten(offsetRemainder, offsetRemainder+int64(n)) - if memChunk.usage.IsComplete(cw.ChunkSize) { - if cw.saveToStorageFn != nil { - cw.saveOneChunk(memChunk, logicChunkIndex) - delete(cw.activeChunks, logicChunkIndex) - } - } - - return -} - -func (cw *ChunkedStreamWriter) ReadDataAt(p []byte, off int64) (maxStop int64) { - cw.Lock() - defer cw.Unlock() - - logicChunkIndex := LogicChunkIndex(off / cw.ChunkSize) - memChunkBaseOffset := int64(logicChunkIndex) * cw.ChunkSize - memChunk, found := cw.activeChunks[logicChunkIndex] - if !found { - return - } - - for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { - logicStart := max(off, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset) - logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset) - if logicStart < logicStop { - copy(p[logicStart-off:logicStop-off], memChunk.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset]) - maxStop = max(maxStop, logicStop) - } - } - return -} - -func (cw *ChunkedStreamWriter) FlushAll() { - cw.Lock() - defer cw.Unlock() - for logicChunkIndex, memChunk := range cw.activeChunks { - if cw.saveToStorageFn != nil { - cw.saveOneChunk(memChunk, logicChunkIndex) - delete(cw.activeChunks, logicChunkIndex) - } - } -} - -func (cw *ChunkedStreamWriter) saveOneChunk(memChunk *MemChunk, logicChunkIndex LogicChunkIndex) { - var referenceCounter = int32(memChunk.usage.size()) - for t := memChunk.usage.head.next; t != memChunk.usage.tail; t = t.next { - reader := util.NewBytesReader(memChunk.buf[t.StartOffset:t.stopOffset]) - cw.saveToStorageFn(reader, int64(logicChunkIndex)*cw.ChunkSize+t.StartOffset, t.Size(), func() { - atomic.AddInt32(&referenceCounter, -1) - if atomic.LoadInt32(&referenceCounter) == 0 { - mem.Free(memChunk.buf) - } - }) - } -} - -// Reset releases used resources -func (cw *ChunkedStreamWriter) Reset() { - for t, memChunk := range cw.activeChunks { - mem.Free(memChunk.buf) - delete(cw.activeChunks, t) - } -} |
