aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/worker/client.go21
-rw-r--r--weed/worker/tasks/balance/balance_task.go11
-rw-r--r--weed/worker/tasks/task_logger.go2
-rw-r--r--weed/worker/worker.go14
4 files changed, 12 insertions, 36 deletions
diff --git a/weed/worker/client.go b/weed/worker/client.go
index 0ec36e419..613d69987 100644
--- a/weed/worker/client.go
+++ b/weed/worker/client.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
- "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -26,7 +25,6 @@ type GrpcAdminClient struct {
dialOption grpc.DialOption
cmds chan grpcCommand
- closeOnce sync.Once
// Reconnection parameters
maxReconnectAttempts int
@@ -103,12 +101,14 @@ func NewGrpcAdminClient(adminAddress string, workerID string, dialOption grpc.Di
func (c *GrpcAdminClient) managerLoop() {
state := &grpcState{shouldReconnect: true}
+out:
for cmd := range c.cmds {
switch cmd.action {
case ActionConnect:
c.handleConnect(cmd, state)
case ActionDisconnect:
c.handleDisconnect(cmd, state)
+ break out
case ActionReconnect:
if state.connected || state.reconnecting || !state.shouldReconnect {
cmd.resp <- ErrAlreadyConnected
@@ -240,9 +240,6 @@ func (c *GrpcAdminClient) reconnect(s *grpcState) error {
if s.streamCancel != nil {
s.streamCancel()
}
- if s.stream != nil {
- s.stream.CloseSend()
- }
if s.conn != nil {
s.conn.Close()
}
@@ -412,9 +409,6 @@ func (c *GrpcAdminClient) Disconnect() error {
resp: resp,
}
err := <-resp
- c.closeOnce.Do(func() {
- close(c.cmds)
- })
return err
}
@@ -427,9 +421,6 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
// Send shutdown signal to stop reconnection loop
close(s.reconnectStop)
- // Send shutdown signal to stop handlers loop
- close(s.streamExit)
-
s.connected = false
s.shouldReconnect = false
@@ -452,16 +443,14 @@ func (c *GrpcAdminClient) handleDisconnect(cmd grpcCommand, s *grpcState) {
glog.Warningf("Failed to send shutdown message")
}
+ // Send shutdown signal to stop handlers loop
+ close(s.streamExit)
+
// Cancel stream context
if s.streamCancel != nil {
s.streamCancel()
}
- // Close stream
- if s.stream != nil {
- s.stream.CloseSend()
- }
-
// Close connection
if s.conn != nil {
s.conn.Close()
diff --git a/weed/worker/tasks/balance/balance_task.go b/weed/worker/tasks/balance/balance_task.go
index 8daafde97..e36885add 100644
--- a/weed/worker/tasks/balance/balance_task.go
+++ b/weed/worker/tasks/balance/balance_task.go
@@ -106,15 +106,8 @@ func (t *BalanceTask) Execute(ctx context.Context, params *worker_pb.TaskParams)
glog.Warningf("Tail operation failed (may be normal): %v", err)
}
- // Step 5: Unmount from source
- t.ReportProgress(85.0)
- t.GetLogger().Info("Unmounting volume from source server")
- if err := t.unmountVolume(sourceServer, volumeId); err != nil {
- return fmt.Errorf("failed to unmount volume from source: %v", err)
- }
-
- // Step 6: Delete from source
- t.ReportProgress(95.0)
+ // Step 5: Delete from source
+ t.ReportProgress(90.0)
t.GetLogger().Info("Deleting volume from source server")
if err := t.deleteVolume(sourceServer, volumeId); err != nil {
return fmt.Errorf("failed to delete volume from source: %v", err)
diff --git a/weed/worker/tasks/task_logger.go b/weed/worker/tasks/task_logger.go
index 430513184..cc65c6d7b 100644
--- a/weed/worker/tasks/task_logger.go
+++ b/weed/worker/tasks/task_logger.go
@@ -232,6 +232,7 @@ func (l *FileTaskLogger) LogWithFields(level string, message string, fields map[
// Close closes the logger and finalizes metadata
func (l *FileTaskLogger) Close() error {
+ l.Info("Task logger closed for %s", l.taskID)
l.mutex.Lock()
defer l.mutex.Unlock()
@@ -260,7 +261,6 @@ func (l *FileTaskLogger) Close() error {
}
l.closed = true
- l.Info("Task logger closed for %s", l.taskID)
return nil
}
diff --git a/weed/worker/worker.go b/weed/worker/worker.go
index afc203318..bbd1f4662 100644
--- a/weed/worker/worker.go
+++ b/weed/worker/worker.go
@@ -7,7 +7,6 @@ import (
"os"
"path/filepath"
"strings"
- "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -29,7 +28,6 @@ type Worker struct {
cmds chan workerCommand
state *workerState
taskLogHandler *tasks.TaskLogHandler
- closeOnce sync.Once
}
type workerState struct {
running bool
@@ -201,13 +199,14 @@ func (w *Worker) managerLoop() {
stopChan: make(chan struct{}),
currentTasks: make(map[string]*types.TaskInput),
}
-
+out:
for cmd := range w.cmds {
switch cmd.action {
case ActionStart:
w.handleStart(cmd)
case ActionStop:
w.handleStop(cmd)
+ break out
case ActionGetStatus:
respCh := cmd.data.(statusResponse)
var currentTasks []types.TaskInput
@@ -495,15 +494,15 @@ func (w *Worker) Stop() error {
// Wait for tasks to finish
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()
+out:
for w.getTaskLoad() > 0 {
select {
case <-timeout.C:
glog.Warningf("Worker %s stopping with %d tasks still running", w.id, w.getTaskLoad())
- goto end_wait
+ break out
case <-time.After(100 * time.Millisecond):
}
}
-end_wait:
// Disconnect from admin server
if adminClient := w.getAdmin(); adminClient != nil {
@@ -511,10 +510,6 @@ end_wait:
glog.Errorf("Error disconnecting from admin server: %v", err)
}
}
-
- w.closeOnce.Do(func() {
- close(w.cmds)
- })
glog.Infof("Worker %s stopped", w.id)
return nil
}
@@ -731,7 +726,6 @@ func (w *Worker) executeTask(task *types.TaskInput) {
fileLogger.Info("Task %s completed successfully", task.ID)
}
}
- return
}
// completeTask reports task completion to admin server