aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorfamosss <zzq09494@ly.com>2022-09-08 00:48:51 +0800
committerGitHub <noreply@github.com>2022-09-07 09:48:51 -0700
commit9678fc2106d58a80fe130fc1f59eb2d6dbf6e8dc (patch)
tree6bc1b08279242b2c798ef6afed9dfa0b1a5f55a0 /weed/server
parent5ff33eb558fda658175c86f48cf7a203b8a78fde (diff)
downloadseaweedfs-9678fc2106d58a80fe130fc1f59eb2d6dbf6e8dc.tar.xz
seaweedfs-9678fc2106d58a80fe130fc1f59eb2d6dbf6e8dc.zip
fix: volume heartbeat processing error (#3616)
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/master_grpc_server.go182
1 files changed, 92 insertions, 90 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 4b65979bd..d618f28af 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -104,117 +104,119 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
- ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
-
- if dn == nil {
- dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
- dc := ms.Topo.GetOrCreateDataCenter(dcName)
- rack := dc.GetOrCreateRack(rackName)
- dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
- glog.V(0).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids)
- uuidlist, err := ms.RegisterUuids(heartbeat)
- if err != nil {
- if stream_err := stream.Send(&master_pb.HeartbeatResponse{
- DuplicatedUuids: uuidlist,
- }); stream_err != nil {
- glog.Warningf("SendHeartbeat.Send DuplicatedDirectory response to %s:%d %v", dn.Ip, dn.Port, stream_err)
- return stream_err
+ if ms.Topo.IsLeader() {
+ ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
+
+ if dn == nil {
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ dc := ms.Topo.GetOrCreateDataCenter(dcName)
+ rack := dc.GetOrCreateRack(rackName)
+ dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
+ glog.V(0).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids)
+ uuidlist, err := ms.RegisterUuids(heartbeat)
+ if err != nil {
+ if stream_err := stream.Send(&master_pb.HeartbeatResponse{
+ DuplicatedUuids: uuidlist,
+ }); stream_err != nil {
+ glog.Warningf("SendHeartbeat.Send DuplicatedDirectory response to %s:%d %v", dn.Ip, dn.Port, stream_err)
+ return stream_err
+ }
+ return err
}
- return err
- }
- if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
- }); err != nil {
- glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
- return err
+ if err := stream.Send(&master_pb.HeartbeatResponse{
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
+ }); err != nil {
+ glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
+ return err
+ }
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("dataNode").Inc()
+ dn.Counter++
}
- stats.MasterReceivedHeartbeatCounter.WithLabelValues("dataNode").Inc()
- dn.Counter++
- }
- dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
+ dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
- glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
- stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc()
+ glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc()
- message := &master_pb.VolumeLocation{
- Url: dn.Url(),
- PublicUrl: dn.PublicUrl,
- DataCenter: dn.GetDataCenterId(),
- }
- if len(heartbeat.NewVolumes) > 0 {
- stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc()
- }
- if len(heartbeat.DeletedVolumes) > 0 {
- stats.FilerRequestCounter.WithLabelValues("deletedVolumes").Inc()
- }
- if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
- // process delta volume ids if exists for fast volume id updates
- for _, volInfo := range heartbeat.NewVolumes {
- message.NewVids = append(message.NewVids, volInfo.Id)
+ message := &master_pb.VolumeLocation{
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ DataCenter: dn.GetDataCenterId(),
}
- for _, volInfo := range heartbeat.DeletedVolumes {
- message.DeletedVids = append(message.DeletedVids, volInfo.Id)
+ if len(heartbeat.NewVolumes) > 0 {
+ stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc()
+ }
+ if len(heartbeat.DeletedVolumes) > 0 {
+ stats.FilerRequestCounter.WithLabelValues("deletedVolumes").Inc()
+ }
+ if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
+ // process delta volume ids if exists for fast volume id updates
+ for _, volInfo := range heartbeat.NewVolumes {
+ message.NewVids = append(message.NewVids, volInfo.Id)
+ }
+ for _, volInfo := range heartbeat.DeletedVolumes {
+ message.DeletedVids = append(message.DeletedVids, volInfo.Id)
+ }
+ // update master internal volume layouts
+ ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
}
- // update master internal volume layouts
- ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
- }
- if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
- dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
- ms.Topo.DataNodeRegistration(dcName, rackName, dn)
+ if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ ms.Topo.DataNodeRegistration(dcName, rackName, dn)
- // process heartbeat.Volumes
- stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc()
- newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+ // process heartbeat.Volumes
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc()
+ newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
- for _, v := range newVolumes {
- glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
- message.NewVids = append(message.NewVids, uint32(v.Id))
- }
- for _, v := range deletedVolumes {
- glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
- message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
+ for _, v := range newVolumes {
+ glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
+ message.NewVids = append(message.NewVids, uint32(v.Id))
+ }
+ for _, v := range deletedVolumes {
+ glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
+ message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
+ }
}
- }
- if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
- stats.MasterReceivedHeartbeatCounter.WithLabelValues("newEcShards").Inc()
- // update master internal volume layouts
- ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+ if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("newEcShards").Inc()
+ // update master internal volume layouts
+ ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
- for _, s := range heartbeat.NewEcShards {
- message.NewEcVids = append(message.NewEcVids, s.Id)
- }
- for _, s := range heartbeat.DeletedEcShards {
- if dn.HasEcShards(needle.VolumeId(s.Id)) {
- continue
+ for _, s := range heartbeat.NewEcShards {
+ message.NewEcVids = append(message.NewEcVids, s.Id)
}
- message.DeletedEcVids = append(message.DeletedEcVids, s.Id)
+ for _, s := range heartbeat.DeletedEcShards {
+ if dn.HasEcShards(needle.VolumeId(s.Id)) {
+ continue
+ }
+ message.DeletedEcVids = append(message.DeletedEcVids, s.Id)
+ }
+
}
- }
+ if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("ecShards").Inc()
+ glog.V(4).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+ newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
- if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
- stats.MasterReceivedHeartbeatCounter.WithLabelValues("ecShards").Inc()
- glog.V(4).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
- newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
+ // broadcast the ec vid changes to master clients
+ for _, s := range newShards {
+ message.NewEcVids = append(message.NewEcVids, uint32(s.VolumeId))
+ }
+ for _, s := range deletedShards {
+ if dn.HasVolumesById(s.VolumeId) {
+ continue
+ }
+ message.DeletedEcVids = append(message.DeletedEcVids, uint32(s.VolumeId))
+ }
- // broadcast the ec vid changes to master clients
- for _, s := range newShards {
- message.NewEcVids = append(message.NewEcVids, uint32(s.VolumeId))
}
- for _, s := range deletedShards {
- if dn.HasVolumesById(s.VolumeId) {
- continue
- }
- message.DeletedEcVids = append(message.DeletedEcVids, uint32(s.VolumeId))
+ if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 {
+ ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}
-
- }
- if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 {
- ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}
// tell the volume servers about the leader