aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-01-28 04:46:37 -0800
committerChris Lu <chris.lu@gmail.com>2021-01-28 04:46:37 -0800
commitcf252fc0cdbe2e65a1fcb4853ba2d125c47ab1a0 (patch)
tree35ecfb468b5859df107791c78d74ae3c8d4990fb
parent9a06c35da40b9d4b5b687b7300fed4196020f375 (diff)
downloadseaweedfs-cf252fc0cdbe2e65a1fcb4853ba2d125c47ab1a0.tar.xz
seaweedfs-cf252fc0cdbe2e65a1fcb4853ba2d125c47ab1a0.zip
mount: report error when Flush()
The error in Release() is not working. See https://github.com/jaderhs/libfuse/blob/master/FAQ related to https://github.com/chrislusf/seaweedfs/issues/1765
-rw-r--r--weed/filesys/dirty_page.go22
-rw-r--r--weed/filesys/filehandle.go26
2 files changed, 16 insertions, 32 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 11089186f..c4c206a05 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -3,7 +3,6 @@ package filesys
import (
"bytes"
"io"
- "runtime"
"sync"
"time"
@@ -16,8 +15,6 @@ type ContinuousDirtyPages struct {
f *File
writeWaitGroup sync.WaitGroup
chunkAddLock sync.Mutex
- chunkSaveErrChan chan error
- chunkSaveErrChanClosed bool
lastErr error
collection string
replication string
@@ -27,15 +24,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
dirtyPages := &ContinuousDirtyPages{
intervals: &ContinuousIntervals{},
f: file,
- chunkSaveErrChan: make(chan error, runtime.NumCPU()),
}
- go func() {
- for t := range dirtyPages.chunkSaveErrChan {
- if t != nil {
- dirtyPages.lastErr = t
- }
- }
- }()
return dirtyPages
}
@@ -94,15 +83,6 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
- errChanSize := pages.f.wfs.option.ConcurrentWriters
- if errChanSize == 0 {
- errChanSize = runtime.NumCPU()
- }
- if pages.chunkSaveErrChanClosed {
- pages.chunkSaveErrChan = make(chan error, errChanSize)
- pages.chunkSaveErrChanClosed = false
- }
-
mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1)
writer := func() {
@@ -112,7 +92,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
- pages.chunkSaveErrChan <- err
+ pages.lastErr = err
return
}
chunk.Mtime = mtime
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 168bf792d..85f64d292 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -184,25 +184,20 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
fh.Lock()
defer fh.Unlock()
- fh.f.isOpen--
-
- if fh.f.isOpen < 0 {
+ if fh.f.isOpen <= 0 {
glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
fh.f.isOpen = 0
return nil
}
- if fh.f.isOpen == 0 {
+ if fh.f.isOpen == 1 {
if err := fh.doFlush(ctx, req.Header); err != nil {
glog.Errorf("Release doFlush %s: %v", fh.f.Name, err)
+ return err
}
- // stop the goroutine
- if !fh.dirtyPages.chunkSaveErrChanClosed {
- fh.dirtyPages.chunkSaveErrChanClosed = true
- close(fh.dirtyPages.chunkSaveErrChan)
- }
+ fh.f.isOpen--
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
if closer, ok := fh.f.reader.(io.Closer); ok {
@@ -216,10 +211,18 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
+ glog.V(4).Infof("Flush %v fh %d", fh.f.fullpath(), fh.handle)
+
fh.Lock()
defer fh.Unlock()
- return fh.doFlush(ctx, req.Header)
+ if err := fh.doFlush(ctx, req.Header); err != nil {
+ glog.Errorf("Flush doFlush %s: %v", fh.f.Name, err)
+ return err
+ }
+
+ glog.V(4).Infof("Flush %v fh %d success", fh.f.fullpath(), fh.handle)
+ return nil
}
func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
@@ -232,7 +235,8 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
fh.dirtyPages.writeWaitGroup.Wait()
if fh.dirtyPages.lastErr != nil {
- return fh.dirtyPages.lastErr
+ glog.Errorf("%v doFlush last err: %v", fh.f.fullpath(), fh.dirtyPages.lastErr)
+ return fuse.EIO
}
if !fh.f.dirtyMetadata {