aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-09-08 11:23:33 +0500
committerGitHub <noreply@github.com>2022-09-07 23:23:33 -0700
commit31d2f77ceb8aa0acd4ebf7a85ca24c9375de0a17 (patch)
tree4eed0f9c75577b5c1e1764a21cc1556a81c3f9de
parent7de112943b2548183c0dfe594600479489c0bced (diff)
downloadseaweedfs-31d2f77ceb8aa0acd4ebf7a85ca24c9375de0a17.tar.xz
seaweedfs-31d2f77ceb8aa0acd4ebf7a85ca24c9375de0a17.zip
refactor https://github.com/seaweedfs/seaweedfs/pull/3616 (#3625)
-rw-r--r--weed/server/master_grpc_server.go212
1 files changed, 106 insertions, 106 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index b6b80ce47..74d347142 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -104,136 +104,136 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
- if ms.Topo.IsLeader() {
- ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
-
- if dn == nil {
- dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
- dc := ms.Topo.GetOrCreateDataCenter(dcName)
- rack := dc.GetOrCreateRack(rackName)
- dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
- glog.V(0).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids)
- uuidlist, err := ms.RegisterUuids(heartbeat)
- if err != nil {
- if stream_err := stream.Send(&master_pb.HeartbeatResponse{
- DuplicatedUuids: uuidlist,
- }); stream_err != nil {
- glog.Warningf("SendHeartbeat.Send DuplicatedDirectory response to %s:%d %v", dn.Ip, dn.Port, stream_err)
- return stream_err
- }
- return err
+ if !ms.Topo.IsLeader() {
+ // tell the volume servers about the leader
+ newLeader, err := ms.Topo.Leader()
+ if err != nil {
+ glog.Warningf("SendHeartbeat find leader: %v", err)
+ return err
+ }
+ if err := stream.Send(&master_pb.HeartbeatResponse{
+ Leader: string(newLeader),
+ }); err != nil {
+ if dn != nil {
+ glog.Warningf("SendHeartbeat.Send response to %s:%d %v", dn.Ip, dn.Port, err)
+ } else {
+ glog.Warningf("SendHeartbeat.Send response %v", err)
}
+ return err
+ }
+ continue
+ }
- if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
- }); err != nil {
- glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
- return err
+ ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
+ if dn == nil {
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ dc := ms.Topo.GetOrCreateDataCenter(dcName)
+ rack := dc.GetOrCreateRack(rackName)
+ dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
+ glog.V(0).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids)
+ uuidlist, err := ms.RegisterUuids(heartbeat)
+ if err != nil {
+ if stream_err := stream.Send(&master_pb.HeartbeatResponse{
+ DuplicatedUuids: uuidlist,
+ }); stream_err != nil {
+ glog.Warningf("SendHeartbeat.Send DuplicatedDirectory response to %s:%d %v", dn.Ip, dn.Port, stream_err)
+ return stream_err
}
- stats.MasterReceivedHeartbeatCounter.WithLabelValues("dataNode").Inc()
- dn.Counter++
+ return err
}
- dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
+ if err := stream.Send(&master_pb.HeartbeatResponse{
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
+ }); err != nil {
+ glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
+ return err
+ }
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("dataNode").Inc()
+ dn.Counter++
+ }
- glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
- stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc()
+ dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
- message := &master_pb.VolumeLocation{
- Url: dn.Url(),
- PublicUrl: dn.PublicUrl,
- DataCenter: dn.GetDataCenterId(),
- }
- if len(heartbeat.NewVolumes) > 0 {
- stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc()
- }
- if len(heartbeat.DeletedVolumes) > 0 {
- stats.FilerRequestCounter.WithLabelValues("deletedVolumes").Inc()
+ glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc()
+
+ message := &master_pb.VolumeLocation{
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ DataCenter: dn.GetDataCenterId(),
+ }
+ if len(heartbeat.NewVolumes) > 0 {
+ stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc()
+ }
+ if len(heartbeat.DeletedVolumes) > 0 {
+ stats.FilerRequestCounter.WithLabelValues("deletedVolumes").Inc()
+ }
+ if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
+ // process delta volume ids if exists for fast volume id updates
+ for _, volInfo := range heartbeat.NewVolumes {
+ message.NewVids = append(message.NewVids, volInfo.Id)
}
- if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
- // process delta volume ids if exists for fast volume id updates
- for _, volInfo := range heartbeat.NewVolumes {
- message.NewVids = append(message.NewVids, volInfo.Id)
- }
- for _, volInfo := range heartbeat.DeletedVolumes {
- message.DeletedVids = append(message.DeletedVids, volInfo.Id)
- }
- // update master internal volume layouts
- ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
+ for _, volInfo := range heartbeat.DeletedVolumes {
+ message.DeletedVids = append(message.DeletedVids, volInfo.Id)
}
+ // update master internal volume layouts
+ ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
+ }
- if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
- dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
- ms.Topo.DataNodeRegistration(dcName, rackName, dn)
+ if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ ms.Topo.DataNodeRegistration(dcName, rackName, dn)
- // process heartbeat.Volumes
- stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc()
- newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+ // process heartbeat.Volumes
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc()
+ newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
- for _, v := range newVolumes {
- glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
- message.NewVids = append(message.NewVids, uint32(v.Id))
- }
- for _, v := range deletedVolumes {
- glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
- message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
- }
+ for _, v := range newVolumes {
+ glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
+ message.NewVids = append(message.NewVids, uint32(v.Id))
}
+ for _, v := range deletedVolumes {
+ glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
+ message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
+ }
+ }
- if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
- stats.MasterReceivedHeartbeatCounter.WithLabelValues("newEcShards").Inc()
- // update master internal volume layouts
- ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+ if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("newEcShards").Inc()
+ // update master internal volume layouts
+ ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
- for _, s := range heartbeat.NewEcShards {
- message.NewEcVids = append(message.NewEcVids, s.Id)
- }
- for _, s := range heartbeat.DeletedEcShards {
- if dn.HasEcShards(needle.VolumeId(s.Id)) {
- continue
- }
- message.DeletedEcVids = append(message.DeletedEcVids, s.Id)
+ for _, s := range heartbeat.NewEcShards {
+ message.NewEcVids = append(message.NewEcVids, s.Id)
+ }
+ for _, s := range heartbeat.DeletedEcShards {
+ if dn.HasEcShards(needle.VolumeId(s.Id)) {
+ continue
}
-
+ message.DeletedEcVids = append(message.DeletedEcVids, s.Id)
}
- if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
- stats.MasterReceivedHeartbeatCounter.WithLabelValues("ecShards").Inc()
- glog.V(4).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
- newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
+ }
- // broadcast the ec vid changes to master clients
- for _, s := range newShards {
- message.NewEcVids = append(message.NewEcVids, uint32(s.VolumeId))
- }
- for _, s := range deletedShards {
- if dn.HasVolumesById(s.VolumeId) {
- continue
- }
- message.DeletedEcVids = append(message.DeletedEcVids, uint32(s.VolumeId))
- }
+ if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("ecShards").Inc()
+ glog.V(4).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+ newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
+ // broadcast the ec vid changes to master clients
+ for _, s := range newShards {
+ message.NewEcVids = append(message.NewEcVids, uint32(s.VolumeId))
}
- if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 {
- ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
+ for _, s := range deletedShards {
+ if dn.HasVolumesById(s.VolumeId) {
+ continue
+ }
+ message.DeletedEcVids = append(message.DeletedEcVids, uint32(s.VolumeId))
}
- }
- // tell the volume servers about the leader
- newLeader, err := ms.Topo.Leader()
- if err != nil {
- glog.Warningf("SendHeartbeat find leader: %v", err)
- return err
}
- if err := stream.Send(&master_pb.HeartbeatResponse{
- Leader: string(newLeader),
- }); err != nil {
- if dn != nil {
- glog.Warningf("SendHeartbeat.Send response to %s:%d %v", dn.Ip, dn.Port, err)
- } else {
- glog.Warningf("SendHeartbeat.Send response %v", err)
- }
- return err
+ if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 {
+ ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}
}
}