aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2020-10-18 09:44:04 +0800
committerGitHub <noreply@github.com>2020-10-18 09:44:04 +0800
commit62af2d961d62f01510d0ad25569f4b82e4809996 (patch)
tree109e26db37bb2365973c70983833da7305b69f34 /weed/filesys
parent5c2e409ffe0e9517e597ebf89e9eb3ba55883f28 (diff)
parentc0ab458671f2859990caab70b59041513d90edac (diff)
downloadseaweedfs-62af2d961d62f01510d0ad25569f4b82e4809996.tar.xz
seaweedfs-62af2d961d62f01510d0ad25569f4b82e4809996.zip
Merge pull request #28 from chrislusf/master
sync
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dirty_page.go30
-rw-r--r--weed/filesys/filehandle.go18
2 files changed, 28 insertions, 20 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 3d3fac184..a200050c4 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -6,16 +6,18 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
"sync"
+ "time"
)
type ContinuousDirtyPages struct {
- intervals *ContinuousIntervals
- f *File
- writeWaitGroup sync.WaitGroup
- chunkSaveErrChan chan error
- lock sync.Mutex
- collection string
- replication string
+ intervals *ContinuousIntervals
+ f *File
+ writeWaitGroup sync.WaitGroup
+ chunkSaveErrChan chan error
+ chunkSaveErrChanClosed bool
+ lock sync.Mutex
+ collection string
+ replication string
}
func newDirtyPages(file *File) *ContinuousDirtyPages {
@@ -81,6 +83,12 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
+ if pages.chunkSaveErrChanClosed {
+ pages.chunkSaveErrChan = make(chan error, 8)
+ pages.chunkSaveErrChanClosed = false
+ }
+
+ mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1)
go func() {
defer pages.writeWaitGroup.Done()
@@ -94,19 +102,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
pages.chunkSaveErrChan <- err
return
}
+ chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
pages.f.addChunks([]*filer_pb.FileChunk{chunk})
pages.chunkSaveErrChan <- nil
}()
}
-func maxUint64(x, y uint64) uint64 {
- if x > y {
- return x
- }
- return y
-}
-
func max(x, y int64) int64 {
if x > y {
return x
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 43991376b..45abfcc5c 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -77,6 +77,10 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
totalRead = max(maxStop-req.Offset, totalRead)
}
+ if err == io.EOF {
+ err = nil
+ }
+
if err != nil {
glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err)
return fuse.EIO
@@ -122,11 +126,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
totalRead, err := fh.f.reader.ReadAt(buff, offset)
- if err == io.EOF {
- err = nil
- }
-
- if err != nil {
+ if err != nil && err != io.EOF{
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
}
@@ -179,10 +179,16 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
}
if fh.f.isOpen == 0 {
- fh.doFlush(ctx, req.Header)
+ if err := fh.doFlush(ctx, req.Header); err != nil {
+ glog.Errorf("Release doFlush %s: %v", fh.f.Name, err)
+ }
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
}
+ // stop the goroutine
+ fh.dirtyPages.chunkSaveErrChanClosed = true
+ close(fh.dirtyPages.chunkSaveErrChan)
+
return nil
}