diff options
Diffstat (limited to 'weed/server/master_grpc_server.go')
| -rw-r--r-- | weed/server/master_grpc_server.go | 49 |
1 files changed, 31 insertions, 18 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 4b65979bd..7caaf01b2 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -70,8 +70,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } message := &master_pb.VolumeLocation{ - Url: dn.Url(), - PublicUrl: dn.PublicUrl, + DataCenter: dn.GetDataCenterId(), + Url: dn.Url(), + PublicUrl: dn.PublicUrl, } for _, v := range dn.GetVolumes() { message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) @@ -104,9 +105,32 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ return err } - ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey) + if !ms.Topo.IsLeader() { + // tell the volume servers about the leader + newLeader, err := ms.Topo.Leader() + if err != nil { + glog.Warningf("SendHeartbeat find leader: %v", err) + return err + } + if err := stream.Send(&master_pb.HeartbeatResponse{ + Leader: string(newLeader), + }); err != nil { + if dn != nil { + glog.Warningf("SendHeartbeat.Send response to %s:%d %v", dn.Ip, dn.Port, err) + } else { + glog.Warningf("SendHeartbeat.Send response %v", err) + } + return err + } + continue + } + ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey) if dn == nil { + // Skip delta heartbeat for volume server versions better than 3.28 https://github.com/seaweedfs/seaweedfs/pull/3630 + if heartbeat.Ip == "" { + continue + } // ToDo must be removed after update major version dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) dc := ms.Topo.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) @@ -162,8 +186,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes { - dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) - ms.Topo.DataNodeRegistration(dcName, rackName, dn) + if heartbeat.Ip != "" { + dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) + ms.Topo.DataNodeRegistration(dcName, rackName, dn) + } // process heartbeat.Volumes stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc() @@ -216,19 +242,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 { ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message}) } - - // tell the volume servers about the leader - newLeader, err := ms.Topo.Leader() - if err != nil { - glog.Warningf("SendHeartbeat find leader: %v", err) - return err - } - if err := stream.Send(&master_pb.HeartbeatResponse{ - Leader: string(newLeader), - }); err != nil { - glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err) - return err - } } } |
