aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/worker.go')
-rw-r--r--weed/worker/worker.go14
1 files changed, 4 insertions, 10 deletions
diff --git a/weed/worker/worker.go b/weed/worker/worker.go
index afc203318..bbd1f4662 100644
--- a/weed/worker/worker.go
+++ b/weed/worker/worker.go
@@ -7,7 +7,6 @@ import (
"os"
"path/filepath"
"strings"
- "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -29,7 +28,6 @@ type Worker struct {
cmds chan workerCommand
state *workerState
taskLogHandler *tasks.TaskLogHandler
- closeOnce sync.Once
}
type workerState struct {
running bool
@@ -201,13 +199,14 @@ func (w *Worker) managerLoop() {
stopChan: make(chan struct{}),
currentTasks: make(map[string]*types.TaskInput),
}
-
+out:
for cmd := range w.cmds {
switch cmd.action {
case ActionStart:
w.handleStart(cmd)
case ActionStop:
w.handleStop(cmd)
+ break out
case ActionGetStatus:
respCh := cmd.data.(statusResponse)
var currentTasks []types.TaskInput
@@ -495,15 +494,15 @@ func (w *Worker) Stop() error {
// Wait for tasks to finish
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()
+out:
for w.getTaskLoad() > 0 {
select {
case <-timeout.C:
glog.Warningf("Worker %s stopping with %d tasks still running", w.id, w.getTaskLoad())
- goto end_wait
+ break out
case <-time.After(100 * time.Millisecond):
}
}
-end_wait:
// Disconnect from admin server
if adminClient := w.getAdmin(); adminClient != nil {
@@ -511,10 +510,6 @@ end_wait:
glog.Errorf("Error disconnecting from admin server: %v", err)
}
}
-
- w.closeOnce.Do(func() {
- close(w.cmds)
- })
glog.Infof("Worker %s stopped", w.id)
return nil
}
@@ -731,7 +726,6 @@ func (w *Worker) executeTask(task *types.TaskInput) {
fileLogger.Info("Task %s completed successfully", task.ID)
}
}
- return
}
// completeTask reports task completion to admin server