aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient/masterclient.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/wdclient/masterclient.go')
-rw-r--r--weed/wdclient/masterclient.go56
1 files changed, 52 insertions, 4 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 244a3921a..35f1c4cf8 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -38,6 +38,39 @@ func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientTy
}
}
+func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ return mc.LookupFileIdWithFallback
+}
+
+func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) {
+ fullUrls, err = mc.vidMap.LookupFileId(fileId)
+ if err == nil {
+ return
+ }
+ err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: []string{fileId},
+ })
+ if err != nil {
+ return err
+ }
+ for vid, vidLocation := range resp.VolumeIdLocations {
+ for _, vidLoc := range vidLocation.Locations {
+ loc := Location{
+ Url: vidLoc.Url,
+ PublicUrl: vidLoc.PublicUrl,
+ GrpcPort: int(vidLoc.GrpcPort),
+ }
+ mc.vidMap.addLocation(uint32(vid), loc)
+ fullUrls = append(fullUrls, "http://"+loc.Url+"/"+fileId)
+ }
+ }
+
+ return nil
+ })
+ return
+}
+
func (mc *MasterClient) GetMaster() pb.ServerAddress {
mc.WaitUntilConnected()
return mc.currentMaster
@@ -98,7 +131,6 @@ func (mc *MasterClient) tryAllMasters() {
}
mc.currentMaster = ""
- mc.vidMap = newVidMap("")
}
}
@@ -126,9 +158,25 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc()
return err
}
-
glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master)
+
+ resp, err := stream.Recv()
+ if err != nil {
+ glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
+ return err
+ }
+
+ // check if it is the leader to determine whether to reset the vidMap
+ if resp.VolumeLocation != nil && resp.VolumeLocation.Leader != "" && string(master) != resp.VolumeLocation.Leader {
+ glog.V(0).Infof("master %v redirected to leader %v", master, resp.VolumeLocation.Leader)
+ nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
+ return nil
+ }
+
mc.currentMaster = master
+ mc.vidMap = newVidMap("")
for {
resp, err := stream.Recv()
@@ -140,8 +188,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.VolumeLocation != nil {
// maybe the leader is changed
- if resp.VolumeLocation.Leader != "" {
- glog.V(0).Infof("redirected to leader %v", resp.VolumeLocation.Leader)
+ if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader {
+ glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
return nil