aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/pb/master_pb/master_helper.go5
-rw-r--r--weed/server/master_grpc_server.go15
-rw-r--r--weed/server/volume_grpc_client_to_master.go29
-rw-r--r--weed/topology/configuration.go11
-rw-r--r--weed/wdclient/masterclient.go4
5 files changed, 46 insertions, 18 deletions
diff --git a/weed/pb/master_pb/master_helper.go b/weed/pb/master_pb/master_helper.go
new file mode 100644
index 000000000..52006fdee
--- /dev/null
+++ b/weed/pb/master_pb/master_helper.go
@@ -0,0 +1,5 @@
+package master_pb
+
+func (v *VolumeLocation) IsEmptyUrl() bool {
+ return v.Url == "" || v.Url == ":0"
+}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 74d347142..7caaf01b2 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -70,8 +70,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
message := &master_pb.VolumeLocation{
- Url: dn.Url(),
- PublicUrl: dn.PublicUrl,
+ DataCenter: dn.GetDataCenterId(),
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
}
for _, v := range dn.GetVolumes() {
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
@@ -126,6 +127,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
if dn == nil {
+ // Skip delta heartbeat for volume server versions better than 3.28 https://github.com/seaweedfs/seaweedfs/pull/3630
+ if heartbeat.Ip == "" {
+ continue
+ } // ToDo must be removed after update major version
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
@@ -181,8 +186,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
- dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
- ms.Topo.DataNodeRegistration(dcName, rackName, dn)
+ if heartbeat.Ip != "" {
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ ms.Topo.DataNodeRegistration(dcName, rackName, dn)
+ }
// process heartbeat.Volumes
stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc()
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index e55d821a8..3849eac19 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -160,11 +160,18 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
volumeTickChan := time.Tick(sleepInterval)
ecShardTickChan := time.Tick(17 * sleepInterval)
-
+ dataCenter := vs.store.GetDataCenter()
+ rack := vs.store.GetRack()
+ ip := vs.store.Ip
+ port := uint32(vs.store.Port)
for {
select {
case volumeMessage := <-vs.store.NewVolumesChan:
deltaBeat := &master_pb.Heartbeat{
+ Ip: ip,
+ Port: port,
+ DataCenter: dataCenter,
+ Rack: rack,
NewVolumes: []*master_pb.VolumeShortInformationMessage{
&volumeMessage,
},
@@ -176,6 +183,10 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
}
case ecShardMessage := <-vs.store.NewEcShardsChan:
deltaBeat := &master_pb.Heartbeat{
+ Ip: ip,
+ Port: port,
+ DataCenter: dataCenter,
+ Rack: rack,
NewEcShards: []*master_pb.VolumeEcShardInformationMessage{
&ecShardMessage,
},
@@ -188,6 +199,10 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
}
case volumeMessage := <-vs.store.DeletedVolumesChan:
deltaBeat := &master_pb.Heartbeat{
+ Ip: ip,
+ Port: port,
+ DataCenter: dataCenter,
+ Rack: rack,
DeletedVolumes: []*master_pb.VolumeShortInformationMessage{
&volumeMessage,
},
@@ -199,6 +214,10 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
}
case ecShardMessage := <-vs.store.DeletedEcShardsChan:
deltaBeat := &master_pb.Heartbeat{
+ Ip: ip,
+ Port: port,
+ DataCenter: dataCenter,
+ Rack: rack,
DeletedEcShards: []*master_pb.VolumeEcShardInformationMessage{
&ecShardMessage,
},
@@ -227,12 +246,12 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti
case <-vs.stopChan:
var volumeMessages []*master_pb.VolumeInformationMessage
emptyBeat := &master_pb.Heartbeat{
- Ip: vs.store.Ip,
- Port: uint32(vs.store.Port),
+ Ip: ip,
+ Port: port,
PublicUrl: vs.store.PublicUrl,
MaxFileKey: uint64(0),
- DataCenter: vs.store.GetDataCenter(),
- Rack: vs.store.GetRack(),
+ DataCenter: dataCenter,
+ Rack: rack,
Volumes: volumeMessages,
HasNoVolumes: len(volumeMessages) == 0,
}
diff --git a/weed/topology/configuration.go b/weed/topology/configuration.go
index cb635658f..d9fde4a04 100644
--- a/weed/topology/configuration.go
+++ b/weed/topology/configuration.go
@@ -20,9 +20,8 @@ type topology struct {
DataCenters []dataCenter `xml:"DataCenter"`
}
type Configuration struct {
- XMLName xml.Name `xml:"Configuration"`
- Topo topology `xml:"Topology"`
- ip2location map[string]loc // this is not used any more. leave it here for later.
+ XMLName xml.Name `xml:"Configuration"`
+ Topo topology `xml:"Topology"`
}
func (c *Configuration) String() string {
@@ -33,12 +32,6 @@ func (c *Configuration) String() string {
}
func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
- if c != nil && c.ip2location != nil {
- if loc, ok := c.ip2location[ip]; ok {
- return loc.dcName, loc.rackName
- }
- }
-
if dcName == "" {
dcName = "DefaultDataCenter"
}
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 41c5a8bc4..64ae876b0 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -262,6 +262,10 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
}
func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
+ if resp.VolumeLocation.IsEmptyUrl() {
+ glog.V(0).Infof("updateVidMap ignore short heartbeat: %+v", resp)
+ return
+ }
// process new volume location
loc := Location{
Url: resp.VolumeLocation.Url,