aboutsummaryrefslogtreecommitdiff
path: root/weed/mount
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mount')
-rw-r--r--weed/mount/dirty_pages_chunked.go2
-rw-r--r--weed/mount/filehandle.go29
-rw-r--r--weed/mount/filehandle_read.go3
-rw-r--r--weed/mount/meta_cache/meta_cache_subscribe.go2
-rw-r--r--weed/mount/weedfs.go8
-rw-r--r--weed/mount/weedfs_attr.go4
-rw-r--r--weed/mount/weedfs_file_mkrm.go18
-rw-r--r--weed/mount/weedfs_file_sync.go4
-rw-r--r--weed/mount/weedfs_filehandle.go1
-rw-r--r--weed/mount/weedfs_xattr.go32
10 files changed, 76 insertions, 27 deletions
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go
index e0d764070..52308e0e5 100644
--- a/weed/mount/dirty_pages_chunked.go
+++ b/weed/mount/dirty_pages_chunked.go
@@ -84,7 +84,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader
}
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
- pages.fh.addChunks([]*filer_pb.FileChunk{chunk})
+ pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
pages.fh.entryViewCache = nil
glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index 8ce31a078..49918c104 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -13,12 +13,12 @@ import (
type FileHandleId uint64
type FileHandle struct {
- fh FileHandleId
- counter int64
- entry *filer_pb.Entry
- chunkAddLock sync.Mutex
- inode uint64
- wfs *WFS
+ fh FileHandleId
+ counter int64
+ entry *filer_pb.Entry
+ entryLock sync.Mutex
+ inode uint64
+ wfs *WFS
// cache file has been written to
dirtyMetadata bool
@@ -53,7 +53,20 @@ func (fh *FileHandle) FullPath() util.FullPath {
return fp
}
-func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) {
+func (fh *FileHandle) GetEntry() *filer_pb.Entry {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ return fh.entry
+}
+func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ fh.entry = entry
+}
+
+func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
// find the earliest incoming chunk
newChunks := chunks
@@ -82,10 +95,8 @@ func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) {
glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.Chunks), len(chunks))
- fh.chunkAddLock.Lock()
fh.entry.Chunks = append(fh.entry.Chunks, newChunks...)
fh.entryViewCache = nil
- fh.chunkAddLock.Unlock()
}
func (fh *FileHandle) Release() {
diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go
index 5439b8bfd..88ab8612c 100644
--- a/weed/mount/filehandle_read.go
+++ b/weed/mount/filehandle_read.go
@@ -25,6 +25,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
fileFullPath := fh.FullPath()
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+
entry := fh.entry
if entry == nil {
return 0, io.EOF
diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go
index c30ec3699..c8ccdd375 100644
--- a/weed/mount/meta_cache/meta_cache_subscribe.go
+++ b/weed/mount/meta_cache/meta_cache_subscribe.go
@@ -58,7 +58,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
}
util.RetryForever("followMetaUpdates", func() error {
- return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, 0, selfSignature, processEventFn, true)
+ return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, 0, selfSignature, processEventFn, pb.FatalOnError)
}, func(err error) bool {
glog.Errorf("follow metadata updates: %v", err)
return true
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 2ab82b3ed..584174202 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -40,6 +40,7 @@ type Option struct {
DataCenter string
Umask os.FileMode
Quota int64
+ DisableXAttr bool
MountUid uint32
MountGid uint32
@@ -131,10 +132,11 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
}
var found bool
if fh, found = wfs.fhmap.FindFileHandle(inode); found {
- if fh.entry.Attributes == nil {
- fh.entry.Attributes = &filer_pb.FuseAttributes{}
+ entry = fh.GetEntry()
+ if entry != nil && fh.entry.Attributes == nil {
+ entry.Attributes = &filer_pb.FuseAttributes{}
}
- return path, fh, fh.entry, fuse.OK
+ return path, fh, entry, fuse.OK
}
entry, status = wfs.maybeLoadEntry(path)
return
diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go
index cb935d0e4..be504f5e2 100644
--- a/weed/mount/weedfs_attr.go
+++ b/weed/mount/weedfs_attr.go
@@ -43,6 +43,10 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse
if status != fuse.OK {
return status
}
+ if fh != nil {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ }
if size, ok := input.GetSize(); ok {
glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.Chunks))
diff --git a/weed/mount/weedfs_file_mkrm.go b/weed/mount/weedfs_file_mkrm.go
index 1f6951b96..86d4c4d41 100644
--- a/weed/mount/weedfs_file_mkrm.go
+++ b/weed/mount/weedfs_file_mkrm.go
@@ -58,16 +58,14 @@ func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out
Name: name,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
- Mtime: now,
- Crtime: now,
- FileMode: uint32(fileMode),
- Uid: in.Uid,
- Gid: in.Gid,
- Collection: wfs.option.Collection,
- Replication: wfs.option.Replication,
- TtlSec: wfs.option.TtlSec,
- Rdev: in.Rdev,
- Inode: inode,
+ Mtime: now,
+ Crtime: now,
+ FileMode: uint32(fileMode),
+ Uid: in.Uid,
+ Gid: in.Gid,
+ TtlSec: wfs.option.TtlSec,
+ Rdev: in.Rdev,
+ Inode: inode,
},
}
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index 1c80329c2..b7fffaaa3 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -118,6 +118,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+
entry := fh.entry
if entry == nil {
return nil
@@ -136,7 +139,6 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
entry.Attributes.Crtime = time.Now().Unix()
}
entry.Attributes.Mtime = time.Now().Unix()
- entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions()
}
request := &filer_pb.CreateEntryRequest{
diff --git a/weed/mount/weedfs_filehandle.go b/weed/mount/weedfs_filehandle.go
index 2d06acbb3..d769e51c5 100644
--- a/weed/mount/weedfs_filehandle.go
+++ b/weed/mount/weedfs_filehandle.go
@@ -9,6 +9,7 @@ func (wfs *WFS) AcquireHandle(inode uint64, uid, gid uint32) (fileHandle *FileHa
var entry *filer_pb.Entry
_, _, entry, status = wfs.maybeReadEntry(inode)
if status == fuse.OK {
+ // need to AcquireFileHandle again to ensure correct handle counter
fileHandle = wfs.fhmap.AcquireFileHandle(wfs, inode, entry)
}
return
diff --git a/weed/mount/weedfs_xattr.go b/weed/mount/weedfs_xattr.go
index c85a1b3a1..64cc0f6f0 100644
--- a/weed/mount/weedfs_xattr.go
+++ b/weed/mount/weedfs_xattr.go
@@ -20,6 +20,10 @@ const (
// with the required buffer size.
func (wfs *WFS) GetXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr string, dest []byte) (size uint32, code fuse.Status) {
+ if wfs.option.DisableXAttr {
+ return 0, fuse.Status(syscall.ENOTSUP)
+ }
+
//validate attr name
if len(attr) > MAX_XATTR_NAME_SIZE {
if runtime.GOOS == "darwin" {
@@ -70,6 +74,10 @@ func (wfs *WFS) GetXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr str
// attribute does not already exist.
func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr string, data []byte) fuse.Status {
+ if wfs.option.DisableXAttr {
+ return fuse.Status(syscall.ENOTSUP)
+ }
+
if wfs.IsOverQuota {
return fuse.Status(syscall.ENOSPC)
}
@@ -94,10 +102,15 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st
}
}
- path, _, entry, status := wfs.maybeReadEntry(input.NodeId)
+ path, fh, entry, status := wfs.maybeReadEntry(input.NodeId)
if status != fuse.OK {
return status
}
+ if fh != nil {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ }
+
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
@@ -122,6 +135,11 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st
// slice, and return the number of bytes. If the buffer is too
// small, return ERANGE, with the required buffer size.
func (wfs *WFS) ListXAttr(cancel <-chan struct{}, header *fuse.InHeader, dest []byte) (n uint32, code fuse.Status) {
+
+ if wfs.option.DisableXAttr {
+ return 0, fuse.Status(syscall.ENOTSUP)
+ }
+
_, _, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK {
return 0, status
@@ -151,13 +169,23 @@ func (wfs *WFS) ListXAttr(cancel <-chan struct{}, header *fuse.InHeader, dest []
// RemoveXAttr removes an extended attribute.
func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr string) fuse.Status {
+
+ if wfs.option.DisableXAttr {
+ return fuse.Status(syscall.ENOTSUP)
+ }
+
if len(attr) == 0 {
return fuse.EINVAL
}
- path, _, entry, status := wfs.maybeReadEntry(header.NodeId)
+ path, fh, entry, status := wfs.maybeReadEntry(header.NodeId)
if status != fuse.OK {
return status
}
+ if fh != nil {
+ fh.entryLock.Lock()
+ defer fh.entryLock.Unlock()
+ }
+
if entry.Extended == nil {
return fuse.ENOATTR
}