diff options
Diffstat (limited to 'weed/server/volume_grpc_client.go')
| -rw-r--r-- | weed/server/volume_grpc_client.go | 52 |
1 files changed, 27 insertions, 25 deletions
diff --git a/weed/server/volume_grpc_client.go b/weed/server/volume_grpc_client.go index 2f3f36924..7688745e2 100644 --- a/weed/server/volume_grpc_client.go +++ b/weed/server/volume_grpc_client.go @@ -7,49 +7,51 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/storage" "golang.org/x/net/context" "google.golang.org/grpc" ) +func (vs *VolumeServer) GetMaster() string { + return vs.currentMaster +} func (vs *VolumeServer) heartbeat() { - glog.V(0).Infof("Volume server bootstraps with master %s", vs.GetMasterNode()) - vs.masterNodes = storage.NewMasterNodes(vs.masterNode) + glog.V(0).Infof("Volume server start with masters: %v", vs.MasterNodes) vs.store.SetDataCenter(vs.dataCenter) vs.store.SetRack(vs.rack) + var err error + var newLeader string for { - err := vs.doHeartbeat(time.Duration(vs.pulseSeconds) * time.Second) - if err != nil { - glog.V(0).Infof("heartbeat error: %v", err) - time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) + for _, master := range vs.MasterNodes { + if newLeader != "" { + master = newLeader + } + newLeader, err = vs.doHeartbeat(master, time.Duration(vs.pulseSeconds)*time.Second) + if err != nil { + glog.V(0).Infof("heartbeat error: %v", err) + time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) + } } } } -func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { - - vs.masterNodes.Reset() - masterNode, err := vs.masterNodes.FindMaster() - if err != nil { - return fmt.Errorf("No master found: %v", err) - } +func (vs *VolumeServer) doHeartbeat(masterNode string, sleepInterval time.Duration) (newLeader string, err error) { grpcConection, err := grpc.Dial(masterNode, grpc.WithInsecure()) if err != nil { - return fmt.Errorf("fail to dial: %v", err) + return "", fmt.Errorf("fail to dial: %v", err) } defer grpcConection.Close() client := master_pb.NewSeaweedClient(grpcConection) stream, err := client.SendHeartbeat(context.Background()) if err != nil { - glog.V(0).Infof("%v.SendHeartbeat(_) = _, %v", client, err) - return err + glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err) + return "", err } - vs.SetMasterNode(masterNode) - glog.V(0).Infof("Heartbeat to %s", masterNode) + glog.V(0).Infof("Heartbeat to: %v", masterNode) + vs.currentMaster = masterNode vs.store.Client = stream defer func() { vs.store.Client = nil }() @@ -70,7 +72,8 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { vs.guard.SecretKey = security.Secret(in.GetSecretKey()) } if in.GetLeader() != "" && masterNode != in.GetLeader() { - vs.masterNodes.SetPossibleLeader(in.GetLeader()) + glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode) + newLeader = in.GetLeader() doneChan <- nil return } @@ -79,7 +82,7 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) - return err + return "", err } tickChan := time.Tick(sleepInterval) @@ -89,11 +92,10 @@ func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error { case <-tickChan: if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) - return err + return "", err } - case err := <-doneChan: - glog.V(0).Infof("Volume Server heart beat stops with %v", err) - return err + case <-doneChan: + return } } } |
