aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-10-30 16:43:29 -0700
committerGitHub <noreply@github.com>2025-10-30 16:43:29 -0700
commitba07b3e4c68f34219f3ca10f8f3ddcc059ffb972 (patch)
tree34d5d285d3629bb33b66d7c5bbef7dfa7da558c3 /weed/util
parentf5a57a6463428a7c07179ff445a7853112bee6a0 (diff)
downloadseaweedfs-ba07b3e4c68f34219f3ca10f8f3ddcc059ffb972.tar.xz
seaweedfs-ba07b3e4c68f34219f3ca10f8f3ddcc059ffb972.zip
network: Adaptive timeout (#7410)
* server can start when no network for local dev * fixed "superfluous response.WriteHeader call" warning * adaptive based on last write time * more doc * refactoring
Diffstat (limited to 'weed/util')
-rw-r--r-- weed/util/net_timeout.go | 71
1 files changed, 67 insertions, 4 deletions
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go
index f235a77b3..313d7f849 100644
--- a/weed/util/net_timeout.go
+++ b/weed/util/net_timeout.go
@@ -1,13 +1,24 @@
package util
import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
"net"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+
"github.com/seaweedfs/seaweedfs/weed/stats"
)
+const (
+ // minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s)
+ // Used to calculate timeout scaling based on data transferred
+ minThroughputBytesPerSecond = 4000
+
+ // graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout
+ // This prevents indefinite connections while allowing time for server-side chunk fetches
+ graceTimeCapMultiplier = 3
+)
+
// Listener wraps a net.Listener, and gives a place to store the timeout
// parameters. On Accept, it will wrap the net.Conn with our own Conn for us.
type Listener struct {
@@ -39,11 +50,28 @@ type Conn struct {
isClosed bool
bytesRead int64
bytesWritten int64
+ lastWrite time.Time
+}
+
+// calculateBytesPerTimeout calculates the expected number of bytes that should
+// be transferred during one timeout period, based on the minimum throughput.
+// Returns at least 1 to prevent division by zero.
+func calculateBytesPerTimeout(timeout time.Duration) int64 {
+ bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * timeout.Seconds())
+ if bytesPerTimeout <= 0 {
+ return 1 // Prevent division by zero
+ }
+ return bytesPerTimeout
}
func (c *Conn) Read(b []byte) (count int, e error) {
if c.ReadTimeout != 0 {
- err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * time.Duration(c.bytesRead/40000+1)))
+ // Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
+ // Example: with ReadTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
+ // After reading 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, deadline = 30s * 9 = 270s
+ bytesPerTimeout := calculateBytesPerTimeout(c.ReadTimeout)
+ timeoutMultiplier := time.Duration(c.bytesRead/bytesPerTimeout + 1)
+ err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * timeoutMultiplier))
if err != nil {
return 0, err
}
@@ -58,8 +86,42 @@ func (c *Conn) Read(b []byte) (count int, e error) {
func (c *Conn) Write(b []byte) (count int, e error) {
if c.WriteTimeout != 0 {
- // minimum 4KB/s
- err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(c.bytesWritten/40000+1)))
+ now := time.Now()
+ // Calculate timeout with two components:
+ // 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s)
+ // 2. Additional grace period if there was a gap since last write (for chunk fetch delays)
+
+ // Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
+ // Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
+ // After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s
+ bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout)
+ timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1)
+ baseTimeout := c.WriteTimeout * timeoutMultiplier
+
+ // If it's been a while since last write, add grace time for server-side chunk fetches
+ // But cap it to avoid keeping slow clients connected indefinitely
+ //
+ // The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time
+ // exceeds base timeout, independent of throughput scaling.
+ if !c.lastWrite.IsZero() {
+ timeSinceLastWrite := now.Sub(c.lastWrite)
+ if timeSinceLastWrite > c.WriteTimeout {
+ // Add grace time capped at graceTimeCapMultiplier * scaled timeout.
+ // This allows total deadline up to 4x scaled timeout for server-side delays.
+ //
+ // Example: WriteTimeout=30s, 1MB written (multiplier≈9), baseTimeout=270s
+ // If 400s gap occurs fetching chunks: graceTime capped at 270s*3=810s
+ // Final deadline: 270s + 810s = 1080s (~18min) to accommodate slow storage
+ // But if only 50s gap: graceTime = 50s, final deadline = 270s + 50s = 320s
+ graceTime := timeSinceLastWrite
+ if graceTime > baseTimeout*graceTimeCapMultiplier {
+ graceTime = baseTimeout * graceTimeCapMultiplier
+ }
+ baseTimeout += graceTime
+ }
+ }
+
+ err := c.Conn.SetWriteDeadline(now.Add(baseTimeout))
if err != nil {
return 0, err
}
@@ -68,6 +130,7 @@ func (c *Conn) Write(b []byte) (count int, e error) {
if e == nil {
stats.BytesOut(int64(count))
c.bytesWritten += int64(count)
+ c.lastWrite = time.Now()
}
return
}