diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dirty_page.go | 27 | ||||
| -rw-r--r-- | weed/filesys/dirty_page_interval.go | 7 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 23 |
3 files changed, 35 insertions, 22 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index a200050c4..7a3e859f5 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -2,11 +2,17 @@ package filesys import ( "bytes" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "io" + "runtime" "sync" "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +var ( + concurrentWriterLimit = runtime.NumCPU() ) type ContinuousDirtyPages struct { @@ -15,17 +21,26 @@ type ContinuousDirtyPages struct { writeWaitGroup sync.WaitGroup chunkSaveErrChan chan error chunkSaveErrChanClosed bool + lastErr error lock sync.Mutex collection string replication string } func newDirtyPages(file *File) *ContinuousDirtyPages { - return &ContinuousDirtyPages{ + dirtyPages := &ContinuousDirtyPages{ intervals: &ContinuousIntervals{}, f: file, - chunkSaveErrChan: make(chan error, 8), + chunkSaveErrChan: make(chan error, concurrentWriterLimit), } + go func() { + for t := range dirtyPages.chunkSaveErrChan { + if t != nil { + dirtyPages.lastErr = t + } + } + }() + return dirtyPages } func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { @@ -84,7 +99,7 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { if pages.chunkSaveErrChanClosed { - pages.chunkSaveErrChan = make(chan error, 8) + pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit) pages.chunkSaveErrChanClosed = false } @@ -105,7 +120,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, chunk.Mtime = mtime pages.collection, pages.replication = collection, replication pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - pages.chunkSaveErrChan <- nil + glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) }() } diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go index afa2755ed..f143fe3e4 100644 --- a/weed/filesys/dirty_page_interval.go +++ b/weed/filesys/dirty_page_interval.go @@ -3,6 +3,8 @@ package filesys import ( "bytes" "io" + + "github.com/chrislusf/seaweedfs/weed/util" ) type IntervalNode struct { @@ -200,10 +202,13 @@ func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxSto func (l *IntervalLinkedList) ToReader() io.Reader { var readers []io.Reader t := l.Head - readers = append(readers, bytes.NewReader(t.Data)) + readers = append(readers, util.NewBytesReader(t.Data)) for t.Next != nil { t = t.Next readers = append(readers, bytes.NewReader(t.Data)) } + if len(readers) == 1 { + return readers[0] + } return io.MultiReader(readers...) } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 45abfcc5c..e3163117c 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -142,8 +142,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f defer fh.Unlock() // write the request to volume servers - data := make([]byte, len(req.Data)) - copy(data, req.Data) + data := req.Data fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize))) glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) @@ -186,8 +185,10 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err } // stop the goroutine - fh.dirtyPages.chunkSaveErrChanClosed = true - close(fh.dirtyPages.chunkSaveErrChan) + if !fh.dirtyPages.chunkSaveErrChanClosed { + fh.dirtyPages.chunkSaveErrChanClosed = true + close(fh.dirtyPages.chunkSaveErrChan) + } return nil } @@ -207,25 +208,17 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { fh.dirtyPages.saveExistingPagesToStorage() - var err error - go func() { - for t := range fh.dirtyPages.chunkSaveErrChan { - if t != nil { - err = t - } - } - }() fh.dirtyPages.writeWaitGroup.Wait() - if err != nil { - return err + if fh.dirtyPages.lastErr != nil { + return fh.dirtyPages.lastErr } if !fh.f.dirtyMetadata { return nil } - err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType |
