diff options
Diffstat (limited to 'weed/mount')
| -rw-r--r-- | weed/mount/filer_conf.go | 118 | ||||
| -rw-r--r-- | weed/mount/weedfs.go | 14 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_io.go | 1 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_mkrm.go | 4 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_sync.go | 8 | ||||
| -rw-r--r-- | weed/mount/weedfs_filehandle.go | 13 | ||||
| -rw-r--r-- | weed/mount/weedfs_link.go | 6 |
7 files changed, 147 insertions, 17 deletions
diff --git a/weed/mount/filer_conf.go b/weed/mount/filer_conf.go new file mode 100644 index 000000000..a08d766e8 --- /dev/null +++ b/weed/mount/filer_conf.go @@ -0,0 +1,118 @@ +package mount + +import ( + "fmt" + "path/filepath" + "sync/atomic" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func (wfs *WFS) subscribeFilerConfEvents() (func(), error) { + now := time.Now() + confDir := filer.DirectoryEtcSeaweedFS + confName := filer.FilerConfName + confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName) + + // read current conf + err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + content, err := filer.ReadInsideFiler(client, confDir, confName) + if err != nil { + return err + } + + fc := filer.NewFilerConf() + if len(content) > 0 { + if err := fc.LoadFromBytes(content); err != nil { + return fmt.Errorf("parse %s: %v", confFullName, err) + } + } + + wfs.FilerConf = fc + + return nil + }) + if err != nil { + return nil, err + } + + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.NewEntry == nil { + return nil + } + + dir := resp.Directory + name := resp.EventNotification.NewEntry.Name + + if dir != confDir || name != confName { + return nil + } + + content := message.NewEntry.Content + fc := filer.NewFilerConf() + if len(content) > 0 { + if err = fc.LoadFromBytes(content); err != nil { + return fmt.Errorf("parse %s: %v", confFullName, err) + } + } + + wfs.FilerConf = fc + + return nil + } + + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "fuse", + ClientId: wfs.signature, + ClientEpoch: 1, + SelfSignature: 0, + PathPrefix: confFullName, + AdditionalPathPrefixes: nil, + StartTsNs: now.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.FatalOnError, + } + + return func() { + // sync new conf changes + util.RetryUntil("followFilerConfChanges", func() error { + metadataFollowOption.ClientEpoch++ + i := atomic.LoadInt32(&wfs.option.filerIndex) + n := len(wfs.option.FilerAddresses) + err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn) + if err == nil { + atomic.StoreInt32(&wfs.option.filerIndex, i) + return nil + } + + i++ + if i >= int32(n) { + i = 0 + } + + return err + }, func(err error) bool { + glog.V(0).Infof("fuse follow filer conf changes: %v", err) + return true + }) + }, nil +} + +func (wfs *WFS) wormEnabledForEntry(path util.FullPath, entry *filer_pb.Entry) bool { + if entry == nil || entry.Attributes == nil { + return false + } + + rule := wfs.FilerConf.MatchStorageRule(string(path)) + if !rule.Worm { + return false + } + + return entry.Attributes.FileSize > 0 || entry.Attributes.Crtime != entry.Attributes.Mtime +} diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 6a1081113..4f029bba8 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -47,8 +47,6 @@ type Option struct { Quota int64 DisableXAttr bool - WriteOnceReadMany bool - MountUid uint32 MountGid uint32 MountMode os.FileMode @@ -82,6 +80,7 @@ type WFS struct { fuseServer *fuse.Server IsOverQuota bool fhLockTable *util.LockTable[FileHandleId] + FilerConf *filer.FilerConf } func NewSeaweedFileSystem(option *Option) *WFS { @@ -141,10 +140,19 @@ func NewSeaweedFileSystem(option *Option) *WFS { return wfs } -func (wfs *WFS) StartBackgroundTasks() { +func (wfs *WFS) StartBackgroundTasks() error { + fn, err := wfs.subscribeFilerConfEvents() + if err != nil { + return err + } + + go fn() + startTime := time.Now() go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) go wfs.loopCheckQuota() + + return nil } func (wfs *WFS) String() string { diff --git a/weed/mount/weedfs_file_io.go b/weed/mount/weedfs_file_io.go index c6fcb8a14..50a5c7c85 100644 --- a/weed/mount/weedfs_file_io.go +++ b/weed/mount/weedfs_file_io.go @@ -65,6 +65,7 @@ func (wfs *WFS) Open(cancel <-chan struct{}, in *fuse.OpenIn, out *fuse.OpenOut) fileHandle, status = wfs.AcquireHandle(in.NodeId, in.Flags, in.Uid, in.Gid) if status == fuse.OK { out.Fh = uint64(fileHandle.fh) + out.OpenFlags = in.Flags // TODO https://github.com/libfuse/libfuse/blob/master/include/fuse_common.h#L64 } return status diff --git a/weed/mount/weedfs_file_mkrm.go b/weed/mount/weedfs_file_mkrm.go index 2da316a50..01d3af476 100644 --- a/weed/mount/weedfs_file_mkrm.go +++ b/weed/mount/weedfs_file_mkrm.go @@ -130,6 +130,10 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin return code } + if wfs.wormEnabledForEntry(entryFullPath, entry) { + return fuse.EPERM + } + // first, ensure the filer store can correctly delete glog.V(3).Infof("remove file: %v", entryFullPath) isDeleteData := entry != nil && entry.HardLinkCounter <= 1 diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go index d857606bd..11ce1d3c4 100644 --- a/weed/mount/weedfs_file_sync.go +++ b/weed/mount/weedfs_file_sync.go @@ -3,13 +3,14 @@ package mount import ( "context" "fmt" + "syscall" + "time" + "github.com/hanwen/go-fuse/v2/fuse" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "syscall" - "time" ) /** @@ -128,9 +129,6 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status { if entry.Attributes.Gid == 0 { entry.Attributes.Gid = gid } - if entry.Attributes.Crtime == 0 { - entry.Attributes.Crtime = time.Now().Unix() - } entry.Attributes.Mtime = time.Now().Unix() } diff --git a/weed/mount/weedfs_filehandle.go b/weed/mount/weedfs_filehandle.go index e0149aee9..bfc129240 100644 --- a/weed/mount/weedfs_filehandle.go +++ b/weed/mount/weedfs_filehandle.go @@ -3,19 +3,16 @@ package mount import ( "github.com/hanwen/go-fuse/v2/fuse" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" - "time" + "github.com/seaweedfs/seaweedfs/weed/util" ) func (wfs *WFS) AcquireHandle(inode uint64, flags, uid, gid uint32) (fileHandle *FileHandle, status fuse.Status) { var entry *filer_pb.Entry - _, _, entry, status = wfs.maybeReadEntry(inode) + var path util.FullPath + path, _, entry, status = wfs.maybeReadEntry(inode) if status == fuse.OK { - if entry != nil && wfs.option.WriteOnceReadMany { - if entry.Attributes.Mtime+10 < time.Now().Unix() { - if flags&fuse.O_ANYWRITE != 0 { - return nil, fuse.EPERM - } - } + if wfs.wormEnabledForEntry(path, entry) && flags&fuse.O_ANYWRITE != 0 { + return nil, fuse.EPERM } // need to AcquireFileHandle again to ensure correct handle counter fileHandle = wfs.fhMap.AcquireFileHandle(wfs, inode, entry) diff --git a/weed/mount/weedfs_link.go b/weed/mount/weedfs_link.go index 15b7e081e..28466bdfb 100644 --- a/weed/mount/weedfs_link.go +++ b/weed/mount/weedfs_link.go @@ -25,7 +25,6 @@ When creating a link: /** Create a hard link to a file */ func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out *fuse.EntryOut) (code fuse.Status) { - if wfs.IsOverQuota { return fuse.Status(syscall.ENOSPC) } @@ -49,6 +48,11 @@ func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out * return status } + // hardlink is not allowed in WORM mode + if wfs.wormEnabledForEntry(oldEntryPath, oldEntry) { + return fuse.EPERM + } + // update old file to hardlink mode if len(oldEntry.HardLinkId) == 0 { oldEntry.HardLinkId = filer.NewHardLinkId() |
