aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/mount.go6
-rw-r--r--weed/command/mount_std.go1
-rw-r--r--weed/mount/weedfs.go5
-rw-r--r--weed/mount/weedfs_metadata_flush.go168
4 files changed, 180 insertions, 0 deletions
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 618bbd3ae..11e6e4c7c 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -37,6 +37,9 @@ type MountOptions struct {
extraOptions []string
fuseCommandPid int
+ // Periodic metadata flush to protect against orphan chunk cleanup
+ metadataFlushSeconds *int
+
// RDMA acceleration options
rdmaEnabled *bool
rdmaSidecarAddr *string
@@ -85,6 +88,9 @@ func init() {
mountOptions.disableXAttr = cmdMount.Flag.Bool("disableXAttr", false, "disable xattr")
mountOptions.fuseCommandPid = 0
+ // Periodic metadata flush to protect against orphan chunk cleanup
+ mountOptions.metadataFlushSeconds = cmdMount.Flag.Int("metadataFlushSeconds", 120, "periodically flush file metadata to filer in seconds (0 to disable). This protects chunks from being purged by volume.fsck for long-running writes")
+
// RDMA acceleration flags
mountOptions.rdmaEnabled = cmdMount.Flag.Bool("rdma.enabled", false, "enable RDMA acceleration for reads")
mountOptions.rdmaSidecarAddr = cmdMount.Flag.String("rdma.sidecar", "", "RDMA sidecar address (e.g., localhost:8081)")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index d1593454e..de2470017 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -254,6 +254,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
UidGidMapper: uidGidMapper,
DisableXAttr: *option.disableXAttr,
IsMacOs: runtime.GOOS == "darwin",
+ MetadataFlushSeconds: *option.metadataFlushSeconds,
// RDMA acceleration options
RdmaEnabled: *option.rdmaEnabled,
RdmaSidecarAddr: *option.rdmaSidecarAddr,
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 80e062c60..60e20f7e5 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -64,6 +64,10 @@ type Option struct {
Cipher bool // whether encrypt data on volume server
UidGidMapper *meta_cache.UidGidMapper
+ // Periodic metadata flush interval in seconds (0 to disable)
+ // This protects chunks from being purged by volume.fsck for long-running writes
+ MetadataFlushSeconds int
+
// RDMA acceleration options
RdmaEnabled bool
RdmaSidecarAddr string
@@ -214,6 +218,7 @@ func (wfs *WFS) StartBackgroundTasks() error {
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), follower)
go wfs.loopCheckQuota()
+ go wfs.loopFlushDirtyMetadata()
return nil
}
diff --git a/weed/mount/weedfs_metadata_flush.go b/weed/mount/weedfs_metadata_flush.go
new file mode 100644
index 000000000..456569792
--- /dev/null
+++ b/weed/mount/weedfs_metadata_flush.go
@@ -0,0 +1,168 @@
+package mount
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// loopFlushDirtyMetadata periodically flushes dirty file metadata to the filer.
+// This protects newly uploaded chunks from being purged by volume.fsck orphan cleanup
+// for files that remain open for extended periods without being closed.
+//
+// The problem: When a file is opened and written to continuously, chunks are uploaded
+// to volume servers but the file metadata (containing chunk references) is only saved
+// to the filer on file close or fsync. If volume.fsck runs during this window, it may
+// identify these chunks as orphans (since they're not referenced in filer metadata)
+// and purge them.
+//
+// This background task periodically flushes metadata for open files, ensuring chunk
+// references are visible to volume.fsck even before files are closed.
+func (wfs *WFS) loopFlushDirtyMetadata() {
+ if wfs.option.MetadataFlushSeconds <= 0 {
+ glog.V(0).Infof("periodic metadata flush disabled")
+ return
+ }
+
+ flushInterval := time.Duration(wfs.option.MetadataFlushSeconds) * time.Second
+ glog.V(0).Infof("periodic metadata flush enabled, interval: %v", flushInterval)
+
+ ticker := time.NewTicker(flushInterval)
+ defer ticker.Stop()
+
+ for range ticker.C {
+ wfs.flushAllDirtyMetadata()
+ }
+}
+
+// flushAllDirtyMetadata iterates through all open file handles and flushes
+// metadata for files that have dirty metadata (chunks uploaded but not yet persisted).
+func (wfs *WFS) flushAllDirtyMetadata() {
+ // Collect file handles with dirty metadata under a read lock
+ var dirtyHandles []*FileHandle
+ wfs.fhMap.RLock()
+ for _, fh := range wfs.fhMap.inode2fh {
+ if fh.dirtyMetadata {
+ dirtyHandles = append(dirtyHandles, fh)
+ }
+ }
+ wfs.fhMap.RUnlock()
+
+ if len(dirtyHandles) == 0 {
+ return
+ }
+
+ glog.V(3).Infof("flushing metadata for %d open files", len(dirtyHandles))
+
+ // Process dirty handles in parallel with limited concurrency
+ var wg sync.WaitGroup
+ concurrency := wfs.option.ConcurrentWriters
+ if concurrency <= 0 {
+ concurrency = 16
+ }
+ sem := make(chan struct{}, concurrency)
+
+ for _, fh := range dirtyHandles {
+ wg.Add(1)
+ sem <- struct{}{}
+ go func(handle *FileHandle) {
+ defer wg.Done()
+ defer func() { <-sem }()
+ if err := wfs.flushFileMetadata(handle); err != nil {
+ glog.Warningf("failed to flush metadata for %s: %v", handle.FullPath(), err)
+ }
+ }(fh)
+ }
+ wg.Wait()
+}
+
// flushFileMetadata persists the file's current metadata (including its chunk
// list) to the filer WITHOUT flushing dirty pages from memory. This makes the
// already-uploaded chunk references visible to volume.fsck while the data
// itself stays in the write buffer until the regular flush on close/fsync.
//
// Returns any error from the filer RPC or the local meta-cache update; a nil
// entry or an entry with no chunks is treated as "nothing to do" and returns
// nil.
func (wfs *WFS) flushFileMetadata(fh *FileHandle) error {
	// Serialize with the regular write/flush paths for this handle.
	fhActiveLock := fh.wfs.fhLockTable.AcquireLock("flushMetadata", fh.fh, util.ExclusiveLock)
	defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)

	// Re-check under the lock: another flush (e.g. file close) may have
	// persisted the metadata between the caller's scan and now.
	if !fh.dirtyMetadata {
		return nil
	}

	fileFullPath := fh.FullPath()
	dir, name := fileFullPath.DirAndName()

	glog.V(4).Infof("flushFileMetadata %s fh %d", fileFullPath, fh.fh)

	err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		entry := fh.GetEntry()
		if entry == nil {
			return nil
		}
		entry.Name = name

		if entry.Attributes != nil {
			entry.Attributes.Mtime = time.Now().Unix()
		}

		// Current chunk list — includes chunks already uploaded to volume
		// servers but not yet persisted to filer metadata.
		chunks := entry.GetChunks()
		if len(chunks) == 0 {
			return nil
		}

		// Manifest chunks must be kept as-is; only plain chunks are
		// compacted and potentially re-manifestized below.
		manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(chunks)

		// Drop chunks whose byte ranges are fully overwritten by later writes.
		compactedChunks, _ := filer.CompactFileChunks(context.Background(), wfs.LookupFn(), nonManifestChunks)

		// Fold long chunk lists into manifest chunks for large files.
		// A manifestize failure is non-fatal: the un-manifestized chunk
		// list is still valid, so we log and continue.
		compactedChunks, manifestErr := filer.MaybeManifestize(wfs.saveDataAsChunk(fileFullPath), compactedChunks)
		if manifestErr != nil {
			glog.V(0).Infof("flushFileMetadata MaybeManifestize: %v", manifestErr)
		}

		entry.Chunks = append(compactedChunks, manifestChunks...)

		// SkipCheckParentDirectory: the parent is known to exist since the
		// file handle is open on an existing path.
		request := &filer_pb.CreateEntryRequest{
			Directory: string(dir),
			Entry: entry.GetEntry(),
			Signatures: []int32{wfs.signature},
			SkipCheckParentDirectory: true,
		}

		// Translate local uid/gid to the filer's view for the RPC, and
		// restore the local mapping afterwards (entry is shared state).
		wfs.mapPbIdFromLocalToFiler(request.Entry)
		defer wfs.mapPbIdFromFilerToLocal(request.Entry)

		if err := filer_pb.CreateEntry(context.Background(), client, request); err != nil {
			return err
		}

		// Keep the local meta cache in sync with what was just persisted.
		if err := wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
			return fmt.Errorf("update meta cache for %s: %w", fileFullPath, err)
		}

		glog.V(3).Infof("flushed metadata for %s with %d chunks", fileFullPath, len(entry.GetChunks()))
		return nil
	})

	if err != nil {
		return err
	}

	// Note: We do NOT clear dirtyMetadata here because:
	// 1. There may still be dirty pages in the write buffer
	// 2. The file may receive more writes before close
	// 3. dirtyMetadata will be cleared on the final flush when the file is closed

	return nil
}