aboutsummaryrefslogtreecommitdiff
path: root/weed/mount
diff options
context:
space:
mode:
authorGuang Jiong Lou <7991675+27149chen@users.noreply.github.com>2024-09-17 12:02:21 +0800
committerGitHub <noreply@github.com>2024-09-16 21:02:21 -0700
commit6c986e9d70098404832d11d81faf7605b9c742a6 (patch)
treecffcf609c9a316f86a680660ec71c6e041a52996 /weed/mount
parente73ab5c0c49dfbd1870bccbd1e6d694d46fc8d73 (diff)
downloadseaweedfs-6c986e9d70098404832d11d81faf7605b9c742a6.tar.xz
seaweedfs-6c986e9d70098404832d11d81faf7605b9c742a6.zip
improve worm support (#5983)
* improve worm support Signed-off-by: lou <alex1988@outlook.com> * worm mode in filer Signed-off-by: lou <alex1988@outlook.com> * update after review Signed-off-by: lou <alex1988@outlook.com> * update after review Signed-off-by: lou <alex1988@outlook.com> * move to fs configure Signed-off-by: lou <alex1988@outlook.com> * remove flag Signed-off-by: lou <alex1988@outlook.com> * update after review Signed-off-by: lou <alex1988@outlook.com> * support worm hardlink Signed-off-by: lou <alex1988@outlook.com> * update after review Signed-off-by: lou <alex1988@outlook.com> * typo Signed-off-by: lou <alex1988@outlook.com> * sync filer conf Signed-off-by: lou <alex1988@outlook.com> --------- Signed-off-by: lou <alex1988@outlook.com>
Diffstat (limited to 'weed/mount')
-rw-r--r--weed/mount/filer_conf.go118
-rw-r--r--weed/mount/weedfs.go14
-rw-r--r--weed/mount/weedfs_file_io.go1
-rw-r--r--weed/mount/weedfs_file_mkrm.go4
-rw-r--r--weed/mount/weedfs_file_sync.go8
-rw-r--r--weed/mount/weedfs_filehandle.go13
-rw-r--r--weed/mount/weedfs_link.go6
7 files changed, 147 insertions, 17 deletions
diff --git a/weed/mount/filer_conf.go b/weed/mount/filer_conf.go
new file mode 100644
index 000000000..a08d766e8
--- /dev/null
+++ b/weed/mount/filer_conf.go
@@ -0,0 +1,118 @@
+package mount
+
+import (
+ "fmt"
+ "path/filepath"
+ "sync/atomic"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+func (wfs *WFS) subscribeFilerConfEvents() (func(), error) {
+ now := time.Now()
+ confDir := filer.DirectoryEtcSeaweedFS
+ confName := filer.FilerConfName
+ confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
+
+ // read current conf
+ err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ content, err := filer.ReadInsideFiler(client, confDir, confName)
+ if err != nil {
+ return err
+ }
+
+ fc := filer.NewFilerConf()
+ if len(content) > 0 {
+ if err := fc.LoadFromBytes(content); err != nil {
+ return fmt.Errorf("parse %s: %v", confFullName, err)
+ }
+ }
+
+ wfs.FilerConf = fc
+
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if message.NewEntry == nil {
+ return nil
+ }
+
+ dir := resp.Directory
+ name := resp.EventNotification.NewEntry.Name
+
+ if dir != confDir || name != confName {
+ return nil
+ }
+
+ content := message.NewEntry.Content
+ fc := filer.NewFilerConf()
+ if len(content) > 0 {
+ if err = fc.LoadFromBytes(content); err != nil {
+ return fmt.Errorf("parse %s: %v", confFullName, err)
+ }
+ }
+
+ wfs.FilerConf = fc
+
+ return nil
+ }
+
+ metadataFollowOption := &pb.MetadataFollowOption{
+ ClientName: "fuse",
+ ClientId: wfs.signature,
+ ClientEpoch: 1,
+ SelfSignature: 0,
+ PathPrefix: confFullName,
+ AdditionalPathPrefixes: nil,
+ StartTsNs: now.UnixNano(),
+ StopTsNs: 0,
+ EventErrorType: pb.FatalOnError,
+ }
+
+ return func() {
+ // sync new conf changes
+ util.RetryUntil("followFilerConfChanges", func() error {
+ metadataFollowOption.ClientEpoch++
+ i := atomic.LoadInt32(&wfs.option.filerIndex)
+ n := len(wfs.option.FilerAddresses)
+ err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn)
+ if err == nil {
+ atomic.StoreInt32(&wfs.option.filerIndex, i)
+ return nil
+ }
+
+ i++
+ if i >= int32(n) {
+ i = 0
+ }
+
+ return err
+ }, func(err error) bool {
+ glog.V(0).Infof("fuse follow filer conf changes: %v", err)
+ return true
+ })
+ }, nil
+}
+
+func (wfs *WFS) wormEnabledForEntry(path util.FullPath, entry *filer_pb.Entry) bool {
+ if entry == nil || entry.Attributes == nil {
+ return false
+ }
+
+ rule := wfs.FilerConf.MatchStorageRule(string(path))
+ if !rule.Worm {
+ return false
+ }
+
+ return entry.Attributes.FileSize > 0 || entry.Attributes.Crtime != entry.Attributes.Mtime
+}
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 6a1081113..4f029bba8 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -47,8 +47,6 @@ type Option struct {
Quota int64
DisableXAttr bool
- WriteOnceReadMany bool
-
MountUid uint32
MountGid uint32
MountMode os.FileMode
@@ -82,6 +80,7 @@ type WFS struct {
fuseServer *fuse.Server
IsOverQuota bool
fhLockTable *util.LockTable[FileHandleId]
+ FilerConf *filer.FilerConf
}
func NewSeaweedFileSystem(option *Option) *WFS {
@@ -141,10 +140,19 @@ func NewSeaweedFileSystem(option *Option) *WFS {
return wfs
}
-func (wfs *WFS) StartBackgroundTasks() {
+func (wfs *WFS) StartBackgroundTasks() error {
+ fn, err := wfs.subscribeFilerConfEvents()
+ if err != nil {
+ return err
+ }
+
+ go fn()
+
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
go wfs.loopCheckQuota()
+
+ return nil
}
func (wfs *WFS) String() string {
diff --git a/weed/mount/weedfs_file_io.go b/weed/mount/weedfs_file_io.go
index c6fcb8a14..50a5c7c85 100644
--- a/weed/mount/weedfs_file_io.go
+++ b/weed/mount/weedfs_file_io.go
@@ -65,6 +65,7 @@ func (wfs *WFS) Open(cancel <-chan struct{}, in *fuse.OpenIn, out *fuse.OpenOut)
fileHandle, status = wfs.AcquireHandle(in.NodeId, in.Flags, in.Uid, in.Gid)
if status == fuse.OK {
out.Fh = uint64(fileHandle.fh)
+ out.OpenFlags = in.Flags
// TODO https://github.com/libfuse/libfuse/blob/master/include/fuse_common.h#L64
}
return status
diff --git a/weed/mount/weedfs_file_mkrm.go b/weed/mount/weedfs_file_mkrm.go
index 2da316a50..01d3af476 100644
--- a/weed/mount/weedfs_file_mkrm.go
+++ b/weed/mount/weedfs_file_mkrm.go
@@ -130,6 +130,10 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin
return code
}
+ if wfs.wormEnabledForEntry(entryFullPath, entry) {
+ return fuse.EPERM
+ }
+
// first, ensure the filer store can correctly delete
glog.V(3).Infof("remove file: %v", entryFullPath)
isDeleteData := entry != nil && entry.HardLinkCounter <= 1
diff --git a/weed/mount/weedfs_file_sync.go b/weed/mount/weedfs_file_sync.go
index d857606bd..11ce1d3c4 100644
--- a/weed/mount/weedfs_file_sync.go
+++ b/weed/mount/weedfs_file_sync.go
@@ -3,13 +3,14 @@ package mount
import (
"context"
"fmt"
+ "syscall"
+ "time"
+
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
- "syscall"
- "time"
)
/**
@@ -128,9 +129,6 @@ func (wfs *WFS) doFlush(fh *FileHandle, uid, gid uint32) fuse.Status {
if entry.Attributes.Gid == 0 {
entry.Attributes.Gid = gid
}
- if entry.Attributes.Crtime == 0 {
- entry.Attributes.Crtime = time.Now().Unix()
- }
entry.Attributes.Mtime = time.Now().Unix()
}
diff --git a/weed/mount/weedfs_filehandle.go b/weed/mount/weedfs_filehandle.go
index e0149aee9..bfc129240 100644
--- a/weed/mount/weedfs_filehandle.go
+++ b/weed/mount/weedfs_filehandle.go
@@ -3,19 +3,16 @@ package mount
import (
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "time"
+ "github.com/seaweedfs/seaweedfs/weed/util"
)
func (wfs *WFS) AcquireHandle(inode uint64, flags, uid, gid uint32) (fileHandle *FileHandle, status fuse.Status) {
var entry *filer_pb.Entry
- _, _, entry, status = wfs.maybeReadEntry(inode)
+ var path util.FullPath
+ path, _, entry, status = wfs.maybeReadEntry(inode)
if status == fuse.OK {
- if entry != nil && wfs.option.WriteOnceReadMany {
- if entry.Attributes.Mtime+10 < time.Now().Unix() {
- if flags&fuse.O_ANYWRITE != 0 {
- return nil, fuse.EPERM
- }
- }
+ if wfs.wormEnabledForEntry(path, entry) && flags&fuse.O_ANYWRITE != 0 {
+ return nil, fuse.EPERM
}
// need to AcquireFileHandle again to ensure correct handle counter
fileHandle = wfs.fhMap.AcquireFileHandle(wfs, inode, entry)
diff --git a/weed/mount/weedfs_link.go b/weed/mount/weedfs_link.go
index 15b7e081e..28466bdfb 100644
--- a/weed/mount/weedfs_link.go
+++ b/weed/mount/weedfs_link.go
@@ -25,7 +25,6 @@ When creating a link:
/** Create a hard link to a file */
func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out *fuse.EntryOut) (code fuse.Status) {
-
if wfs.IsOverQuota {
return fuse.Status(syscall.ENOSPC)
}
@@ -49,6 +48,11 @@ func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out *
return status
}
+ // hardlink is not allowed in WORM mode
+ if wfs.wormEnabledForEntry(oldEntryPath, oldEntry) {
+ return fuse.EPERM
+ }
+
// update old file to hardlink mode
if len(oldEntry.HardLinkId) == 0 {
oldEntry.HardLinkId = filer.NewHardLinkId()