diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dirty_page.go | 25 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 14 |
2 files changed, 23 insertions, 16 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index a200050c4..9080b2aef 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -2,11 +2,17 @@ package filesys import ( "bytes" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "io" + "runtime" "sync" "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +var ( + concurrentWriterLimit = runtime.NumCPU() ) type ContinuousDirtyPages struct { @@ -15,17 +21,26 @@ type ContinuousDirtyPages struct { writeWaitGroup sync.WaitGroup chunkSaveErrChan chan error chunkSaveErrChanClosed bool + lastErr error lock sync.Mutex collection string replication string } func newDirtyPages(file *File) *ContinuousDirtyPages { - return &ContinuousDirtyPages{ + dirtyPages := &ContinuousDirtyPages{ intervals: &ContinuousIntervals{}, f: file, - chunkSaveErrChan: make(chan error, 8), + chunkSaveErrChan: make(chan error, concurrentWriterLimit), } + go func() { + for t := range dirtyPages.chunkSaveErrChan { + if t != nil { + dirtyPages.lastErr = t + } + } + }() + return dirtyPages } func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { @@ -105,7 +120,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, chunk.Mtime = mtime pages.collection, pages.replication = collection, replication pages.f.addChunks([]*filer_pb.FileChunk{chunk}) - pages.chunkSaveErrChan <- nil + glog.V(0).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) }() } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 412d7e73f..e3163117c 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -208,25 +208,17 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { fh.dirtyPages.saveExistingPagesToStorage() - var err error - go func() { - for t := range fh.dirtyPages.chunkSaveErrChan { - if t != nil { - err = t - } - } - }() fh.dirtyPages.writeWaitGroup.Wait() - if err != nil { - return err + if fh.dirtyPages.lastErr != nil { + return fh.dirtyPages.lastErr } if !fh.f.dirtyMetadata { return nil } - err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType |
