aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/client.go
diff options
context:
space:
mode:
authorMariano Ntrougkas <44480600+marios1861@users.noreply.github.com>2025-10-24 03:09:46 +0300
committerGitHub <noreply@github.com>2025-10-23 17:09:46 -0700
commitf06ddd05cc148e2fc9ea60566ca4f6346e1f2762 (patch)
tree22c3b1ced2b78778e1a2c1eca8e1d1553e7cf957 /weed/worker/client.go
parent557aa4ec097267ff31e34a5c2fdd7b256e42d1be (diff)
downloadseaweedfs-f06ddd05cc148e2fc9ea60566ca4f6346e1f2762.tar.xz
seaweedfs-f06ddd05cc148e2fc9ea60566ca4f6346e1f2762.zip
Improve-worker (#7367)
* ♻️ refactor(worker): remove goto * ♻️ refactor(worker): let manager loop exit by itself * ♻️ refactor(worker): fix race condition when closing worker CloseSend is not safe to call when another goroutine concurrently calls Send. streamCancel already handles proper stream closure. Also, streamExit signal should be called AFTER sending shutdownMsg Now the worker has no race condition if stopped during any moment (hopefully, tested with -race flag) * 🐛 fix(task_logger): deadlock in log closure * 🐛 fix(balance): fix balance task Removes the outdated "UnloadVolume" step as it is handled by "DeleteVolume". #7346
Diffstat (limited to 'weed/worker/client.go')
-rw-r--r--weed/worker/client.go21
1 files changed, 5 insertions, 16 deletions
diff --git a/weed/worker/client.go b/weed/worker/client.go
index 0ec36e419..613d69987 100644
--- a/weed/worker/client.go
+++ b/weed/worker/client.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
- "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -26,7 +25,6 @@ type GrpcAdminClient struct {
dialOption grpc.DialOption
cmds chan grpcCommand
- closeOnce sync.Once
// Reconnection parameters
maxReconnectAttempts int
@@ -103,12 +101,14 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di
func (c *GrpcAdminClient) managerLoop() {
state := &grpcState{shouldReconnect: true}
+out:
for cmd := range c.cmds {
switch cmd.action {
case ActionConnect:
c.handleConnect(cmd, state)
case ActionDisconnect:
c.handleDisconnect(cmd, state)
+ break out
case ActionReconnect:
if state.connected || state.reconnecting || !state.shouldReconnect {
cmd.resp <- ErrAlreadyConnected
@@ -240,9 +240,6 @@ func (c *GrpcAdminClient) reconnect(s *grpcState) error {
if s.streamCancel != nil {
s.streamCancel()
}
- if s.stream != nil {
- s.stream.CloseSend()
- }
if s.conn != nil {
s.conn.Close()
}
@@ -412,9 +409,6 @@ func (c *GrpcAdminClient) Disconnect() error {
resp: resp,
}
err := <-resp
- c.closeOnce.Do(func() {
- close(c.cmds)
- })
return err
}
@@ -427,9 +421,6 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
// Send shutdown signal to stop reconnection loop
close(s.reconnectStop)
- // Send shutdown signal to stop handlers loop
- close(s.streamExit)
-
s.connected = false
s.shouldReconnect = false
@@ -452,16 +443,14 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
glog.Warningf("Failed to send shutdown message")
}
+ // Send shutdown signal to stop handlers loop
+ close(s.streamExit)
+
// Cancel stream context
if s.streamCancel != nil {
s.streamCancel()
}
- // Close stream
- if s.stream != nil {
- s.stream.CloseSend()
- }
-
// Close connection
if s.conn != nil {
s.conn.Close()