aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-10-30 21:22:20 -0700
committerChris Lu <chris.lu@gmail.com>2020-10-30 21:22:20 -0700
commit8826601be1a5fe563d955b57a51b15d917baa22b (patch)
treedef9ca688158ad773a3398b6c83d62157b7d9b8a
parentbe95f68ca744fb3caa5f7e48aff79bc2b233686e (diff)
downloadseaweedfs-8826601be1a5fe563d955b57a51b15d917baa22b.tar.xz
seaweedfs-8826601be1a5fe563d955b57a51b15d917baa22b.zip
mount: optional limit for the number of concurrent writers
-rw-r--r--weed/command/mount.go2
-rw-r--r--weed/command/mount_std.go1
-rw-r--r--weed/filesys/dirty_page.go24
-rw-r--r--weed/filesys/wfs.go8
4 files changed, 25 insertions, 10 deletions
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 7fdb21254..f325cb0a5 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -14,6 +14,7 @@ type MountOptions struct {
replication *string
ttlSec *int
chunkSizeLimitMB *int
+ concurrentWriters *int
cacheDir *string
cacheSizeMB *int64
dataCenter *string
@@ -42,6 +43,7 @@ func init() {
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
+ mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 0, "limit concurrent goroutine writers if not 0")
mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 20d08314c..649450e54 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -175,6 +175,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
+ ConcurrentWriters: *option.concurrentWriters,
CacheDir: *option.cacheDir,
CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index dd0c48796..c1b78a220 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -9,12 +9,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- concurrentWriterLimit = runtime.NumCPU()
- concurrentWriters = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit)
)
type ContinuousDirtyPages struct {
@@ -33,7 +27,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
dirtyPages := &ContinuousDirtyPages{
intervals: &ContinuousIntervals{},
f: file,
- chunkSaveErrChan: make(chan error, concurrentWriterLimit),
+ chunkSaveErrChan: make(chan error, runtime.NumCPU()),
}
go func() {
for t := range dirtyPages.chunkSaveErrChan {
@@ -100,14 +94,18 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
+ errChanSize := pages.f.wfs.option.ConcurrentWriters
+ if errChanSize == 0 {
+ errChanSize = runtime.NumCPU()
+ }
if pages.chunkSaveErrChanClosed {
- pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit)
+ pages.chunkSaveErrChan = make(chan error, errChanSize)
pages.chunkSaveErrChanClosed = false
}
mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1)
- go func() {
+ writer := func() {
defer pages.writeWaitGroup.Done()
reader = io.LimitReader(reader, size)
@@ -121,7 +119,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
pages.collection, pages.replication = collection, replication
pages.f.addChunks([]*filer_pb.FileChunk{chunk})
glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size)
- }()
+ }
+
+ if pages.f.wfs.concurrentWriters != nil {
+ pages.f.wfs.concurrentWriters.Execute(writer)
+ } else {
+ go writer()
+ }
}
func max(x, y int64) int64 {
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index d31579fce..cd14e8032 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -31,6 +31,7 @@ type Option struct {
Replication string
TtlSec int32
ChunkSizeLimit int64
+ ConcurrentWriters int
CacheDir string
CacheSizeMB int64
DataCenter string
@@ -68,6 +69,9 @@ type WFS struct {
chunkCache *chunk_cache.TieredChunkCache
metaCache *meta_cache.MetaCache
signature int32
+
+ // throttle writers
+ concurrentWriters *util.LimitedConcurrentExecutor
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -110,6 +114,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
wfs.fsNodeCache = newFsCache(wfs.root)
+ if wfs.option.ConcurrentWriters > 0 {
+ wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
+ }
+
return wfs
}