diff options
Diffstat (limited to 'weed/util/net_timeout.go')
| -rw-r--r-- | weed/util/net_timeout.go | 119 |
1 files changed, 26 insertions, 93 deletions
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go index 75e475f6b..9aeb5cd48 100644 --- a/weed/util/net_timeout.go +++ b/weed/util/net_timeout.go @@ -9,22 +9,11 @@ import ( "github.com/seaweedfs/seaweedfs/weed/stats" ) -const ( - // minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s) - // Used to calculate timeout scaling based on data transferred - minThroughputBytesPerSecond = 4000 - - // graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout - // This prevents indefinite connections while allowing time for server-side chunk fetches - graceTimeCapMultiplier = 3 -) - // Listener wraps a net.Listener, and gives a place to store the timeout // parameters. On Accept, it will wrap the net.Conn with our own Conn for us. type Listener struct { net.Listener - ReadTimeout time.Duration - WriteTimeout time.Duration + Timeout time.Duration } func (l *Listener) Accept() (net.Conn, error) { @@ -34,103 +23,50 @@ func (l *Listener) Accept() (net.Conn, error) { } stats.ConnectionOpen() tc := &Conn{ - Conn: c, - ReadTimeout: l.ReadTimeout, - WriteTimeout: l.WriteTimeout, + Conn: c, + Timeout: l.Timeout, } return tc, nil } -// Conn wraps a net.Conn, and sets a deadline for every read -// and write operation. +// Conn wraps a net.Conn and implements a "no activity timeout". +// Any activity (read or write) resets the deadline, so the connection +// only times out when there's no activity in either direction. type Conn struct { net.Conn - ReadTimeout time.Duration - WriteTimeout time.Duration - isClosed bool - bytesRead int64 - bytesWritten int64 - lastWrite time.Time + Timeout time.Duration + isClosed bool } -// calculateBytesPerTimeout calculates the expected number of bytes that should -// be transferred during one timeout period, based on the minimum throughput. -// Returns at least 1 to prevent division by zero. -func calculateBytesPerTimeout(timeout time.Duration) int64 { - bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * timeout.Seconds()) - if bytesPerTimeout <= 0 { - return 1 // Prevent division by zero +// extendDeadline extends the connection deadline from now. +// This implements "no activity timeout" - any activity keeps the connection alive. +func (c *Conn) extendDeadline() error { + if c.Timeout > 0 { + return c.Conn.SetDeadline(time.Now().Add(c.Timeout)) } - return bytesPerTimeout + return nil } func (c *Conn) Read(b []byte) (count int, e error) { - if c.ReadTimeout != 0 { - // Calculate expected bytes per timeout period based on minimum throughput (4KB/s) - // Example: with ReadTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB - // After reading 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, deadline = 30s * 9 = 270s - bytesPerTimeout := calculateBytesPerTimeout(c.ReadTimeout) - timeoutMultiplier := time.Duration(c.bytesRead/bytesPerTimeout + 1) - err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * timeoutMultiplier)) - if err != nil { - return 0, err - } + // Extend deadline before reading - any activity keeps connection alive + if err := c.extendDeadline(); err != nil { + return 0, err } count, e = c.Conn.Read(b) if e == nil { stats.BytesIn(int64(count)) - c.bytesRead += int64(count) } return } func (c *Conn) Write(b []byte) (count int, e error) { - if c.WriteTimeout != 0 { - now := time.Now() - // Calculate timeout with two components: - // 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s) - // 2. Additional grace period if there was a gap since last write (for chunk fetch delays) - - // Calculate expected bytes per timeout period based on minimum throughput (4KB/s) - // Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB - // After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s - bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout) - timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1) - baseTimeout := c.WriteTimeout * timeoutMultiplier - - // If it's been a while since last write, add grace time for server-side chunk fetches - // But cap it to avoid keeping slow clients connected indefinitely - // - // The comparison uses unscaled WriteTimeout intentionally: triggers grace when idle time - // exceeds base timeout, independent of throughput scaling. - if !c.lastWrite.IsZero() { - timeSinceLastWrite := now.Sub(c.lastWrite) - if timeSinceLastWrite > c.WriteTimeout { - // Add grace time capped at graceTimeCapMultiplier * scaled timeout. - // This allows total deadline up to 4x scaled timeout for server-side delays. - // - // Example: WriteTimeout=30s, 1MB written (multiplier≈9), baseTimeout=270s - // If 400s gap occurs fetching chunks: graceTime capped at 270s*3=810s - // Final deadline: 270s + 810s = 1080s (~18min) to accommodate slow storage - // But if only 50s gap: graceTime = 50s, final deadline = 270s + 50s = 320s - graceTime := timeSinceLastWrite - if graceTime > baseTimeout*graceTimeCapMultiplier { - graceTime = baseTimeout * graceTimeCapMultiplier - } - baseTimeout += graceTime - } - } - - err := c.Conn.SetWriteDeadline(now.Add(baseTimeout)) - if err != nil { - return 0, err - } + // Extend deadline before writing - any activity keeps connection alive + if err := c.extendDeadline(); err != nil { + return 0, err } count, e = c.Conn.Write(b) if e == nil { stats.BytesOut(int64(count)) - c.bytesWritten += int64(count) - c.lastWrite = time.Now() } return } @@ -153,9 +89,8 @@ func NewListener(addr string, timeout time.Duration) (ipListener net.Listener, e } ipListener = &Listener{ - Listener: listener, - ReadTimeout: timeout, - WriteTimeout: timeout, + Listener: listener, + Timeout: timeout, } return @@ -168,9 +103,8 @@ func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipLis } ipListener = &Listener{ - Listener: listener, - ReadTimeout: timeout, - WriteTimeout: timeout, + Listener: listener, + Timeout: timeout, } if host != "localhost" && host != "" && host != "0.0.0.0" && host != "127.0.0.1" && host != "[::]" && host != "[::1]" { @@ -181,9 +115,8 @@ func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipLis } localListener = &Listener{ - Listener: listener, - ReadTimeout: timeout, - WriteTimeout: timeout, + Listener: listener, + Timeout: timeout, } } |
