aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorChris Lu <chris.lu@uber.com>2021-03-30 02:10:50 -0700
committerChris Lu <chris.lu@uber.com>2021-03-30 02:10:53 -0700
commitac875976c0731597ae3df324e71b7ec2b8b4b83d (patch)
tree44b9f8513d46b9748a4d810b957d27621b9da004 /weed/command
parenta1e18a1384fa9bb6223471a7ac843884056392e2 (diff)
downloadseaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.tar.xz
seaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.zip
filer, volume: add concurrent upload size limit to avoid OOM
add some back pressure when writes are slow
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer.go33
-rw-r--r--weed/command/server.go2
-rw-r--r--weed/command/volume.go53
3 files changed, 48 insertions, 40 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 68edb8e69..82994b971 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -47,6 +47,7 @@ type FilerOptions struct {
metricsHttpPort *int
saveToFilerLimit *int
defaultLevelDbDirectory *string
+ concurrentUploadLimitMB *int
}
func init() {
@@ -69,6 +70,7 @@ func init() {
f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
+ f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -159,21 +161,22 @@ func (fo *FilerOptions) startFiler() {
}
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
- Masters: strings.Split(*fo.masters, ","),
- Collection: *fo.collection,
- DefaultReplication: *fo.defaultReplicaPlacement,
- DisableDirListing: *fo.disableDirListing,
- MaxMB: *fo.maxMB,
- DirListingLimit: *fo.dirListingLimit,
- DataCenter: *fo.dataCenter,
- Rack: *fo.rack,
- DefaultLevelDbDir: defaultLevelDbDirectory,
- DisableHttp: *fo.disableHttp,
- Host: *fo.ip,
- Port: uint32(*fo.port),
- Cipher: *fo.cipher,
- SaveToFilerLimit: *fo.saveToFilerLimit,
- Filers: peers,
+ Masters: strings.Split(*fo.masters, ","),
+ Collection: *fo.collection,
+ DefaultReplication: *fo.defaultReplicaPlacement,
+ DisableDirListing: *fo.disableDirListing,
+ MaxMB: *fo.maxMB,
+ DirListingLimit: *fo.dirListingLimit,
+ DataCenter: *fo.dataCenter,
+ Rack: *fo.rack,
+ DefaultLevelDbDir: defaultLevelDbDirectory,
+ DisableHttp: *fo.disableHttp,
+ Host: *fo.ip,
+ Port: uint32(*fo.port),
+ Cipher: *fo.cipher,
+ SaveToFilerLimit: *fo.saveToFilerLimit,
+ Filers: peers,
+ ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
diff --git a/weed/command/server.go b/weed/command/server.go
index c0ac5ee4c..eef2945fc 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -99,6 +99,7 @@ func init() {
filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers")
filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list")
filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
+ filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
@@ -108,6 +109,7 @@ func init() {
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
+ serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
diff --git a/weed/command/volume.go b/weed/command/volume.go
index b75af0f71..8014feaa6 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -35,31 +35,32 @@ var (
)
type VolumeServerOptions struct {
- port *int
- publicPort *int
- folders []string
- folderMaxLimits []int
- idxFolder *string
- ip *string
- publicUrl *string
- bindIp *string
- masters *string
- idleConnectionTimeout *int
- dataCenter *string
- rack *string
- whiteList []string
- indexType *string
- diskType *string
- fixJpgOrientation *bool
- readRedirect *bool
- cpuProfile *string
- memProfile *string
- compactionMBPerSecond *int
- fileSizeLimitMB *int
- minFreeSpacePercents []float32
- pprof *bool
- preStopSeconds *int
- metricsHttpPort *int
+ port *int
+ publicPort *int
+ folders []string
+ folderMaxLimits []int
+ idxFolder *string
+ ip *string
+ publicUrl *string
+ bindIp *string
+ masters *string
+ idleConnectionTimeout *int
+ dataCenter *string
+ rack *string
+ whiteList []string
+ indexType *string
+ diskType *string
+ fixJpgOrientation *bool
+ readRedirect *bool
+ cpuProfile *string
+ memProfile *string
+ compactionMBPerSecond *int
+ fileSizeLimitMB *int
+ concurrentUploadLimitMB *int
+ minFreeSpacePercents []float32
+ pprof *bool
+ preStopSeconds *int
+ metricsHttpPort *int
// pulseSeconds *int
enableTcp *bool
}
@@ -85,6 +86,7 @@ func init() {
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
+ v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
@@ -237,6 +239,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.fixJpgOrientation, *v.readRedirect,
*v.compactionMBPerSecond,
*v.fileSizeLimitMB,
+ int64(*v.concurrentUploadLimitMB) * 1024 * 1024,
)
// starting grpc server
grpcS := v.startGrpcService(volumeServer)