diff options
Diffstat (limited to 'weed/wdclient/masterclient.go')
| -rw-r--r-- | weed/wdclient/masterclient.go | 176 |
1 files changed, 133 insertions, 43 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index d76ae31e2..def4a9a71 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -2,6 +2,7 @@ package wdclient import ( "context" + "github.com/chrislusf/seaweedfs/weed/stats" "math/rand" "time" @@ -14,32 +15,69 @@ import ( ) type MasterClient struct { + FilerGroup string clientType string - clientHost string - grpcPort uint32 - currentMaster string - masters []string + clientHost pb.ServerAddress + currentMaster pb.ServerAddress + masters map[string]pb.ServerAddress grpcDialOption grpc.DialOption vidMap + + OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) } -func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, clientDataCenter string, masters []string) *MasterClient { +func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, masters map[string]pb.ServerAddress) *MasterClient { return &MasterClient{ + FilerGroup: filerGroup, clientType: clientType, clientHost: clientHost, - grpcPort: clientGrpcPort, masters: masters, grpcDialOption: grpcDialOption, vidMap: newVidMap(clientDataCenter), } } -func (mc *MasterClient) GetMaster() string { +func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { + return mc.LookupFileIdWithFallback +} + +func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) { + fullUrls, err = mc.vidMap.LookupFileId(fileId) + err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ + VolumeOrFileIds: []string{fileId}, + }) + if err != nil { + return err + } + for vid, vidLocation := range resp.VolumeIdLocations { + for _, vidLoc := range vidLocation.Locations { + loc := Location{ + Url: vidLoc.Url, + PublicUrl: vidLoc.PublicUrl, + GrpcPort: int(vidLoc.GrpcPort), + } + mc.vidMap.addLocation(uint32(vid), loc) + fullUrls = append(fullUrls, loc.Url) + } + } + + return nil + }) + return +} + +func (mc *MasterClient) GetMaster() pb.ServerAddress { mc.WaitUntilConnected() return mc.currentMaster } +func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress { + mc.WaitUntilConnected() + return mc.masters +} + func (mc *MasterClient) WaitUntilConnected() { for mc.currentMaster == "" { time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) @@ -47,19 +85,19 @@ func (mc *MasterClient) WaitUntilConnected() { } func (mc *MasterClient) KeepConnectedToMaster() { - glog.V(1).Infof("%s masterClient bootstraps with masters %v", mc.clientType, mc.masters) + glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters) for { mc.tryAllMasters() time.Sleep(time.Second) } } -func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress string) (leader string) { +func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) { for _, master := range mc.masters { if master == myMasterAddress { continue } - if grpcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond) defer cancel() resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{}) @@ -81,7 +119,7 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress string) (leader } func (mc *MasterClient) tryAllMasters() { - nextHintedLeader := "" + var nextHintedLeader pb.ServerAddress for _, master := range mc.masters { nextHintedLeader = mc.tryConnectToMaster(master) @@ -90,74 +128,126 @@ func (mc *MasterClient) tryAllMasters() { } mc.currentMaster = "" - mc.vidMap = newVidMap("") } } -func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) { - glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master) - gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { - +func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { + glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master) + stats.MasterClientConnectCounter.WithLabelValues("total").Inc() + gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.KeepConnected(ctx) if err != nil { - glog.V(1).Infof("%s masterClient failed to keep connected to %s: %v", mc.clientType, master, err) + glog.V(1).Infof("%s.%s masterClient failed to keep connected to %s: %v", mc.FilerGroup, mc.clientType, master, err) + stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToKeepConnected).Inc() return err } - if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.clientType, GrpcPort: mc.grpcPort}); err != nil { - glog.V(0).Infof("%s masterClient failed to send to %s: %v", mc.clientType, master, err) + if err = stream.Send(&master_pb.KeepConnectedRequest{ + FilerGroup: mc.FilerGroup, + ClientType: mc.clientType, + ClientAddress: string(mc.clientHost), + Version: util.Version(), + }); err != nil { + glog.V(0).Infof("%s.%s masterClient failed to send to %s: %v", mc.FilerGroup, mc.clientType, master, err) + stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc() + return err + } + glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master) + + resp, err := stream.Recv() + if err != nil { + glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err) + stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc() return err } - glog.V(1).Infof("%s masterClient Connected to %v", mc.clientType, master) + // check if it is the leader to determine whether to reset the vidMap + if resp.VolumeLocation != nil && resp.VolumeLocation.Leader != "" { + glog.V(0).Infof("redirected to leader %v", resp.VolumeLocation.Leader) + nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) + stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() + return nil + } + mc.currentMaster = master + mc.vidMap = newVidMap("") for { - volumeLocation, err := stream.Recv() + resp, err := stream.Recv() if err != nil { - glog.V(0).Infof("%s masterClient failed to receive from %s: %v", mc.clientType, master, err) + glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err) + stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc() return err } - // maybe the leader is changed - if volumeLocation.Leader != "" { - glog.V(0).Infof("redirected to leader %v", volumeLocation.Leader) - nextHintedLeader = volumeLocation.Leader - return nil + if resp.VolumeLocation != nil { + // maybe the leader is changed + if resp.VolumeLocation.Leader != "" { + glog.V(0).Infof("redirected to leader %v", resp.VolumeLocation.Leader) + nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) + stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() + return nil + } + + // process new volume location + loc := Location{ + Url: resp.VolumeLocation.Url, + PublicUrl: resp.VolumeLocation.PublicUrl, + DataCenter: resp.VolumeLocation.DataCenter, + GrpcPort: int(resp.VolumeLocation.GrpcPort), + } + for _, newVid := range resp.VolumeLocation.NewVids { + glog.V(1).Infof("%s.%s: %s masterClient adds volume %d", mc.FilerGroup, mc.clientType, loc.Url, newVid) + mc.addLocation(newVid, loc) + } + for _, deletedVid := range resp.VolumeLocation.DeletedVids { + glog.V(1).Infof("%s.%s: %s masterClient removes volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedVid) + mc.deleteLocation(deletedVid, loc) + } + for _, newEcVid := range resp.VolumeLocation.NewEcVids { + glog.V(1).Infof("%s.%s: %s masterClient adds ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, newEcVid) + mc.addEcLocation(newEcVid, loc) + } + for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids { + glog.V(1).Infof("%s.%s: %s masterClient removes ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedEcVid) + mc.deleteEcLocation(deletedEcVid, loc) + } } - // process new volume location - loc := Location{ - Url: volumeLocation.Url, - PublicUrl: volumeLocation.PublicUrl, - DataCenter: volumeLocation.DataCenter, - } - for _, newVid := range volumeLocation.NewVids { - glog.V(1).Infof("%s: %s masterClient adds volume %d", mc.clientType, loc.Url, newVid) - mc.addLocation(newVid, loc) - } - for _, deletedVid := range volumeLocation.DeletedVids { - glog.V(1).Infof("%s: %s masterClient removes volume %d", mc.clientType, loc.Url, deletedVid) - mc.deleteLocation(deletedVid, loc) + if resp.ClusterNodeUpdate != nil { + update := resp.ClusterNodeUpdate + if mc.OnPeerUpdate != nil { + if update.FilerGroup == mc.FilerGroup { + if update.IsAdd { + glog.V(0).Infof("+ %s.%s %s leader:%v\n", update.FilerGroup, update.NodeType, update.Address, update.IsLeader) + } else { + glog.V(0).Infof("- %s.%s %s leader:%v\n", update.FilerGroup, update.NodeType, update.Address, update.IsLeader) + } + stats.MasterClientConnectCounter.WithLabelValues(stats.OnPeerUpdate).Inc() + mc.OnPeerUpdate(update, time.Now()) + } + } } + } }) if gprcErr != nil { - glog.V(1).Infof("%s masterClient failed to connect with master %v: %v", mc.clientType, master, gprcErr) + stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc() + glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr) } return } -func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error { +func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { return util.Retry("master grpc", func() error { for mc.currentMaster == "" { time.Sleep(3 * time.Second) } - return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { return fn(client) }) }) |
