diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/master_server.go | 62 | ||||
| -rw-r--r-- | weed/server/volume_grpc_vacuum.go | 11 |
2 files changed, 22 insertions, 51 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 45981bbb4..57103f166 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,7 +1,6 @@ package weed_server import ( - "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/stats" "net/http" @@ -32,9 +31,8 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" - RaftServerRemovalTime = 72 * time.Minute + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" ) type MasterOption struct { @@ -65,9 +63,6 @@ type MasterServer struct { boundedLeaderChan chan int - onPeerUpdateDoneCn chan string - onPeerUpdateDoneCnExist bool - // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.KeepConnectedResponse @@ -118,7 +113,6 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) - ms.onPeerUpdateDoneCn = make(chan string) ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate @@ -351,50 +345,18 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF peerAddress := pb.ServerAddress(update.Address) peerName := string(peerAddress) isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader - if update.IsAdd { - if isLeader { - raftServerFound := false - for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { - if string(server.ID) == peerName { - raftServerFound = true - } - } - if !raftServerFound { - glog.V(0).Infof("adding new raft server: %s", peerName) - ms.Topo.HashicorpRaft.AddVoter( - hashicorpRaft.ServerID(peerName), - hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) + if update.IsAdd && isLeader { + raftServerFound := false + for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerName { + raftServerFound = true } } - if ms.onPeerUpdateDoneCnExist { - ms.onPeerUpdateDoneCn <- peerName + if !raftServerFound { + glog.V(0).Infof("adding new raft server: %s", peerName) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerName), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } - } else if isLeader { - go func(peerName string) { - raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) - for { - select { - case <-raftServerRemovalTimeAfter: - err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ - Id: peerName, - Force: false, - }) - return err - }) - if err != nil { - glog.Warningf("failed to removing old raft server %s: %v", peerName, err) - } - glog.V(0).Infof("old raft server %s removed", peerName) - return - case peerDone := <-ms.onPeerUpdateDoneCn: - if peerName == peerDone { - glog.V(0).Infof("raft server %s remove canceled", peerName) - return - } - } - } - }(peerName) - ms.onPeerUpdateDoneCnExist = true } } diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index f11764f3a..5252584e1 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -3,11 +3,15 @@ package weed_server import ( "context" + "github.com/prometheus/procfs" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "runtime" ) +var numCPU = runtime.NumCPU() + func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) { resp := &volume_server_pb.VacuumVolumeCheckResponse{} @@ -29,11 +33,16 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo resp := &volume_server_pb.VacuumVolumeCompactResponse{} reportInterval := int64(1024 * 1024 * 128) nextReportTarget := reportInterval - + fs, fsErr := procfs.NewDefaultFS() var sendErr error err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, func(processed int64) bool { if processed > nextReportTarget { resp.ProcessedBytes = processed + if fsErr == nil && numCPU > 0 { + if fsLa, err := fs.LoadAvg(); err == nil { + resp.LoadAvg_1M = float32(fsLa.Load1 / float64(numCPU)) + } + } if sendErr = stream.Send(resp); sendErr != nil { return false } |
