diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-01 11:18:32 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-01 11:18:32 -0700 |
| commit | 0975968e71b05368d5f28f788cf863c2042c2696 (patch) | |
| tree | 5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/client.go | |
| parent | 1cba609bfa2306cc2885df212febd5ff954aa693 (diff) | |
| download | seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip | |
admin: Refactor task destination planning (#7063)
* refactor planning into task detection
* refactoring worker tasks
* refactor
* compiles, but only balance task is registered
* compiles, but has nil exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and ecx,ecj,vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
Diffstat (limited to 'weed/worker/client.go')
| -rw-r--r-- | weed/worker/client.go | 22 |
1 files changed, 11 insertions, 11 deletions
diff --git a/weed/worker/client.go b/weed/worker/client.go index 53854c6e3..ef7e431c0 100644 --- a/weed/worker/client.go +++ b/weed/worker/client.go @@ -38,7 +38,7 @@ type GrpcAdminClient struct { reconnectMultiplier float64 // Worker registration info for re-registration after reconnection - lastWorkerInfo *types.Worker + lastWorkerInfo *types.WorkerData // Channels for communication outgoing chan *worker_pb.WorkerMessage @@ -404,7 +404,7 @@ func (c *GrpcAdminClient) handleIncomingWithReady(ready chan struct{}) { } // RegisterWorker registers the worker with the admin server -func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error { +func (c *GrpcAdminClient) RegisterWorker(worker *types.WorkerData) error { // Store worker info for re-registration after reconnection c.mutex.Lock() c.lastWorkerInfo = worker @@ -420,7 +420,7 @@ func (c *GrpcAdminClient) RegisterWorker(worker *types.Worker) error { } // sendRegistration sends the registration message and waits for response -func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error { +func (c *GrpcAdminClient) sendRegistration(worker *types.WorkerData) error { capabilities := make([]string, len(worker.Capabilities)) for i, cap := range worker.Capabilities { capabilities[i] = string(cap) @@ -467,7 +467,7 @@ func (c *GrpcAdminClient) sendRegistration(worker *types.Worker) error { } // sendRegistrationSync sends the registration message synchronously -func (c *GrpcAdminClient) sendRegistrationSync(worker *types.Worker) error { +func (c *GrpcAdminClient) sendRegistrationSync(worker *types.WorkerData) error { capabilities := make([]string, len(worker.Capabilities)) for i, cap := range worker.Capabilities { capabilities[i] = string(cap) @@ -585,7 +585,7 @@ func (c *GrpcAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta } // RequestTask requests a new task from admin server -func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) { +func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) { if !c.connected { // If we're currently reconnecting, don't wait - just return no task c.mutex.RLock() @@ -646,7 +646,7 @@ func (c *GrpcAdminClient) RequestTask(workerID string, capabilities []types.Task workerID, taskAssign.TaskId, taskAssign.TaskType, taskAssign.Params.VolumeId) // Convert to our task type - task := &types.Task{ + task := &types.TaskInput{ ID: taskAssign.TaskId, Type: types.TaskType(taskAssign.TaskType), Status: types.TaskStatusAssigned, @@ -836,7 +836,7 @@ func (c *GrpcAdminClient) GetIncomingChannel() <-chan *worker_pb.AdminMessage { type MockAdminClient struct { workerID string connected bool - tasks []*types.Task + tasks []*types.TaskInput mutex sync.RWMutex } @@ -844,7 +844,7 @@ type MockAdminClient struct { func NewMockAdminClient() *MockAdminClient { return &MockAdminClient{ connected: true, - tasks: make([]*types.Task, 0), + tasks: make([]*types.TaskInput, 0), } } @@ -865,7 +865,7 @@ func (m *MockAdminClient) Disconnect() error { } // RegisterWorker mock implementation -func (m *MockAdminClient) RegisterWorker(worker *types.Worker) error { +func (m *MockAdminClient) RegisterWorker(worker *types.WorkerData) error { m.workerID = worker.ID glog.Infof("Mock: Worker %s registered with capabilities: %v", worker.ID, worker.Capabilities) return nil @@ -879,7 +879,7 @@ func (m *MockAdminClient) SendHeartbeat(workerID string, status *types.WorkerSta } // RequestTask mock implementation -func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.Task, error) { +func (m *MockAdminClient) RequestTask(workerID string, capabilities []types.TaskType) (*types.TaskInput, error) { m.mutex.Lock() defer m.mutex.Unlock() @@ -924,7 +924,7 @@ func (m *MockAdminClient) IsConnected() bool { } // AddMockTask adds a mock task for testing -func (m *MockAdminClient) AddMockTask(task *types.Task) { +func (m *MockAdminClient) AddMockTask(task *types.TaskInput) { m.mutex.Lock() defer m.mutex.Unlock() m.tasks = append(m.tasks, task) |
