diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 2 | ||||
| -rw-r--r-- | weed/filesys/dir_rename.go | 2 | ||||
| -rw-r--r-- | weed/filesys/dirty_pages_continuous.go | 21 | ||||
| -rw-r--r-- | weed/filesys/dirty_pages_temp_file.go | 35 | ||||
| -rw-r--r-- | weed/filesys/file.go | 2 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 14 | ||||
| -rw-r--r-- | weed/filesys/page_writer/dirty_page_interval.go (renamed from weed/filesys/dirty_page_interval.go) | 2 | ||||
| -rw-r--r-- | weed/filesys/page_writer/dirty_page_interval_test.go (renamed from weed/filesys/dirty_page_interval_test.go) | 2 | ||||
| -rw-r--r-- | weed/filesys/page_writer/dirty_pages.go (renamed from weed/filesys/dirty_pages.go) | 4 | ||||
| -rw-r--r-- | weed/filesys/page_writer/dirty_pages_temp_interval.go (renamed from weed/filesys/dirty_pages_temp_interval.go) | 55 | ||||
| -rw-r--r-- | weed/filesys/page_writer/writer_pattern.go | 31 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 5 | ||||
| -rw-r--r-- | weed/filesys/wfs_write.go | 4 |
13 files changed, 100 insertions, 79 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 9a791e013..cedcf2d76 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -161,7 +161,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, }, } file.dirtyMetadata = true - fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0) + fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) return file, fh, nil } diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 8a80559f6..1ee6922d8 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -111,7 +111,7 @@ func (dir *Dir) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR // change file handle inodeId := oldPath.AsInode() dir.wfs.handlesLock.Lock() - if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle == nil { + if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle != nil { glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath) delete(dir.wfs.handles, inodeId) dir.wfs.handles[newPath.AsInode()] = existingHandle diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go index b7514a2eb..88b50ce41 100644 --- a/weed/filesys/dirty_pages_continuous.go +++ b/weed/filesys/dirty_pages_continuous.go @@ -3,6 +3,7 @@ package filesys import ( "bytes" "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" "io" "sync" "time" @@ -12,9 +13,8 @@ import ( ) type ContinuousDirtyPages struct { - intervals *ContinuousIntervals + intervals *page_writer.ContinuousIntervals f *File - writeOnly bool writeWaitGroup sync.WaitGroup chunkAddLock sync.Mutex lastErr error @@ -22,11 +22,10 @@ type ContinuousDirtyPages struct { replication string } -func newContinuousDirtyPages(file *File, writeOnly bool) *ContinuousDirtyPages { +func newContinuousDirtyPages(file *File) *ContinuousDirtyPages { dirtyPages := &ContinuousDirtyPages{ - intervals: &ContinuousIntervals{}, + intervals: &page_writer.ContinuousIntervals{}, f: file, - writeOnly: writeOnly, } return dirtyPages } @@ -107,7 +106,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, defer pages.writeWaitGroup.Done() reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset) + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) if err != nil { glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) pages.lastErr = err @@ -148,13 +147,3 @@ func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int6 func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) { return pages.collection, pages.replication } - -func (pages *ContinuousDirtyPages) SetWriteOnly(writeOnly bool) { - if pages.writeOnly { - pages.writeOnly = writeOnly - } -} - -func (pages *ContinuousDirtyPages) GetWriteOnly() (writeOnly bool) { - return pages.writeOnly -} diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go index 9fa7c0c8e..6a22889dc 100644 --- a/weed/filesys/dirty_pages_temp_file.go +++ b/weed/filesys/dirty_pages_temp_file.go @@ -2,6 +2,7 @@ package filesys import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "io" @@ -13,8 +14,7 @@ import ( type TempFileDirtyPages struct { f *File tf *os.File - writtenIntervals *WrittenContinuousIntervals - writeOnly bool + writtenIntervals *page_writer.WrittenContinuousIntervals writeWaitGroup sync.WaitGroup pageAddLock sync.Mutex chunkAddLock sync.Mutex @@ -23,12 +23,11 @@ type TempFileDirtyPages struct { replication string } -func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages { +func newTempFileDirtyPages(file *File) *TempFileDirtyPages { tempFile := &TempFileDirtyPages{ f: file, - writeOnly: writeOnly, - writtenIntervals: &WrittenContinuousIntervals{}, + writtenIntervals: &page_writer.WrittenContinuousIntervals{}, } return tempFile @@ -47,11 +46,11 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { return } pages.tf = tf - pages.writtenIntervals.tempFile = tf - pages.writtenIntervals.lastOffset = 0 + pages.writtenIntervals.TempFile = tf + pages.writtenIntervals.LastOffset = 0 } - writtenOffset := pages.writtenIntervals.lastOffset + writtenOffset := pages.writtenIntervals.LastOffset dataSize := int64(len(data)) // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize) @@ -60,7 +59,7 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { pages.lastErr = err } else { pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset) - pages.writtenIntervals.lastOffset += dataSize + pages.writtenIntervals.LastOffset += dataSize } // pages.writtenIntervals.debug() @@ -79,8 +78,8 @@ func (pages *TempFileDirtyPages) FlushData() error { defer pages.pageAddLock.Unlock() if pages.tf != nil { - pages.writtenIntervals.tempFile = nil - pages.writtenIntervals.lists = nil + pages.writtenIntervals.TempFile = nil + pages.writtenIntervals.Lists = nil pages.tf.Close() os.Remove(pages.tf.Name()) @@ -95,7 +94,7 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() { // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists)) - for _, list := range pages.writtenIntervals.lists { + for _, list := range pages.writtenIntervals.Lists { listStopOffset := list.Offset() + list.Size() for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize { start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize) @@ -117,7 +116,7 @@ func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, s defer pages.writeWaitGroup.Done() reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset) + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) if err != nil { glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) pages.lastErr = err @@ -145,13 +144,3 @@ func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) { return pages.collection, pages.replication } - -func (pages *TempFileDirtyPages) SetWriteOnly(writeOnly bool) { - if pages.writeOnly { - pages.writeOnly = writeOnly - } -} - -func (pages *TempFileDirtyPages) GetWriteOnly() (writeOnly bool) { - return pages.writeOnly -} diff --git a/weed/filesys/file.go b/weed/filesys/file.go index f8fd7ad99..767841f9d 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -97,7 +97,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op glog.V(4).Infof("file %v open %+v", file.fullpath(), req) - handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0) + handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid) resp.Handle = fuse.HandleID(handle.handle) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 34affddb9..232d28667 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -3,6 +3,7 @@ package filesys import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" "io" "math" "net/http" @@ -20,7 +21,7 @@ import ( type FileHandle struct { // cache file has been written to - dirtyPages DirtyPages + dirtyPages page_writer.DirtyPages entryViewCache []filer.VisibleInterval reader io.ReaderAt contentType string @@ -36,11 +37,11 @@ type FileHandle struct { isDeleted bool } -func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle { +func newFileHandle(file *File, uid, gid uint32) *FileHandle { fh := &FileHandle{ f: file, // dirtyPages: newContinuousDirtyPages(file, writeOnly), - dirtyPages: newTempFileDirtyPages(file, writeOnly), + dirtyPages: newTempFileDirtyPages(file), Uid: uid, Gid: gid, } @@ -62,10 +63,11 @@ var _ = fs.HandleReleaser(&FileHandle{}) func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data)) fh.Lock() defer fh.Unlock() + glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data)) + if req.Size <= 0 { return nil } @@ -173,7 +175,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f // write the request to volume servers data := req.Data - if len(data) <= 512 { + if len(data) <= 512 && req.Offset == 0 { // fuse message cacheable size data = make([]byte, len(req.Data)) copy(data, req.Data) @@ -303,7 +305,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks) chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks) - chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath(), fh.dirtyPages.GetWriteOnly()), chunks) + chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) if manifestErr != nil { // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", manifestErr) diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/page_writer/dirty_page_interval.go index 304793340..6d73b8cd7 100644 --- a/weed/filesys/dirty_page_interval.go +++ b/weed/filesys/page_writer/dirty_page_interval.go @@ -1,4 +1,4 @@ -package filesys +package page_writer import ( "io" diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/page_writer/dirty_page_interval_test.go index d02ad27fd..2a2a1df4d 100644 --- a/weed/filesys/dirty_page_interval_test.go +++ b/weed/filesys/page_writer/dirty_page_interval_test.go @@ -1,4 +1,4 @@ -package filesys +package page_writer import ( "bytes" diff --git a/weed/filesys/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go index 8505323ef..c18f847b7 100644 --- a/weed/filesys/dirty_pages.go +++ b/weed/filesys/page_writer/dirty_pages.go @@ -1,10 +1,8 @@ -package filesys +package page_writer type DirtyPages interface { AddPage(offset int64, data []byte) FlushData() error ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) GetStorageOptions() (collection, replication string) - SetWriteOnly(writeOnly bool) - GetWriteOnly() (writeOnly bool) } diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/page_writer/dirty_pages_temp_interval.go index 42c4b5a3b..aeaf0ec6f 100644 --- a/weed/filesys/dirty_pages_temp_interval.go +++ b/weed/filesys/page_writer/dirty_pages_temp_interval.go @@ -1,4 +1,4 @@ -package filesys +package page_writer import ( "io" @@ -20,9 +20,9 @@ type WrittenIntervalLinkedList struct { } type WrittenContinuousIntervals struct { - tempFile *os.File - lastOffset int64 - lists []*WrittenIntervalLinkedList + TempFile *os.File + LastOffset int64 + Lists []*WrittenIntervalLinkedList } func (list *WrittenIntervalLinkedList) Offset() int64 { @@ -65,7 +65,7 @@ func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) { } func (c *WrittenContinuousIntervals) TotalSize() (total int64) { - for _, list := range c.lists { + for _, list := range c.Lists { total += list.Size() } return @@ -98,7 +98,7 @@ func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenInterv func (c *WrittenContinuousIntervals) debug() { log.Printf("++") - for _, l := range c.lists { + for _, l := range c.Lists { log.Printf("++++") for t := l.Head; ; t = t.Next { log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size) @@ -116,8 +116,8 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)} // append to the tail and return - if len(c.lists) == 1 { - lastSpan := c.lists[0] + if len(c.Lists) == 1 { + lastSpan := c.Lists[0] if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset { lastSpan.addNodeToTail(interval) return @@ -125,7 +125,7 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, } var newLists []*WrittenIntervalLinkedList - for _, list := range c.lists { + for _, list := range c.Lists { // if list is to the left of new interval, add to the new list if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset { newLists = append(newLists, list) @@ -147,18 +147,18 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, // skip anything that is fully overwritten by the new interval } - c.lists = newLists + c.Lists = newLists // add the new interval to the lists, connecting neighbor lists var prevList, nextList *WrittenIntervalLinkedList - for _, list := range c.lists { + for _, list := range c.Lists { if list.Head.DataOffset == interval.DataOffset+interval.Size { nextList = list break } } - for _, list := range c.lists { + for _, list := range c.Lists { if list.Head.DataOffset+list.Size() == dataOffset { list.addNodeToTail(interval) prevList = list @@ -176,8 +176,8 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, nextList.addNodeToHead(interval) } if prevList == nil && nextList == nil { - c.lists = append(c.lists, &WrittenIntervalLinkedList{ - tempFile: c.tempFile, + c.Lists = append(c.Lists, &WrittenIntervalLinkedList{ + tempFile: c.TempFile, Head: interval, Tail: interval, }) @@ -189,7 +189,7 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList { var maxSize int64 maxIndex := -1 - for k, list := range c.lists { + for k, list := range c.Lists { if maxSize <= list.Size() { maxSize = list.Size() maxIndex = k @@ -199,16 +199,16 @@ func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenI return nil } - t := c.lists[maxIndex] - t.tempFile = c.tempFile - c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...) + t := c.Lists[maxIndex] + t.tempFile = c.TempFile + c.Lists = append(c.Lists[0:maxIndex], c.Lists[maxIndex+1:]...) return t } func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) { index := -1 - for k, list := range c.lists { + for k, list := range c.Lists { if list.Offset() == target.Offset() { index = k } @@ -217,12 +217,12 @@ func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedLis return } - c.lists = append(c.lists[0:index], c.lists[index+1:]...) + c.Lists = append(c.Lists[0:index], c.Lists[index+1:]...) } func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) { - for _, list := range c.lists { + for _, list := range c.Lists { start := max(startOffset, list.Offset()) stop := min(startOffset+int64(len(data)), list.Offset()+list.Size()) if start < stop { @@ -287,3 +287,16 @@ func (f *FileSectionReader) Read(p []byte) (n int, err error) { } return } + +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} diff --git a/weed/filesys/page_writer/writer_pattern.go b/weed/filesys/page_writer/writer_pattern.go new file mode 100644 index 000000000..c7641c37f --- /dev/null +++ b/weed/filesys/page_writer/writer_pattern.go @@ -0,0 +1,31 @@ +package page_writer + +type WriterPattern struct { + isStreaming bool + lastWriteOffset int64 +} + +// For streaming write: only cache the first chunk +// For random write: fall back to temp file approach + +func NewWriterPattern() *WriterPattern { + return &WriterPattern{ + isStreaming: true, + lastWriteOffset: 0, + } +} + +func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) { + if rp.lastWriteOffset > offset { + rp.isStreaming = false + } + rp.lastWriteOffset = offset +} + +func (rp *WriterPattern) IsStreamingMode() bool { + return rp.isStreaming +} + +func (rp *WriterPattern) IsRandomMode() bool { + return !rp.isStreaming +} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 92f6bae38..aa4f9dacd 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -148,7 +148,7 @@ func (wfs *WFS) Root() (fs.Node, error) { return wfs.root, nil } -func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (fileHandle *FileHandle) { +func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { fullpath := file.fullpath() glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid) @@ -160,7 +160,6 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (file if found && existingHandle != nil && existingHandle.f.isOpen > 0 { existingHandle.f.isOpen++ wfs.handlesLock.Unlock() - existingHandle.dirtyPages.SetWriteOnly(writeOnly) glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen) return existingHandle } @@ -168,7 +167,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (file entry, _ := file.maybeLoadEntry(context.Background()) file.entry = entry - fileHandle = newFileHandle(file, uid, gid, writeOnly) + fileHandle = newFileHandle(file, uid, gid) wfs.handlesLock.Lock() file.isOpen++ diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 3d08cb5e2..61a463e88 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -13,7 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.SaveDataAsChunkFunctionType { +func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType { return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { var fileId, host string @@ -74,7 +74,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error) } - if !writeOnly { + if offset == 0 { wfs.chunkCache.SetChunk(fileId, data) } |
