diff options
| author | Chris Lu <chris.lu@uber.com> | 2021-03-30 02:10:50 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@uber.com> | 2021-03-30 02:10:53 -0700 |
| commit | ac875976c0731597ae3df324e71b7ec2b8b4b83d (patch) | |
| tree | 44b9f8513d46b9748a4d810b957d27621b9da004 /weed/command | |
| parent | a1e18a1384fa9bb6223471a7ac843884056392e2 (diff) | |
| download | seaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.tar.xz seaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.zip | |
filer, volume: add concurrent upload size limit to avoid OOM
add some back pressure when writes are slow
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer.go | 33 | ||||
| -rw-r--r-- | weed/command/server.go | 2 | ||||
| -rw-r--r-- | weed/command/volume.go | 53 |
3 files changed, 48 insertions, 40 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go index 68edb8e69..82994b971 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -47,6 +47,7 @@ type FilerOptions struct { metricsHttpPort *int saveToFilerLimit *int defaultLevelDbDirectory *string + concurrentUploadLimitMB *int } func init() { @@ -69,6 +70,7 @@ func init() { f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store") f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory") + f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") // start s3 on filer filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway") @@ -159,21 +161,22 @@ func (fo *FilerOptions) startFiler() { } fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ - Masters: strings.Split(*fo.masters, ","), - Collection: *fo.collection, - DefaultReplication: *fo.defaultReplicaPlacement, - DisableDirListing: *fo.disableDirListing, - MaxMB: *fo.maxMB, - DirListingLimit: *fo.dirListingLimit, - DataCenter: *fo.dataCenter, - Rack: *fo.rack, - DefaultLevelDbDir: defaultLevelDbDirectory, - DisableHttp: *fo.disableHttp, - Host: *fo.ip, - Port: uint32(*fo.port), - Cipher: *fo.cipher, - SaveToFilerLimit: *fo.saveToFilerLimit, - Filers: peers, + Masters: strings.Split(*fo.masters, ","), + Collection: *fo.collection, + DefaultReplication: *fo.defaultReplicaPlacement, + DisableDirListing: *fo.disableDirListing, + MaxMB: *fo.maxMB, + DirListingLimit: *fo.dirListingLimit, + DataCenter: *fo.dataCenter, + Rack: *fo.rack, + DefaultLevelDbDir: defaultLevelDbDirectory, + DisableHttp: *fo.disableHttp, + Host: *fo.ip, + Port: uint32(*fo.port), + Cipher: *fo.cipher, + SaveToFilerLimit: *fo.saveToFilerLimit, + Filers: peers, + ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, }) if nfs_err != nil { glog.Fatalf("Filer startup error: %v", nfs_err) diff --git a/weed/command/server.go b/weed/command/server.go index c0ac5ee4c..eef2945fc 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -99,6 +99,7 @@ func init() { filerOptions.cipher = cmdServer.Flag.Bool("filer.encryptVolumeData", false, "encrypt data on volume servers") filerOptions.peers = cmdServer.Flag.String("filer.peers", "", "all filers sharing the same filer store in comma separated ip:port list") filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.") + filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size") serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") @@ -108,6 +109,7 @@ func init() { serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") + serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") diff --git a/weed/command/volume.go b/weed/command/volume.go index b75af0f71..8014feaa6 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -35,31 +35,32 @@ var ( ) type VolumeServerOptions struct { - port *int - publicPort *int - folders []string - folderMaxLimits []int - idxFolder *string - ip *string - publicUrl *string - bindIp *string - masters *string - idleConnectionTimeout *int - dataCenter *string - rack *string - whiteList []string - indexType *string - diskType *string - fixJpgOrientation *bool - readRedirect *bool - cpuProfile *string - memProfile *string - compactionMBPerSecond *int - fileSizeLimitMB *int - minFreeSpacePercents []float32 - pprof *bool - preStopSeconds *int - metricsHttpPort *int + port *int + publicPort *int + folders []string + folderMaxLimits []int + idxFolder *string + ip *string + publicUrl *string + bindIp *string + masters *string + idleConnectionTimeout *int + dataCenter *string + rack *string + whiteList []string + indexType *string + diskType *string + fixJpgOrientation *bool + readRedirect *bool + cpuProfile *string + memProfile *string + compactionMBPerSecond *int + fileSizeLimitMB *int + concurrentUploadLimitMB *int + minFreeSpacePercents []float32 + pprof *bool + preStopSeconds *int + metricsHttpPort *int // pulseSeconds *int enableTcp *bool } @@ -85,6 +86,7 @@ func init() { v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") + v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") @@ -237,6 +239,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.fixJpgOrientation, *v.readRedirect, *v.compactionMBPerSecond, *v.fileSizeLimitMB, + int64(*v.concurrentUploadLimitMB) * 1024 * 1024, ) // starting grpc server grpcS := v.startGrpcService(volumeServer) |
