aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-08-24 00:30:03 -0700
committerChris Lu <chris.lu@gmail.com>2018-08-24 00:30:03 -0700
commit5ccf8e8078eee570f7ff444ea86a07589394a4ed (patch)
tree95b3b8c26f7227e468585304934e08a8ca166892
parentac793a3c5ac782cf2ea0db44f0f4c73fff15ee90 (diff)
downloadseaweedfs-5ccf8e8078eee570f7ff444ea86a07589394a4ed.tar.xz
seaweedfs-5ccf8e8078eee570f7ff444ea86a07589394a4ed.zip
reformat
-rw-r--r--weed/server/master_grpc_server.go83
1 files changed, 41 insertions, 42 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 2952a8071..e12449819 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -43,55 +43,54 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
for {
heartbeat, err := stream.Recv()
- if err == nil {
- if dn == nil {
- t.Sequence.SetMax(heartbeat.MaxFileKey)
- if heartbeat.Ip == "" {
- if pr, ok := peer.FromContext(stream.Context()); ok {
- if pr.Addr != net.Addr(nil) {
- heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")]
- glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip)
- }
+ if err != nil {
+ return err
+ }
+
+ if dn == nil {
+ t.Sequence.SetMax(heartbeat.MaxFileKey)
+ if heartbeat.Ip == "" {
+ if pr, ok := peer.FromContext(stream.Context()); ok {
+ if pr.Addr != net.Addr(nil) {
+ heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")]
+ glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip)
}
}
- dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
- dc := t.GetOrCreateDataCenter(dcName)
- rack := dc.GetOrCreateRack(rackName)
- dn = rack.GetOrCreateDataNode(heartbeat.Ip,
- int(heartbeat.Port), heartbeat.PublicUrl,
- int(heartbeat.MaxVolumeCount))
- glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
- if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
- SecretKey: string(ms.guard.SecretKey),
- }); err != nil {
- return err
- }
}
+ dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ dc := t.GetOrCreateDataCenter(dcName)
+ rack := dc.GetOrCreateRack(rackName)
+ dn = rack.GetOrCreateDataNode(heartbeat.Ip,
+ int(heartbeat.Port), heartbeat.PublicUrl,
+ int(heartbeat.MaxVolumeCount))
+ glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
+ if err := stream.Send(&master_pb.HeartbeatResponse{
+ VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
+ SecretKey: string(ms.guard.SecretKey),
+ }); err != nil {
+ return err
+ }
+ }
- newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+ newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
- message := &master_pb.VolumeLocation{
- Url: dn.Url(),
- PublicUrl: dn.PublicUrl,
- }
- for _, v := range newVolumes {
- message.NewVids = append(message.NewVids, uint32(v.Id))
- }
- for _, v := range deletedVolumes {
- message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
- }
+ message := &master_pb.VolumeLocation{
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ }
+ for _, v := range newVolumes {
+ message.NewVids = append(message.NewVids, uint32(v.Id))
+ }
+ for _, v := range deletedVolumes {
+ message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
+ }
- if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
- ms.clientChansLock.RLock()
- for _, ch := range ms.clientChans {
- ch <- message
- }
- ms.clientChansLock.RUnlock()
+ if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
+ ms.clientChansLock.RLock()
+ for _, ch := range ms.clientChans {
+ ch <- message
}
-
- } else {
- return err
+ ms.clientChansLock.RUnlock()
}
// tell the volume servers about the leader