diff options
| author | Mariano Ntrougkas <44480600+marios1861@users.noreply.github.com> | 2025-10-24 03:09:46 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-10-23 17:09:46 -0700 |
| commit | f06ddd05cc148e2fc9ea60566ca4f6346e1f2762 (patch) | |
| tree | 22c3b1ced2b78778e1a2c1eca8e1d1553e7cf957 /weed/worker/client.go | |
| parent | 557aa4ec097267ff31e34a5c2fdd7b256e42d1be (diff) | |
| download | seaweedfs-f06ddd05cc148e2fc9ea60566ca4f6346e1f2762.tar.xz seaweedfs-f06ddd05cc148e2fc9ea60566ca4f6346e1f2762.zip | |
Improve-worker (#7367)
* ♻️ refactor(worker): remove goto
* ♻️ refactor(worker): let manager loop exit by itself
* ♻️ refactor(worker): fix race condition when closing worker
CloseSend is not safe to call when another
goroutine concurrently calls Send. streamCancel
already handles proper stream closure. Also,
streamExit signal should be called AFTER
sending shutdownMsg
Now the worker has no race condition if stopped
during any moment (hopefully, tested with -race
flag)
* 🐛 fix(task_logger): deadlock in log closure
* 🐛 fix(balance): fix balance task
Removes the outdated "UnloadVolume" step as it is handled by "DeleteVolume".
#7346
Diffstat (limited to 'weed/worker/client.go')
| -rw-r--r-- | weed/worker/client.go | 21 |
1 files changed, 5 insertions, 16 deletions
diff --git a/weed/worker/client.go b/weed/worker/client.go index 0ec36e419..613d69987 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "sync" "time" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -26,7 +25,6 @@ type GrpcAdminClient struct { dialOption grpc.DialOption cmds chan grpcCommand - closeOnce sync.Once // Reconnection parameters maxReconnectAttempts int @@ -103,12 +101,14 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di func (c *GrpcAdminClient) managerLoop() { state := &grpcState{shouldReconnect: true} +out: for cmd := range c.cmds { switch cmd.action { case ActionConnect: c.handleConnect(cmd, state) case ActionDisconnect: c.handleDisconnect(cmd, state) + break out case ActionReconnect: if state.connected || state.reconnecting || !state.shouldReconnect { cmd.resp <- ErrAlreadyConnected @@ -240,9 +240,6 @@ func (c *GrpcAdminClient) reconnect(s *grpcState) error { if s.streamCancel != nil { s.streamCancel() } - if s.stream != nil { - s.stream.CloseSend() - } if s.conn != nil { s.conn.Close() } @@ -412,9 +409,6 @@ func (c *GrpcAdminClient) Disconnect() error { resp: resp, } err := <-resp - c.closeOnce.Do(func() { - close(c.cmds) - }) return err } @@ -427,9 +421,6 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { // Send shutdown signal to stop reconnection loop close(s.reconnectStop) - // Send shutdown signal to stop handlers loop - close(s.streamExit) - s.connected = false s.shouldReconnect = false @@ -452,16 +443,14 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) { glog.Warningf("Failed to send shutdown message") } + // Send shutdown signal to stop handlers loop + close(s.streamExit) + // Cancel stream context if s.streamCancel != nil { s.streamCancel() } - // Close stream - if s.stream != nil { - s.stream.CloseSend() - } - // Close connection if s.conn != nil { s.conn.Close() |
