aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mount/dirty_pages_chunked.go2
-rw-r--r--weed/mount/filehandle.go29
-rw-r--r--weed/mount/filehandle_read.go3
-rw-r--r--weed/mount/weedfs.go7
-rw-r--r--weed/mount/weedfs_attr.go4
-rw-r--r--weed/mount/weedfs_file_sync.go3
-rw-r--r--weed/mount/weedfs_xattr.go14
7 files changed, 47 insertions, 15 deletions
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go
index e0d764070..52308e0e5 100644
--- a/weed/mount/dirty_pages_chunked.go
+++ b/weed/mount/dirty_pages_chunked.go
@@ -84,7 +84,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader
}
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
- pages.fh.addChunks([]*filer_pb.FileChunk{chunk})
+ pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
pages.fh.entryViewCache = nil
glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index 8ce31a078..49918c104 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -13,12 +13,12 @@ import (
type FileHandleId uint64
type FileHandle struct {
- fh FileHandleId
- counter int64
- entry *filer_pb.Entry
- chunkAddLock sync.Mutex
- inode uint64
- wfs *WFS
+ fh FileHandleId
+ counter int64
+ entry *filer_pb.Entry
+ entryLock sync.Mutex
+ inode uint64
+ wfs *WFS
// cache file has been written to
dirtyMetadata bool
@@ -53,7 +53,20 @@ func (fh *FileHandle) FullPath() util.FullPath {
return fp
}
-func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) {
+func (fh *FileHandle) GetEntry() *filer_pb.Entry {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ return fh.entry
+}
+func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ fh.entry = entry
+}
+
+func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
// find the earliest incoming chunk
newChunks := chunks
@@ -82,10 +95,8 @@ func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) {
glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.Chunks), len(chunks))
- fh.chunkAddLock.Lock()
fh.entry.Chunks = append(fh.entry.Chunks, newChunks...)
fh.entryViewCache = nil
- fh.chunkAddLock.Unlock()
}
func (fh *FileHandle) Release() {
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go
index 5439b8bfd..88ab8612c 100644
--- a/weed/mount/filehandle_read.go
+++ b/weed/mount/filehandle_read.go
@@ -25,6 +25,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
fileFullPath := fh.FullPath()
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+
entry := fh.entry
if entry == nil {
return 0, io.EOF
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 2ab82b3ed..6437499bf 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -131,10 +131,11 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
}
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
- if fh.entry.Attributes == nil {
- fh.entry.Attributes = &filer_pb.FuseAttributes{}
+ entry = fh.GetEntry()
+ if entry != nil && fh.entry.Attributes == nil {
+ entry.Attributes = &filer_pb.FuseAttributes{}
}
- return path, fh, fh.entry, fuse.OK
+ return path, fh, entry, fuse.OK
}
entry, status = wfs.maybeLoadEntry(path)
return
diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go
index cb935d0e4..be504f5e2 100644
--- a/weed/mount/weedfs_attr.go
+++ b/weed/mount/weedfs_attr.go
@@ -43,6 +43,10 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
if status != fuse.OK {
return status
}
+ if fh != nil {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ }
if size, ok := input.GetSize(); ok {
glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.Chunks))
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index 1c80329c2..f3536ddce 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -118,6 +118,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+
entry := fh.entry
if entry == nil {
return nil
diff --git a/weed/mount/weedfs_xattr.go b/weed/mount/weedfs_xattr.go
index c85a1b3a1..2fb2b0741 100644
--- a/weed/mount/weedfs_xattr.go
+++ b/weed/mount/weedfs_xattr.go
@@ -94,10 +94,15 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st
}
}
- path, _, entry, status := wfs.maybeReadEntry(input.NodeId)
+ path, fh, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK {
return status
}
+ if fh != nil {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ }
+
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
@@ -154,10 +159,15 @@ func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr
if len(attr) == 0 {
return fuse.EINVAL
}
- path, _, entry, status := wfs.maybeReadEntry(header.NodeId)
+ path, fh, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK {
return status
}
+ if fh != nil {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ }
+
if entry.Extended == nil {
return fuse.ENOATTR
}