diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_admin.go | 10 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 28 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 6 |
3 files changed, 33 insertions, 11 deletions
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index f81ec649d..9296c63e9 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -196,6 +196,16 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv } +func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) { + + resp := &volume_server_pb.VolumeServerLeaveResponse{} + + vs.StopHeartbeat() + + return resp, nil + +} + func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) { resp := &volume_server_pb.VolumeNeedleStatusResponse{} diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index ac94ff9d4..7f3a1635c 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -31,7 +31,7 @@ func (vs *VolumeServer) heartbeat() { var err error var newLeader string - for { + for vs.isHeartbeating { for _, master := range vs.SeedMasterNodes { if newLeader != "" { // the new leader may actually is the same master @@ -52,10 +52,22 @@ func (vs *VolumeServer) heartbeat() { newLeader = "" vs.store.MasterAddress = "" } + if !vs.isHeartbeating { + break + } } } } +func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) { + if !vs.isHeartbeating { + return true + } + vs.isHeartbeating = false + vs.stopChan <- true + return false +} + func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { ctx, cancel := context.WithCancel(context.Background()) @@ -171,14 +183,10 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi return "", err } case <-volumeTickChan: - if vs.SendHeartbeat { - glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) - if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) - return "", err - } - } else { - glog.V(4).Infof("volume server %s:%d skip send heartbeat", vs.store.Ip, vs.store.Port) + glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) + if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + return "", err } case <-ecShardTickChan: glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) @@ -188,6 +196,8 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi } case err = <-doneChan: return + case <-vs.stopChan: + return } } } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 6612e9045..e9d815579 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -31,7 +31,8 @@ type VolumeServer struct { MetricsAddress string MetricsIntervalSec int fileSizeLimitBytes int64 - SendHeartbeat bool + isHeartbeating bool + stopChan chan bool } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -67,7 +68,8 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, - SendHeartbeat: true, + isHeartbeating: true, + stopChan: make(chan bool), } vs.SeedMasterNodes = masterNodes vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind) |
