diff options
Diffstat (limited to 'weed/filesys/page_writer.go')
| -rw-r--r-- | weed/filesys/page_writer.go | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go index 560ae052d..bdcbc0fbc 100644 --- a/weed/filesys/page_writer.go +++ b/weed/filesys/page_writer.go @@ -24,19 +24,21 @@ func newPageWriter(file *File, chunkSize int64) *PageWriter { pw := &PageWriter{ f: file, chunkSize: chunkSize, + writerPattern: NewWriterPattern(chunkSize), randomWriter: newTempFileDirtyPages(file, chunkSize), - streamWriter: newContinuousDirtyPages(file), - writerPattern: NewWriterPattern(file.Name, chunkSize), + streamWriter: newStreamDirtyPages(file, chunkSize), + //streamWriter: newContinuousDirtyPages(file), + //streamWriter: nil, } return pw } func (pw *PageWriter) AddPage(offset int64, data []byte) { - glog.V(4).Infof("AddPage %v [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) - pw.writerPattern.MonitorWriteAt(offset, len(data)) + glog.V(4).Infof("%v AddPage [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) + chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) @@ -48,7 +50,7 @@ func (pw *PageWriter) AddPage(offset int64, data []byte) { func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { if chunkIndex > 0 { - if pw.writerPattern.IsStreamingMode() { + if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { pw.streamWriter.AddPage(offset, data) return } @@ -57,8 +59,11 @@ func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { } func (pw *PageWriter) FlushData() error { - if err := pw.streamWriter.FlushData(); err != nil { - return err + pw.writerPattern.Reset() + if pw.streamWriter != nil { + if err := pw.streamWriter.FlushData(); err != nil { + return err + } } return pw.randomWriter.FlushData() } @@ -70,10 +75,12 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) for i := chunkIndex; len(data) > 0; i++ { readSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset) + if pw.streamWriter != nil { + m1 := pw.streamWriter.ReadDirtyDataAt(data[:readSize], offset) + maxStop = max(maxStop, m1) + } m2 := pw.randomWriter.ReadDirtyDataAt(data[:readSize], offset) - - maxStop = max(maxStop, max(m1, m2)) + maxStop = max(maxStop, m2) offset += readSize data = data[readSize:] @@ -83,13 +90,16 @@ func (pw *PageWriter) ReadDirtyDataAt(data []byte, offset int64) (maxStop int64) } func (pw *PageWriter) GetStorageOptions() (collection, replication string) { - if pw.writerPattern.IsStreamingMode() { + if pw.writerPattern.IsStreamingMode() && pw.streamWriter != nil { return pw.streamWriter.GetStorageOptions() } return pw.randomWriter.GetStorageOptions() } func (pw *PageWriter) Destroy() { + if pw.streamWriter != nil { + pw.streamWriter.Destroy() + } pw.randomWriter.Destroy() } |
