aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/client.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-08-01 11:18:32 -0700
committerGitHub <noreply@github.com>2025-08-01 11:18:32 -0700
commit0975968e71b05368d5f28f788cf863c2042c2696 (patch)
tree5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/client.go
parent1cba609bfa2306cc2885df212febd5ff954aa693 (diff)
downloadseaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz
seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip
admin: Refactor task destination planning (#7063)
* refactor planning into task detection * refactoring worker tasks * refactor * compiles, but only balance task is registered * compiles, but has nil exception * avoid nil logger * add back ec task * setting ec log directory * implement balance and vacuum tasks * EC tasks will no longer fail with "file not found" errors * Use ReceiveFile API to send locally generated shards * distributing shard files and ecx,ecj,vif files * generate .ecx files correctly * do not mount all possible EC shards (0-13) on every destination * use constants * delete all replicas * rename files * pass in volume size to tasks
Diffstat (limited to 'weed/worker/client.go')
-rw-r--r--weed/worker/client.go22
1 files changed, 11 insertions, 11 deletions
diff --git a/weed/worker/client.go b/weed/worker/client.go
index 53854c6e3..ef7e431c0 100644
--- a/weed/worker/client.go
+++ b/weed/worker/client.go
@@ -38,7 +38,7 @@ type GrpcAdminClient struct {
reconnectMultiplier float64
// Worker registration info for re-registration after reconnection
- lastWorkerInfo *types.Worker
+ lastWorkerInfo *types.WorkerData
// Channels for communication
outgoing chan *worker_pb.WorkerMessage
@@ -404,7 +404,7 @@ func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) {
}
// RegisterWorker registers the worker with the admin server
-func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error {
+func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error {
// Store worker info for re-registration after reconnection
c.mutex.Lock()
c.lastWorkerInfo = worker
@@ -420,7 +420,7 @@ func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error {
}
// sendRegistration sends the registration message and waits for response
-func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error {
+func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error {
capabilities := make([]string, len(worker.Capabilities))
for i, cap := range worker.Capabilities {
capabilities[i] = string(cap)
@@ -467,7 +467,7 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error {
}
// sendRegistrationSync sends the registration message synchronously
-func (c *GrpcAdminClient) sendRegistrationSync(worker *types.Worker) error {
+func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error {
capabilities := make([]string, len(worker.Capabilities))
for i, cap := range worker.Capabilities {
capabilities[i] = string(cap)
@@ -585,7 +585,7 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta
}
// RequestTask requests a new task from admin server
-func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) {
+func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
if !c.connected {
// If we're currently reconnecting, don't wait - just return no task
c.mutex.RLock()
@@ -646,7 +646,7 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task
workerID, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId)
// Convert to our task type
- task := &types.Task{
+ task := &types.TaskInput{
ID: taskAssign.TaskId,
Type: types.TaskType(taskAssign.TaskType),
Status: types.TaskStatusAssigned,
@@ -836,7 +836,7 @@ func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage {
type MockAdminClient struct {
workerID string
connected bool
- tasks []*types.Task
+ tasks []*types.TaskInput
mutex sync.RWMutex
}
@@ -844,7 +844,7 @@ type MockAdminClient struct {
func NewMockAdminClient() *MockAdminClient {
return &MockAdminClient{
connected: true,
- tasks: make([]*types.Task, 0),
+ tasks: make([]*types.TaskInput, 0),
}
}
@@ -865,7 +865,7 @@ func (m *MockAdminClient) Disconnect() error {
}
// RegisterWorker mock implementation
-func (m *MockAdminClient) RegisterWorker(worker *types.Worker) error {
+func (m *MockAdminClient) RegisterWorker(worker *types.WorkerData) error {
m.workerID = worker.ID
glog.Infof("Mock: Worker %s registered with capabilities: %v", worker.ID, worker.Capabilities)
return nil
@@ -879,7 +879,7 @@ func (m *MockAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta
}
// RequestTask mock implementation
-func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) {
+func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
@@ -924,7 +924,7 @@ func (m *MockAdminClient) IsConnected() bool {
}
// AddMockTask adds a mock task for testing
-func (m *MockAdminClient) AddMockTask(task *types.Task) {
+func (m *MockAdminClient) AddMockTask(task *types.TaskInput) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.tasks = append(m.tasks, task)