aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/admin.go190
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/scaffold/security.toml20
-rw-r--r--weed/command/worker.go182
4 files changed, 353 insertions, 40 deletions
diff --git a/weed/command/admin.go b/weed/command/admin.go
index ef1d54bb3..027fbec68 100644
--- a/weed/command/admin.go
+++ b/weed/command/admin.go
@@ -3,12 +3,12 @@ package command
import (
"context"
"crypto/rand"
- "crypto/tls"
"fmt"
"log"
"net/http"
"os"
"os/signal"
+ "os/user"
"path/filepath"
"strings"
"syscall"
@@ -17,9 +17,12 @@ import (
"github.com/gin-contrib/sessions"
"github.com/gin-contrib/sessions/cookie"
"github.com/gin-gonic/gin"
+ "github.com/spf13/viper"
"github.com/seaweedfs/seaweedfs/weed/admin/dash"
"github.com/seaweedfs/seaweedfs/weed/admin/handlers"
+ "github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/util"
)
var (
@@ -29,25 +32,23 @@ var (
type AdminOptions struct {
port *int
masters *string
- tlsCertPath *string
- tlsKeyPath *string
adminUser *string
adminPassword *string
+ dataDir *string
}
func init() {
cmdAdmin.Run = runAdmin // break init cycle
a.port = cmdAdmin.Flag.Int("port", 23646, "admin server port")
a.masters = cmdAdmin.Flag.String("masters", "localhost:9333", "comma-separated master servers")
- a.tlsCertPath = cmdAdmin.Flag.String("tlsCert", "", "path to TLS certificate file")
- a.tlsKeyPath = cmdAdmin.Flag.String("tlsKey", "", "path to TLS private key file")
+ a.dataDir = cmdAdmin.Flag.String("dataDir", "", "directory to store admin configuration and data files")
a.adminUser = cmdAdmin.Flag.String("adminUser", "admin", "admin interface username")
a.adminPassword = cmdAdmin.Flag.String("adminPassword", "", "admin interface password (if empty, auth is disabled)")
}
var cmdAdmin = &Command{
- UsageLine: "admin -port=23646 -masters=localhost:9333",
+ UsageLine: "admin -port=23646 -masters=localhost:9333 [-dataDir=/path/to/data]",
Short: "start SeaweedFS web admin interface",
Long: `Start a web admin interface for SeaweedFS cluster management.
@@ -60,25 +61,56 @@ var cmdAdmin = &Command{
- Maintenance operations
The admin interface automatically discovers filers from the master servers.
+ A gRPC server for worker connections runs on HTTP port + 10000.
Example Usage:
weed admin -port=23646 -masters="master1:9333,master2:9333"
- weed admin -port=443 -tlsCert=/etc/ssl/admin.crt -tlsKey=/etc/ssl/admin.key
+ weed admin -port=23646 -masters="localhost:9333" -dataDir="/var/lib/seaweedfs-admin"
+ weed admin -port=23646 -masters="localhost:9333" -dataDir="~/seaweedfs-admin"
+
+ Data Directory:
+ - If dataDir is specified, admin configuration and maintenance data is persisted
+ - The directory will be created if it doesn't exist
+ - Configuration files are stored in JSON format for easy editing
+ - Without dataDir, all configuration is kept in memory only
Authentication:
- If adminPassword is not set, the admin interface runs without authentication
- If adminPassword is set, users must login with adminUser/adminPassword
- Sessions are secured with auto-generated session keys
- Security:
- - Use HTTPS in production by providing TLS certificates
+ Security Configuration:
+ - The admin server reads TLS configuration from security.toml
+ - Configure [https.admin] section in security.toml for HTTPS support
+ - If https.admin.key is set, the server will start in TLS mode
+ - If https.admin.ca is set, mutual TLS authentication is enabled
- Set strong adminPassword for production deployments
- Configure firewall rules to restrict admin interface access
+ security.toml Example:
+ [https.admin]
+ cert = "/etc/ssl/admin.crt"
+ key = "/etc/ssl/admin.key"
+ ca = "/etc/ssl/ca.crt" # optional, for mutual TLS
+
+ Worker Communication:
+ - Workers connect via gRPC on HTTP port + 10000
+ - Workers use [grpc.admin] configuration from security.toml
+ - TLS is automatically used if certificates are configured
+ - Workers fall back to insecure connections if TLS is unavailable
+
+ Configuration File:
+ - The security.toml file is read from ".", "$HOME/.seaweedfs/",
+ "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order
+ - Generate example security.toml: weed scaffold -config=security
+
`,
}
func runAdmin(cmd *Command, args []string) bool {
+ // Load security configuration
+ util.LoadSecurityConfiguration()
+
// Validate required parameters
if *a.masters == "" {
fmt.Println("Error: masters parameter is required")
@@ -86,37 +118,25 @@ func runAdmin(cmd *Command, args []string) bool {
return false
}
- // Validate TLS configuration
- if (*a.tlsCertPath != "" && *a.tlsKeyPath == "") ||
- (*a.tlsCertPath == "" && *a.tlsKeyPath != "") {
- fmt.Println("Error: Both tlsCert and tlsKey must be provided for TLS")
- return false
- }
-
// Security warnings
if *a.adminPassword == "" {
fmt.Println("WARNING: Admin interface is running without authentication!")
fmt.Println(" Set -adminPassword for production use")
}
- if *a.tlsCertPath == "" {
- fmt.Println("WARNING: Admin interface is running without TLS encryption!")
- fmt.Println(" Use -tlsCert and -tlsKey for production use")
- }
-
fmt.Printf("Starting SeaweedFS Admin Interface on port %d\n", *a.port)
fmt.Printf("Masters: %s\n", *a.masters)
fmt.Printf("Filers will be discovered automatically from masters\n")
+ if *a.dataDir != "" {
+ fmt.Printf("Data Directory: %s\n", *a.dataDir)
+ } else {
+ fmt.Printf("Data Directory: Not specified (configuration will be in-memory only)\n")
+ }
if *a.adminPassword != "" {
fmt.Printf("Authentication: Enabled (user: %s)\n", *a.adminUser)
} else {
fmt.Printf("Authentication: Disabled\n")
}
- if *a.tlsCertPath != "" {
- fmt.Printf("TLS: Enabled\n")
- } else {
- fmt.Printf("TLS: Disabled\n")
- }
// Set up graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
@@ -169,8 +189,29 @@ func startAdminServer(ctx context.Context, options AdminOptions) error {
log.Printf("Warning: Static files not found at %s", staticPath)
}
+ // Create data directory if specified
+ var dataDir string
+ if *options.dataDir != "" {
+ // Expand tilde (~) to home directory
+ expandedDir, err := expandHomeDir(*options.dataDir)
+ if err != nil {
+ return fmt.Errorf("failed to expand dataDir path %s: %v", *options.dataDir, err)
+ }
+ dataDir = expandedDir
+
+ // Show path expansion if it occurred
+ if dataDir != *options.dataDir {
+ fmt.Printf("Expanded dataDir: %s -> %s\n", *options.dataDir, dataDir)
+ }
+
+ if err := os.MkdirAll(dataDir, 0755); err != nil {
+ return fmt.Errorf("failed to create data directory %s: %v", dataDir, err)
+ }
+ fmt.Printf("Data directory created/verified: %s\n", dataDir)
+ }
+
// Create admin server
- adminServer := dash.NewAdminServer(*options.masters, nil)
+ adminServer := dash.NewAdminServer(*options.masters, nil, dataDir)
// Show discovered filers
filers := adminServer.GetAllFilers()
@@ -180,6 +221,19 @@ func startAdminServer(ctx context.Context, options AdminOptions) error {
fmt.Printf("No filers discovered from masters\n")
}
+ // Start worker gRPC server for worker connections
+ err = adminServer.StartWorkerGrpcServer(*options.port)
+ if err != nil {
+ return fmt.Errorf("failed to start worker gRPC server: %v", err)
+ }
+
+ // Set up cleanup for gRPC server
+ defer func() {
+ if stopErr := adminServer.StopWorkerGrpcServer(); stopErr != nil {
+ log.Printf("Error stopping worker gRPC server: %v", stopErr)
+ }
+ }()
+
// Create handlers and setup routes
adminHandlers := handlers.NewAdminHandlers(adminServer)
adminHandlers.SetupRoutes(r, *options.adminPassword != "", *options.adminUser, *options.adminPassword)
@@ -191,21 +245,37 @@ func startAdminServer(ctx context.Context, options AdminOptions) error {
Handler: r,
}
- // TLS configuration
- if *options.tlsCertPath != "" && *options.tlsKeyPath != "" {
- server.TLSConfig = &tls.Config{
- MinVersion: tls.VersionTLS12,
- }
- }
-
// Start server
go func() {
log.Printf("Starting SeaweedFS Admin Server on port %d", *options.port)
- var err error
- if *options.tlsCertPath != "" && *options.tlsKeyPath != "" {
- log.Printf("Using TLS with cert: %s, key: %s", *options.tlsCertPath, *options.tlsKeyPath)
- err = server.ListenAndServeTLS(*options.tlsCertPath, *options.tlsKeyPath)
+ // start http or https server with security.toml
+ var (
+ clientCertFile,
+ certFile,
+ keyFile string
+ )
+ useTLS := false
+ useMTLS := false
+
+ if viper.GetString("https.admin.key") != "" {
+ useTLS = true
+ certFile = viper.GetString("https.admin.cert")
+ keyFile = viper.GetString("https.admin.key")
+ }
+
+ if viper.GetString("https.admin.ca") != "" {
+ useMTLS = true
+ clientCertFile = viper.GetString("https.admin.ca")
+ }
+
+ if useMTLS {
+ server.TLSConfig = security.LoadClientTLSHTTP(clientCertFile)
+ }
+
+ if useTLS {
+ log.Printf("Starting SeaweedFS Admin Server with TLS on port %d", *options.port)
+ err = server.ListenAndServeTLS(certFile, keyFile)
} else {
err = server.ListenAndServe()
}
@@ -234,3 +304,47 @@ func startAdminServer(ctx context.Context, options AdminOptions) error {
func GetAdminOptions() *AdminOptions {
return &AdminOptions{}
}
+
+// expandHomeDir expands the tilde (~) in a path to the user's home directory
+func expandHomeDir(path string) (string, error) {
+ if path == "" {
+ return path, nil
+ }
+
+ if !strings.HasPrefix(path, "~") {
+ return path, nil
+ }
+
+ // Get current user
+ currentUser, err := user.Current()
+ if err != nil {
+ return "", fmt.Errorf("failed to get current user: %v", err)
+ }
+
+ // Handle different tilde patterns
+ if path == "~" {
+ return currentUser.HomeDir, nil
+ }
+
+ if strings.HasPrefix(path, "~/") {
+ return filepath.Join(currentUser.HomeDir, path[2:]), nil
+ }
+
+ // Handle ~username/ patterns
+ if strings.HasPrefix(path, "~") {
+ parts := strings.SplitN(path[1:], "/", 2)
+ username := parts[0]
+
+ targetUser, err := user.Lookup(username)
+ if err != nil {
+ return "", fmt.Errorf("user %s not found: %v", username, err)
+ }
+
+ if len(parts) == 1 {
+ return targetUser.HomeDir, nil
+ }
+ return filepath.Join(targetUser.HomeDir, parts[1]), nil
+ }
+
+ return path, nil
+}
diff --git a/weed/command/command.go b/weed/command/command.go
index 65ddce717..06474fbb9 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -45,6 +45,7 @@ var Commands = []*Command{
cmdVolume,
cmdWebDav,
cmdSftp,
+ cmdWorker,
}
type Command struct {
diff --git a/weed/command/scaffold/security.toml b/weed/command/scaffold/security.toml
index 2efcac354..bc95ecf2e 100644
--- a/weed/command/scaffold/security.toml
+++ b/weed/command/scaffold/security.toml
@@ -2,7 +2,7 @@
# ./security.toml
# $HOME/.seaweedfs/security.toml
# /etc/seaweedfs/security.toml
-# this file is read by master, volume server, and filer
+# this file is read by master, volume server, filer, and worker
# comma separated origins allowed to make requests to the filer and s3 gateway.
# enter in this format: https://domain.com, or http://localhost:port
@@ -94,6 +94,16 @@ cert = ""
key = ""
allowed_commonNames = "" # comma-separated SSL certificate common names
+[grpc.admin]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
+[grpc.worker]
+cert = ""
+key = ""
+allowed_commonNames = "" # comma-separated SSL certificate common names
+
# use this for any place needs a grpc client
# i.e., "weed backup|benchmark|filer.copy|filer.replicate|mount|s3|upload"
[grpc.client]
@@ -101,7 +111,7 @@ cert = ""
key = ""
# https client for master|volume|filer|etc connection
-# It is necessary that the parameters [https.volume]|[https.master]|[https.filer] are set
+# It is necessary that the parameters [https.volume]|[https.master]|[https.filer]|[https.admin] are set
[https.client]
enabled = false
cert = ""
@@ -127,6 +137,12 @@ key = ""
ca = ""
# disable_tls_verify_client_cert = true|false (default: false)
+# admin server https options
+[https.admin]
+cert = ""
+key = ""
+ca = ""
+
# white list. It's checking request ip address.
[guard]
white_list = ""
diff --git a/weed/command/worker.go b/weed/command/worker.go
new file mode 100644
index 000000000..f217e57f7
--- /dev/null
+++ b/weed/command/worker.go
@@ -0,0 +1,182 @@
+package command
+
+import (
+ "os"
+ "os/signal"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/worker"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+
+ // Import task packages to trigger their auto-registration
+ _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
+ _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
+ _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
+)
+
+var cmdWorker = &Command{
+ UsageLine: "worker -admin=<admin_server> [-capabilities=<task_types>] [-maxConcurrent=<num>]",
+ Short: "start a maintenance worker to process cluster maintenance tasks",
+ Long: `Start a maintenance worker that connects to an admin server to process
+maintenance tasks like vacuum, erasure coding, remote upload, and replication fixes.
+
+The worker ID and address are automatically generated.
+The worker connects to the admin server via gRPC (admin HTTP port + 10000).
+
+Examples:
+ weed worker -admin=localhost:23646
+ weed worker -admin=admin.example.com:23646
+ weed worker -admin=localhost:23646 -capabilities=vacuum,replication
+ weed worker -admin=localhost:23646 -maxConcurrent=4
+`,
+}
+
+var (
+ workerAdminServer = cmdWorker.Flag.String("admin", "localhost:23646", "admin server address")
+ workerCapabilities = cmdWorker.Flag.String("capabilities", "vacuum,ec,remote,replication,balance", "comma-separated list of task types this worker can handle")
+ workerMaxConcurrent = cmdWorker.Flag.Int("maxConcurrent", 2, "maximum number of concurrent tasks")
+ workerHeartbeatInterval = cmdWorker.Flag.Duration("heartbeat", 30*time.Second, "heartbeat interval")
+ workerTaskRequestInterval = cmdWorker.Flag.Duration("taskInterval", 5*time.Second, "task request interval")
+)
+
+func init() {
+ cmdWorker.Run = runWorker
+
+ // Set default capabilities from registered task types
+ // This happens after package imports have triggered auto-registration
+ tasks.SetDefaultCapabilitiesFromRegistry()
+}
+
+func runWorker(cmd *Command, args []string) bool {
+ util.LoadConfiguration("security", false)
+
+ glog.Infof("Starting maintenance worker")
+ glog.Infof("Admin server: %s", *workerAdminServer)
+ glog.Infof("Capabilities: %s", *workerCapabilities)
+
+ // Parse capabilities
+ capabilities := parseCapabilities(*workerCapabilities)
+ if len(capabilities) == 0 {
+ glog.Fatalf("No valid capabilities specified")
+ return false
+ }
+
+ // Create worker configuration
+ config := &types.WorkerConfig{
+ AdminServer: *workerAdminServer,
+ Capabilities: capabilities,
+ MaxConcurrent: *workerMaxConcurrent,
+ HeartbeatInterval: *workerHeartbeatInterval,
+ TaskRequestInterval: *workerTaskRequestInterval,
+ }
+
+ // Create worker instance
+ workerInstance, err := worker.NewWorker(config)
+ if err != nil {
+ glog.Fatalf("Failed to create worker: %v", err)
+ return false
+ }
+
+ // Create admin client with LoadClientTLS
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.worker")
+ adminClient, err := worker.CreateAdminClient(*workerAdminServer, workerInstance.ID(), grpcDialOption)
+ if err != nil {
+ glog.Fatalf("Failed to create admin client: %v", err)
+ return false
+ }
+
+ // Set admin client
+ workerInstance.SetAdminClient(adminClient)
+
+ // Start the worker
+ err = workerInstance.Start()
+ if err != nil {
+ glog.Fatalf("Failed to start worker: %v", err)
+ return false
+ }
+
+ // Set up signal handling
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
+
+ glog.Infof("Maintenance worker %s started successfully", workerInstance.ID())
+ glog.Infof("Press Ctrl+C to stop the worker")
+
+ // Wait for shutdown signal
+ <-sigChan
+ glog.Infof("Shutdown signal received, stopping worker...")
+
+ // Gracefully stop the worker
+ err = workerInstance.Stop()
+ if err != nil {
+ glog.Errorf("Error stopping worker: %v", err)
+ }
+ glog.Infof("Worker stopped")
+
+ return true
+}
+
+// parseCapabilities converts comma-separated capability string to task types
+func parseCapabilities(capabilityStr string) []types.TaskType {
+ if capabilityStr == "" {
+ return nil
+ }
+
+ capabilityMap := map[string]types.TaskType{}
+
+ // Populate capabilityMap with registered task types
+ typesRegistry := tasks.GetGlobalTypesRegistry()
+ for taskType := range typesRegistry.GetAllDetectors() {
+ // Use the task type string directly as the key
+ capabilityMap[strings.ToLower(string(taskType))] = taskType
+ }
+
+ // Add common aliases for convenience
+ if taskType, exists := capabilityMap["erasure_coding"]; exists {
+ capabilityMap["ec"] = taskType
+ }
+ if taskType, exists := capabilityMap["remote_upload"]; exists {
+ capabilityMap["remote"] = taskType
+ }
+ if taskType, exists := capabilityMap["fix_replication"]; exists {
+ capabilityMap["replication"] = taskType
+ }
+
+ var capabilities []types.TaskType
+ parts := strings.Split(capabilityStr, ",")
+
+ for _, part := range parts {
+ part = strings.TrimSpace(part)
+ if taskType, exists := capabilityMap[part]; exists {
+ capabilities = append(capabilities, taskType)
+ } else {
+ glog.Warningf("Unknown capability: %s", part)
+ }
+ }
+
+ return capabilities
+}
+
+// Legacy compatibility types for backward compatibility
+// These will be deprecated in future versions
+
+// WorkerStatus represents the current status of a worker (deprecated)
+type WorkerStatus struct {
+ WorkerID string `json:"worker_id"`
+ Address string `json:"address"`
+ Status string `json:"status"`
+ Capabilities []types.TaskType `json:"capabilities"`
+ MaxConcurrent int `json:"max_concurrent"`
+ CurrentLoad int `json:"current_load"`
+ LastHeartbeat time.Time `json:"last_heartbeat"`
+ CurrentTasks []types.Task `json:"current_tasks"`
+ Uptime time.Duration `json:"uptime"`
+ TasksCompleted int `json:"tasks_completed"`
+ TasksFailed int `json:"tasks_failed"`
+}