aboutsummaryrefslogtreecommitdiff
path: root/weed/command/worker.go
blob: 6e592f73f3c5936002b6fc6f0724f224fd71505e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
package command

import (
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"syscall"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/security"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/worker"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"

	// Import task packages to trigger their auto-registration
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
	_ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)

// cmdWorker describes the "worker" subcommand: its usage line, one-line
// summary, and long help text with examples. Its Run handler is assigned in
// init, and its flag set holds the worker* flags declared below.
var cmdWorker = &Command{
	UsageLine: "worker -admin=<admin_server> [-capabilities=<task_types>] [-maxConcurrent=<num>] [-workingDir=<path>]",
	Short:     "start a maintenance worker to process cluster maintenance tasks",
	Long: `Start a maintenance worker that connects to an admin server to process
maintenance tasks like vacuum, erasure coding, remote upload, and replication fixes.

The worker ID and address are automatically generated.
The worker connects to the admin server via gRPC (admin HTTP port + 10000).

Examples:
  weed worker -admin=localhost:23646
  weed worker -admin=admin.example.com:23646
  weed worker -admin=localhost:23646 -capabilities=vacuum,replication
  weed worker -admin=localhost:23646 -maxConcurrent=4
  weed worker -admin=localhost:23646 -workingDir=/tmp/worker
`,
}

// Command-line flags of the worker command, registered on cmdWorker.Flag.
// Each variable is a pointer that runWorker dereferences when it builds the
// worker configuration; the third argument of every call is the help text
// shown to the user.
var (
	workerAdminServer         = cmdWorker.Flag.String("admin", "localhost:23646", "admin server address")
	workerCapabilities        = cmdWorker.Flag.String("capabilities", "vacuum,ec,remote,replication,balance", "comma-separated list of task types this worker can handle")
	workerMaxConcurrent       = cmdWorker.Flag.Int("maxConcurrent", 2, "maximum number of concurrent tasks")
	workerHeartbeatInterval   = cmdWorker.Flag.Duration("heartbeat", 30*time.Second, "heartbeat interval")
	workerTaskRequestInterval = cmdWorker.Flag.Duration("taskInterval", 5*time.Second, "task request interval")
	workerWorkingDir          = cmdWorker.Flag.String("workingDir", "", "working directory for the worker")
)

// init installs runWorker as the Run handler of cmdWorker and then asks the
// tasks package to derive its default capabilities from the task registry.
func init() {
	// Assigned here rather than in the cmdWorker literal.
	// NOTE(review): presumably to avoid an initialization cycle between
	// cmdWorker and runWorker — confirm.
	cmdWorker.Run = runWorker

	// Set default capabilities from registered task types
	// This happens after package imports have triggered auto-registration
	tasks.SetDefaultCapabilitiesFromRegistry()
}

// runWorker is the Run handler of the worker command. It loads the security
// configuration, resolves the requested capabilities, prepares the working
// directory (one sub-directory per capability), creates the worker together
// with its admin client, starts it, and then blocks until SIGINT or SIGTERM
// arrives, at which point the worker is stopped.
//
// It returns false when the worker cannot be started and true after a
// shutdown triggered by a signal (an error from Stop is logged but does not
// change the result). Setup failures are reported through glog.Fatalf, which
// is expected to terminate the process; the `return false` that follows each
// call is only a safeguard in case it ever returns.
//
// The cmd and args parameters are not used.
func runWorker(cmd *Command, args []string) bool {
	util.LoadConfiguration("security", false)

	glog.Infof("Starting maintenance worker")
	glog.Infof("Admin server: %s", *workerAdminServer)
	glog.Infof("Capabilities: %s", *workerCapabilities)

	// Parse capabilities
	capabilities := parseCapabilities(*workerCapabilities)
	if len(capabilities) == 0 {
		glog.Fatalf("No valid capabilities specified")
		return false
	}

	// Resolve the base working directory. The process changes directory at
	// most once: repeating os.Chdir with a relative -workingDir would be
	// resolved against the already-changed directory and either fail or end
	// up in a nested directory that no longer matches BaseWorkingDir.
	var baseWorkingDir string
	if *workerWorkingDir != "" {
		glog.Infof("Setting working directory to: %s", *workerWorkingDir)
		if err := os.Chdir(*workerWorkingDir); err != nil {
			glog.Fatalf("Failed to change working directory: %v", err)
			return false
		}
		wd, err := os.Getwd()
		if err != nil {
			glog.Fatalf("Failed to get working directory: %v", err)
			return false
		}
		baseWorkingDir = wd
		glog.Infof("Current working directory: %s", baseWorkingDir)
	} else {
		// Use default working directory when not specified
		wd, err := os.Getwd()
		if err != nil {
			glog.Fatalf("Failed to get current working directory: %v", err)
			return false
		}
		baseWorkingDir = wd
		glog.Infof("Using current working directory: %s", baseWorkingDir)
	}

	// Create task-specific subdirectories, named after each capability.
	for _, capability := range capabilities {
		taskDir := filepath.Join(baseWorkingDir, string(capability))
		if err := os.MkdirAll(taskDir, 0755); err != nil {
			glog.Fatalf("Failed to create task directory %s: %v", taskDir, err)
			return false
		}
		glog.Infof("Created task directory: %s", taskDir)
	}

	// Create gRPC dial option using TLS configuration
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker")

	// Create worker configuration
	config := &types.WorkerConfig{
		AdminServer:         *workerAdminServer,
		Capabilities:        capabilities,
		MaxConcurrent:       *workerMaxConcurrent,
		HeartbeatInterval:   *workerHeartbeatInterval,
		TaskRequestInterval: *workerTaskRequestInterval,
		BaseWorkingDir:      baseWorkingDir,
		GrpcDialOption:      grpcDialOption,
	}

	// Create worker instance
	workerInstance, err := worker.NewWorker(config)
	if err != nil {
		glog.Fatalf("Failed to create worker: %v", err)
		return false
	}

	// Create the admin client for this worker's ID and attach it.
	adminClient, err := worker.CreateAdminClient(*workerAdminServer, workerInstance.ID(), grpcDialOption)
	if err != nil {
		glog.Fatalf("Failed to create admin client: %v", err)
		return false
	}
	workerInstance.SetAdminClient(adminClient)

	// Start the worker
	if err = workerInstance.Start(); err != nil {
		glog.Errorf("Failed to start worker: %v", err)
		return false
	}

	// Set up signal handling. The channel is buffered so that a signal
	// delivered before the receive below is not dropped.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	glog.Infof("Maintenance worker %s started successfully", workerInstance.ID())
	glog.Infof("Press Ctrl+C to stop the worker")

	// Wait for shutdown signal
	<-sigChan
	glog.Infof("Shutdown signal received, stopping worker...")

	// Gracefully stop the worker
	if err = workerInstance.Stop(); err != nil {
		glog.Errorf("Error stopping worker: %v", err)
	}
	glog.Infof("Worker stopped")

	return true
}

// parseCapabilities converts a comma-separated capability string into the
// corresponding task types.
//
// Names are matched case-insensitively against the task types reported by the
// global types registry's detectors, plus the short aliases "ec", "remote"
// and "replication" when their canonical task types ("erasure_coding",
// "remote_upload", "fix_replication") are registered. Surrounding whitespace
// is ignored, empty entries (for example from a trailing comma) are skipped,
// and unknown names are logged as warnings and dropped. Each task type appears
// at most once in the result, in order of first occurrence.
//
// It returns nil when capabilityStr is empty or contains no known capability.
func parseCapabilities(capabilityStr string) []types.TaskType {
	if capabilityStr == "" {
		return nil
	}

	capabilityMap := map[string]types.TaskType{}

	// Populate capabilityMap with registered task types, keyed by their
	// lower-cased name.
	typesRegistry := tasks.GetGlobalTypesRegistry()
	for taskType := range typesRegistry.GetAllDetectors() {
		capabilityMap[strings.ToLower(string(taskType))] = taskType
	}

	// Add common aliases for convenience; an alias is only available when
	// its canonical task type is registered.
	aliases := []struct{ alias, canonical string }{
		{"ec", "erasure_coding"},
		{"remote", "remote_upload"},
		{"replication", "fix_replication"},
	}
	for _, a := range aliases {
		if taskType, exists := capabilityMap[a.canonical]; exists {
			capabilityMap[a.alias] = taskType
		}
	}

	var capabilities []types.TaskType
	seen := make(map[types.TaskType]struct{})

	for _, part := range strings.Split(capabilityStr, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			// Stray separators such as "vacuum,,ec" or "vacuum," are not
			// capabilities; skip them silently.
			continue
		}

		// Keys in capabilityMap are lower-case, so normalize the lookup key
		// while keeping the user's spelling for the warning.
		taskType, exists := capabilityMap[strings.ToLower(part)]
		if !exists {
			glog.Warningf("Unknown capability: %s", part)
			continue
		}

		// An alias and its canonical name resolve to the same task type;
		// report it only once.
		if _, duplicate := seen[taskType]; duplicate {
			continue
		}
		seen[taskType] = struct{}{}
		capabilities = append(capabilities, taskType)
	}

	return capabilities
}

// Legacy compatibility types for backward compatibility
// These will be deprecated in future versions

// WorkerStatus represents the current status of a worker. Every field is
// exported and carries a snake_case JSON tag.
//
// Deprecated: WorkerStatus is a legacy type kept only for backward
// compatibility and is expected to be removed in a future version.
type WorkerStatus struct {
	WorkerID       string           `json:"worker_id"`
	Address        string           `json:"address"`
	Status         string           `json:"status"`
	Capabilities   []types.TaskType `json:"capabilities"`
	MaxConcurrent  int              `json:"max_concurrent"`
	CurrentLoad    int              `json:"current_load"`
	LastHeartbeat  time.Time        `json:"last_heartbeat"`
	CurrentTasks   []types.Task     `json:"current_tasks"`
	Uptime         time.Duration    `json:"uptime"`
	TasksCompleted int              `json:"tasks_completed"`
	TasksFailed    int              `json:"tasks_failed"`
}