diff options
Diffstat (limited to 'weed/filesys/page_writer.go')
| -rw-r--r-- | weed/filesys/page_writer.go | 48 |
1 files changed, 15 insertions, 33 deletions
diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index 5c06bc44d..c6d08348d 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -6,36 +6,32 @@ import ( ) type PageWriter struct { - f *File + fh *FileHandle collection string replication string chunkSize int64 writerPattern *WriterPattern randomWriter page_writer.DirtyPages - streamWriter page_writer.DirtyPages } var ( _ = page_writer.DirtyPages(&PageWriter{}) ) -func newPageWriter(file *File, chunkSize int64) *PageWriter { +func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { pw := &PageWriter{ - f: file, + fh: fh, chunkSize: chunkSize, writerPattern: NewWriterPattern(chunkSize), - randomWriter: newTempFileDirtyPages(file, chunkSize), - streamWriter: newStreamDirtyPages(file, chunkSize), - //streamWriter: newContinuousDirtyPages(file), - //streamWriter: nil, + randomWriter: newMemoryChunkPages(fh, chunkSize), } return pw } func (pw *PageWriter) AddPage(offset int64, data []byte) { - glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) + glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.fh.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { @@ -47,38 +43,22 @@ func (pw *PageWriter) AddPage(offset int64, data []byte) { } func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { - if chunkIndex > 0 { - if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { - pw.streamWriter.AddPage(offset, data) - return - } - } pw.randomWriter.AddPage(offset, data) } func (pw *PageWriter) FlushData() error { pw.writerPattern.Reset() - if pw.streamWriter != nil { - if err := pw.streamWriter.FlushData(); err != nil { - return err - } - } return pw.randomWriter.FlushData() } func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) { - glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), offset, offset+int64(len(data))) + glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.fh.f.fullpath(), offset, offset+int64(len(data))) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - if pw.streamWriter != nil { - m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset) - maxStop = max(maxStop, m1) - } - m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) - maxStop = max(maxStop, m2) + maxStop = pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) offset += readSize data = data[readSize:] @@ -88,16 +68,18 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) } func (pw *PageWriter) GetStorageOptions() (collection, replication string) { - if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { - return pw.streamWriter.GetStorageOptions() - } return pw.randomWriter.GetStorageOptions() } +func (pw *PageWriter) LockForRead(startOffset, stopOffset int64) { + pw.randomWriter.LockForRead(startOffset, stopOffset) +} + +func (pw *PageWriter) UnlockForRead(startOffset, stopOffset int64) { + pw.randomWriter.UnlockForRead(startOffset, stopOffset) +} + func (pw *PageWriter) Destroy() { - if pw.streamWriter != nil { - pw.streamWriter.Destroy() - } pw.randomWriter.Destroy() } |
