aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mount/weedfs.go36
-rw-r--r--weed/mount/weedfs_file_copy_range.go91
2 files changed, 94 insertions, 33 deletions
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 2127e4a2b..849b3ad0c 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -3,11 +3,11 @@ package mount
import (
"context"
"errors"
- "github.com/seaweedfs/seaweedfs/weed/util/version"
"math/rand"
"os"
"path"
"path/filepath"
+ "sync"
"sync/atomic"
"time"
@@ -23,6 +23,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
+ "github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/hanwen/go-fuse/v2/fs"
@@ -71,19 +72,21 @@ type WFS struct {
fuse.RawFileSystem
mount_pb.UnimplementedSeaweedMountServer
fs.Inode
- option *Option
- metaCache *meta_cache.MetaCache
- stats statsCache
- chunkCache *chunk_cache.TieredChunkCache
- signature int32
- concurrentWriters *util.LimitedConcurrentExecutor
- inodeToPath *InodeToPath
- fhMap *FileHandleToInode
- dhMap *DirectoryHandleToInode
- fuseServer *fuse.Server
- IsOverQuota bool
- fhLockTable *util.LockTable[FileHandleId]
- FilerConf *filer.FilerConf
+ option *Option
+ metaCache *meta_cache.MetaCache
+ stats statsCache
+ chunkCache *chunk_cache.TieredChunkCache
+ signature int32
+ concurrentWriters *util.LimitedConcurrentExecutor
+ copyBufferPool sync.Pool
+ concurrentCopiersSem chan struct{}
+ inodeToPath *InodeToPath
+ fhMap *FileHandleToInode
+ dhMap *DirectoryHandleToInode
+ fuseServer *fuse.Server
+ IsOverQuota bool
+ fhLockTable *util.LockTable[FileHandleId]
+ FilerConf *filer.FilerConf
}
func NewSeaweedFileSystem(option *Option) *WFS {
@@ -139,6 +142,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
if wfs.option.ConcurrentWriters > 0 {
wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
+ wfs.concurrentCopiersSem = make(chan struct{}, wfs.option.ConcurrentWriters)
+ }
+ wfs.copyBufferPool.New = func() any {
+ return make([]byte, option.ChunkSizeLimit)
}
return wfs
}
@@ -183,7 +190,6 @@ func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle
}
func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
-
// glog.V(3).Infof("read entry cache miss %s", fullpath)
dir, name := fullpath.DirAndName()
diff --git a/weed/mount/weedfs_file_copy_range.go b/weed/mount/weedfs_file_copy_range.go
index 43ec289ab..bcf5ae03a 100644
--- a/weed/mount/weedfs_file_copy_range.go
+++ b/weed/mount/weedfs_file_copy_range.go
@@ -1,13 +1,13 @@
package mount
import (
- "github.com/seaweedfs/seaweedfs/weed/util"
"net/http"
"time"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util"
)
// CopyFileRange copies data from one file to another from and to specified offsets.
@@ -70,30 +70,85 @@ func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn)
in.OffOut, in.OffOut+in.Len,
)
- data := make([]byte, in.Len)
- totalRead, err := readDataByFileHandle(data, fhIn, int64(in.OffIn))
- if err != nil {
- glog.Warningf("file handle read %s %d: %v", fhIn.FullPath(), totalRead, err)
- return 0, fuse.EIO
+ // Concurrent copy operations could allocate too much memory, so we want to
+ // throttle our concurrency, scaling with the number of writers the mount
+ // was configured with.
+ if wfs.concurrentCopiersSem != nil {
+ wfs.concurrentCopiersSem <- struct{}{}
+ defer func() { <-wfs.concurrentCopiersSem }()
}
- data = data[:totalRead]
- if totalRead == 0 {
+ // We want to stream the copy operation to avoid allocating massive buffers.
+ nowUnixNano := time.Now().UnixNano()
+ totalCopied := int64(0)
+ buff := wfs.copyBufferPool.Get().([]byte)
+ defer wfs.copyBufferPool.Put(buff)
+ for {
+ // Comply with cancellation as best as we can, given that the underlying
+ // IO functions aren't cancellation-aware.
+ select {
+ case <-cancel:
+ glog.Warningf("canceled CopyFileRange for %s (copied %d)",
+ fhIn.FullPath(), totalCopied)
+ return uint32(totalCopied), fuse.EINTR
+ default: // keep going
+ }
+
+ // We can save one IO by breaking early if we already know the next read
+ // will result in zero bytes.
+ remaining := int64(in.Len) - totalCopied
+ readLen := min(remaining, int64(len(buff)))
+ if readLen == 0 {
+ break
+ }
+
+ // Perform the read
+ offsetIn := totalCopied + int64(in.OffIn)
+ numBytesRead, err := readDataByFileHandle(
+ buff[:readLen], fhIn, offsetIn)
+ if err != nil {
+ glog.Warningf("file handle read %s %d (total %d): %v",
+ fhIn.FullPath(), numBytesRead, totalCopied, err)
+ return 0, fuse.EIO
+ }
+
+ // Break if we're done copying (no more bytes to read)
+ if numBytesRead == 0 {
+ break
+ }
+
+ offsetOut := int64(in.OffOut) + totalCopied
+
+ // Detect mime type only during the beginning of our stream, since
+ // DetectContentType is expecting some of the first 512 bytes of the
+ // file. See [http.DetectContentType] for details.
+ if offsetOut <= 512 {
+ fhOut.contentType = http.DetectContentType(buff[:numBytesRead])
+ }
+
+ // Perform the write
+ fhOut.dirtyPages.writerPattern.MonitorWriteAt(offsetOut, int(numBytesRead))
+ fhOut.dirtyPages.AddPage(
+ offsetOut,
+ buff[:numBytesRead],
+ fhOut.dirtyPages.writerPattern.IsSequentialMode(),
+ nowUnixNano)
+
+ // Accumulate for the next loop iteration
+ totalCopied += numBytesRead
+ }
+
+ if totalCopied == 0 {
return 0, fuse.OK
}
- // put data at the specified offset in target file
- fhOut.dirtyPages.writerPattern.MonitorWriteAt(int64(in.OffOut), int(in.Len))
+ fhOut.entry.Attributes.FileSize = uint64(max(
+ totalCopied+int64(in.OffOut),
+ int64(fhOut.entry.Attributes.FileSize),
+ ))
fhOut.entry.Content = nil
- fhOut.dirtyPages.AddPage(int64(in.OffOut), data, fhOut.dirtyPages.writerPattern.IsSequentialMode(), time.Now().UnixNano())
- fhOut.entry.Attributes.FileSize = uint64(max(int64(in.OffOut)+totalRead, int64(fhOut.entry.Attributes.FileSize)))
fhOut.dirtyMetadata = true
- written = uint32(totalRead)
-
- // detect mime type
- if written > 0 && in.OffOut <= 512 {
- fhOut.contentType = http.DetectContentType(data)
- }
+ written = uint32(totalCopied)
return written, fuse.OK
}