diff options
Diffstat (limited to 'weed/mount')
| -rw-r--r-- | weed/mount/dirty_pages_chunked.go | 2 | ||||
| -rw-r--r-- | weed/mount/filehandle.go | 29 | ||||
| -rw-r--r-- | weed/mount/filehandle_read.go | 3 | ||||
| -rw-r--r-- | weed/mount/meta_cache/meta_cache_subscribe.go | 2 | ||||
| -rw-r--r-- | weed/mount/weedfs.go | 8 | ||||
| -rw-r--r-- | weed/mount/weedfs_attr.go | 4 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_mkrm.go | 18 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_sync.go | 4 | ||||
| -rw-r--r-- | weed/mount/weedfs_filehandle.go | 1 | ||||
| -rw-r--r-- | weed/mount/weedfs_xattr.go | 32 |
10 files changed, 76 insertions, 27 deletions
diff --git a/weed/mount/dirty_pages_chunked.go b/weed/mount/dirty_pages_chunked.go index e0d764070..52308e0e5 100644 --- a/weed/mount/dirty_pages_chunked.go +++ b/weed/mount/dirty_pages_chunked.go @@ -84,7 +84,7 @@ func (pages *ChunkedDirtyPages) saveChunkedFileIntevalToStorage(reader io.Reader } chunk.Mtime = mtime pages.collection, pages.replication = collection, replication - pages.fh.addChunks([]*filer_pb.FileChunk{chunk}) + pages.fh.AddChunks([]*filer_pb.FileChunk{chunk}) pages.fh.entryViewCache = nil glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size) diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go index 8ce31a078..49918c104 100644 --- a/weed/mount/filehandle.go +++ b/weed/mount/filehandle.go @@ -13,12 +13,12 @@ import ( type FileHandleId uint64 type FileHandle struct { - fh FileHandleId - counter int64 - entry *filer_pb.Entry - chunkAddLock sync.Mutex - inode uint64 - wfs *WFS + fh FileHandleId + counter int64 + entry *filer_pb.Entry + entryLock sync.Mutex + inode uint64 + wfs *WFS // cache file has been written to dirtyMetadata bool @@ -53,7 +53,20 @@ func (fh *FileHandle) FullPath() util.FullPath { return fp } -func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) { +func (fh *FileHandle) GetEntry() *filer_pb.Entry { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + return fh.entry +} +func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + fh.entry = entry +} + +func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() // find the earliest incoming chunk newChunks := chunks @@ -82,10 +95,8 @@ func (fh *FileHandle) addChunks(chunks []*filer_pb.FileChunk) { glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.Chunks), len(chunks)) - fh.chunkAddLock.Lock() fh.entry.Chunks = append(fh.entry.Chunks, newChunks...) fh.entryViewCache = nil - fh.chunkAddLock.Unlock() } func (fh *FileHandle) Release() { diff --git a/weed/mount/filehandle_read.go b/weed/mount/filehandle_read.go index 5439b8bfd..88ab8612c 100644 --- a/weed/mount/filehandle_read.go +++ b/weed/mount/filehandle_read.go @@ -25,6 +25,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { fileFullPath := fh.FullPath() + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + entry := fh.entry if entry == nil { return 0, io.EOF diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index c30ec3699..c8ccdd375 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -58,7 +58,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } util.RetryForever("followMetaUpdates", func() error { - return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, 0, selfSignature, processEventFn, true) + return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, 0, selfSignature, processEventFn, pb.FatalOnError) }, func(err error) bool { glog.Errorf("follow metadata updates: %v", err) return true diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 2ab82b3ed..584174202 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -40,6 +40,7 @@ type Option struct { DataCenter string Umask os.FileMode Quota int64 + DisableXAttr bool MountUid uint32 MountGid uint32 @@ -131,10 +132,11 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle } var found bool if fh, found = wfs.fhmap.FindFileHandle(inode); found { - if fh.entry.Attributes == nil { - fh.entry.Attributes = &filer_pb.FuseAttributes{} + entry = fh.GetEntry() + if entry != nil && fh.entry.Attributes == nil { + entry.Attributes = &filer_pb.FuseAttributes{} } - return path, fh, fh.entry, fuse.OK + return path, fh, entry, fuse.OK } entry, status = wfs.maybeLoadEntry(path) return diff --git a/weed/mount/weedfs_attr.go b/weed/mount/weedfs_attr.go index cb935d0e4..be504f5e2 100644 --- a/weed/mount/weedfs_attr.go +++ b/weed/mount/weedfs_attr.go @@ -43,6 +43,10 @@ func (wfs *WFS) SetAttr(cancel <-chan struct{}, input *fuse.SetAttrIn, out *fuse if status != fuse.OK { return status } + if fh != nil { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + } if size, ok := input.GetSize(); ok { glog.V(4).Infof("%v setattr set size=%v chunks=%d", path, size, len(entry.Chunks)) diff --git a/weed/mount/weedfs_file_mkrm.go b/weed/mount/weedfs_file_mkrm.go index 1f6951b96..86d4c4d41 100644 --- a/weed/mount/weedfs_file_mkrm.go +++ b/weed/mount/weedfs_file_mkrm.go @@ -58,16 +58,14 @@ func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out Name: name, IsDirectory: false, Attributes: &filer_pb.FuseAttributes{ - Mtime: now, - Crtime: now, - FileMode: uint32(fileMode), - Uid: in.Uid, - Gid: in.Gid, - Collection: wfs.option.Collection, - Replication: wfs.option.Replication, - TtlSec: wfs.option.TtlSec, - Rdev: in.Rdev, - Inode: inode, + Mtime: now, + Crtime: now, + FileMode: uint32(fileMode), + Uid: in.Uid, + Gid: in.Gid, + TtlSec: wfs.option.TtlSec, + Rdev: in.Rdev, + Inode: inode, }, } diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index 1c80329c2..b7fffaaa3 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -118,6 +118,9 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + entry := fh.entry if entry == nil { return nil @@ -136,7 +139,6 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { entry.Attributes.Crtime = time.Now().Unix() } entry.Attributes.Mtime = time.Now().Unix() - entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions() } request := &filer_pb.CreateEntryRequest{ diff --git a/weed/mount/weedfs_filehandle.go b/weed/mount/weedfs_filehandle.go index 2d06acbb3..d769e51c5 100644 --- a/weed/mount/weedfs_filehandle.go +++ b/weed/mount/weedfs_filehandle.go @@ -9,6 +9,7 @@ func (wfs *WFS) AcquireHandle(inode uint64, uid, gid uint32) (fileHandle *FileHa var entry *filer_pb.Entry _, _, entry, status = wfs.maybeReadEntry(inode) if status == fuse.OK { + // need to AcquireFileHandle again to ensure correct handle counter fileHandle = wfs.fhmap.AcquireFileHandle(wfs, inode, entry) } return diff --git a/weed/mount/weedfs_xattr.go b/weed/mount/weedfs_xattr.go index c85a1b3a1..64cc0f6f0 100644 --- a/weed/mount/weedfs_xattr.go +++ b/weed/mount/weedfs_xattr.go @@ -20,6 +20,10 @@ const ( // with the required buffer size. func (wfs *WFS) GetXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr string, dest []byte) (size uint32, code fuse.Status) { + if wfs.option.DisableXAttr { + return 0, fuse.Status(syscall.ENOTSUP) + } + //validate attr name if len(attr) > MAX_XATTR_NAME_SIZE { if runtime.GOOS == "darwin" { @@ -70,6 +74,10 @@ func (wfs *WFS) GetXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr str // attribute does not already exist. func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr string, data []byte) fuse.Status { + if wfs.option.DisableXAttr { + return fuse.Status(syscall.ENOTSUP) + } + if wfs.IsOverQuota { return fuse.Status(syscall.ENOSPC) } @@ -94,10 +102,15 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st } } - path, _, entry, status := wfs.maybeReadEntry(input.NodeId) + path, fh, entry, status := wfs.maybeReadEntry(input.NodeId) if status != fuse.OK { return status } + if fh != nil { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + } + if entry.Extended == nil { entry.Extended = make(map[string][]byte) } @@ -122,6 +135,11 @@ func (wfs *WFS) SetXAttr(cancel <-chan struct{}, input *fuse.SetXAttrIn, attr st // slice, and return the number of bytes. If the buffer is too // small, return ERANGE, with the required buffer size. func (wfs *WFS) ListXAttr(cancel <-chan struct{}, header *fuse.InHeader, dest []byte) (n uint32, code fuse.Status) { + + if wfs.option.DisableXAttr { + return 0, fuse.Status(syscall.ENOTSUP) + } + _, _, entry, status := wfs.maybeReadEntry(header.NodeId) if status != fuse.OK { return 0, status @@ -151,13 +169,23 @@ func (wfs *WFS) ListXAttr(cancel <-chan struct{}, header *fuse.InHeader, dest [] // RemoveXAttr removes an extended attribute. func (wfs *WFS) RemoveXAttr(cancel <-chan struct{}, header *fuse.InHeader, attr string) fuse.Status { + + if wfs.option.DisableXAttr { + return fuse.Status(syscall.ENOTSUP) + } + if len(attr) == 0 { return fuse.EINVAL } - path, _, entry, status := wfs.maybeReadEntry(header.NodeId) + path, fh, entry, status := wfs.maybeReadEntry(header.NodeId) if status != fuse.OK { return status } + if fh != nil { + fh.entryLock.Lock() + defer fh.entryLock.Unlock() + } + if entry.Extended == nil { return fuse.ENOATTR } |
