aboutsummaryrefslogtreecommitdiff
path: root/weed/server/master_grpc_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/master_grpc_server.go')
-rw-r--r--weed/server/master_grpc_server.go49
1 files changed, 31 insertions, 18 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 4b65979bd..7caaf01b2 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -70,8 +70,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
message := &master_pb.VolumeLocation{
- Url: dn.Url(),
- PublicUrl: dn.PublicUrl,
+ DataCenter: dn.GetDataCenterId(),
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
}
for _, v := range dn.GetVolumes() {
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
@@ -104,9 +105,32 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
- ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
+ if !ms.Topo.IsLeader() {
+ // tell the volume servers about the leader
+ newLeader, err := ms.Topo.Leader()
+ if err != nil {
+ glog.Warningf("SendHeartbeat find leader: %v", err)
+ return err
+ }
+ if err := stream.Send(&master_pb.HeartbeatResponse{
+ Leader: string(newLeader),
+ }); err != nil {
+ if dn != nil {
+ glog.Warningf("SendHeartbeat.Send response to %s:%d %v", dn.Ip, dn.Port, err)
+ } else {
+ glog.Warningf("SendHeartbeat.Send response %v", err)
+ }
+ return err
+ }
+ continue
+ }
+ ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
if dn == nil {
+ // Skip delta heartbeat for volume server versions better than 3.28 https://github.com/seaweedfs/seaweedfs/pull/3630
+ if heartbeat.Ip == "" {
+ continue
+ } // ToDo must be removed after update major version
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
@@ -162,8 +186,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
- dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
- ms.Topo.DataNodeRegistration(dcName, rackName, dn)
+ if heartbeat.Ip != "" {
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ ms.Topo.DataNodeRegistration(dcName, rackName, dn)
+ }
// process heartbeat.Volumes
stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc()
@@ -216,19 +242,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 {
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}
-
- // tell the volume servers about the leader
- newLeader, err := ms.Topo.Leader()
- if err != nil {
- glog.Warningf("SendHeartbeat find leader: %v", err)
- return err
- }
- if err := stream.Send(&master_pb.HeartbeatResponse{
- Leader: string(newLeader),
- }); err != nil {
- glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err)
- return err
- }
}
}