diff options
Diffstat (limited to 'weed/wdclient/masterclient.go')
| -rw-r--r-- | weed/wdclient/masterclient.go | 56 |
1 files changed, 52 insertions, 4 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 244a3921a..35f1c4cf8 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -38,6 +38,39 @@ func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientTy } } +func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { + return mc.LookupFileIdWithFallback +} + +func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) { + fullUrls, err = mc.vidMap.LookupFileId(fileId) + if err == nil { + return + } + err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ + VolumeOrFileIds: []string{fileId}, + }) + if err != nil { + return err + } + for vid, vidLocation := range resp.VolumeIdLocations { + for _, vidLoc := range vidLocation.Locations { + loc := Location{ + Url: vidLoc.Url, + PublicUrl: vidLoc.PublicUrl, + GrpcPort: int(vidLoc.GrpcPort), + } + mc.vidMap.addLocation(uint32(vid), loc) + fullUrls = append(fullUrls, "http://"+loc.Url+"/"+fileId) + } + } + + return nil + }) + return +} + func (mc *MasterClient) GetMaster() pb.ServerAddress { mc.WaitUntilConnected() return mc.currentMaster @@ -98,7 +131,6 @@ func (mc *MasterClient) tryAllMasters() { } mc.currentMaster = "" - mc.vidMap = newVidMap("") } } @@ -126,9 +158,25 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc() return err } - glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master) + + resp, err := stream.Recv() + if err != nil { + glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err) + stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc() + return err + } + + // check if it is the leader to determine whether to reset the vidMap + if resp.VolumeLocation != nil && resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader { + glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader) + nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) + stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() + return nil + } + mc.currentMaster = master + mc.vidMap = newVidMap("") for { resp, err := stream.Recv() @@ -140,8 +188,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL if resp.VolumeLocation != nil { // maybe the leader is changed - if resp.VolumeLocation.Leader != "" { - glog.V(0).Infof("redirected to leader %v", resp.VolumeLocation.Leader) + if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader { + glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader) nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc() return nil |
