aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient
diff options
context:
space:
mode:
authorshibinbin <shibinbin@megvii.com>2020-10-28 11:36:42 +0800
committershibinbin <shibinbin@megvii.com>2020-10-28 11:36:42 +0800
commit7cc07655d493d11c967cfa978ddc5181d4b6b861 (patch)
tree5ae5bcf7ccc3cee3c55372674753d7c1ca48dff9 /weed/wdclient
parent29a4c3944eeb07434060df52dfb1d3cf4c59dc91 (diff)
parent53c3aad87528d57343afc5fdb3fb5107544af0fc (diff)
downloadseaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.tar.xz
seaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.zip
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/wdclient')
-rw-r--r--weed/wdclient/exclusive_locks/exclusive_locker.go16
-rw-r--r--weed/wdclient/masterclient.go49
-rw-r--r--weed/wdclient/vid_map.go51
3 files changed, 68 insertions, 48 deletions
diff --git a/weed/wdclient/exclusive_locks/exclusive_locker.go b/weed/wdclient/exclusive_locks/exclusive_locker.go
index 1ecfe6ce2..d477a6b2d 100644
--- a/weed/wdclient/exclusive_locks/exclusive_locker.go
+++ b/weed/wdclient/exclusive_locks/exclusive_locker.go
@@ -46,10 +46,13 @@ func (l *ExclusiveLocker) RequestLock() {
return
}
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
// retry to get the lease
for {
if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
@@ -71,9 +74,12 @@ func (l *ExclusiveLocker) RequestLock() {
// start a goroutine to renew the lease
go func() {
+ ctx2, cancel2 := context.WithCancel(context.Background())
+ defer cancel2()
+
for l.isLocking {
if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+ resp, err := client.LeaseAdminToken(ctx2, &master_pb.LeaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
@@ -98,8 +104,12 @@ func (l *ExclusiveLocker) RequestLock() {
func (l *ExclusiveLocker) ReleaseLock() {
l.isLocking = false
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
- client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
+ client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{
PreviousToken: atomic.LoadInt64(&l.token),
PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
LockName: AdminLockName,
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 4f8e0d5ef..60753e582 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -45,13 +45,39 @@ func (mc *MasterClient) WaitUntilConnected() {
}
func (mc *MasterClient) KeepConnectedToMaster() {
- glog.V(1).Infof("%s bootstraps with masters %v", mc.clientType, mc.masters)
+ glog.V(1).Infof("%s masterClient bootstraps with masters %v", mc.clientType, mc.masters)
for {
mc.tryAllMasters()
time.Sleep(time.Second)
}
}
+func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress string) (leader string) {
+ for _, master := range mc.masters {
+ if master == myMasterAddress {
+ continue
+ }
+ if grpcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
+ defer cancel()
+ resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ leader = resp.Leader
+ return nil
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", master, grpcErr)
+ }
+ if leader != "" {
+ glog.V(0).Infof("existing leader is %s", leader)
+ return
+ }
+ }
+ glog.V(0).Infof("No existing leader found!")
+ return
+}
+
func (mc *MasterClient) tryAllMasters() {
nextHintedLeader := ""
for _, master := range mc.masters {
@@ -67,27 +93,30 @@ func (mc *MasterClient) tryAllMasters() {
}
func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) {
- glog.V(1).Infof("%s Connecting to master %v", mc.clientType, master)
+ glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master)
gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
- stream, err := client.KeepConnected(context.Background())
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.KeepConnected(ctx)
if err != nil {
- glog.V(0).Infof("%s failed to keep connected to %s: %v", mc.clientType, master, err)
+ glog.V(1).Infof("%s masterClient failed to keep connected to %s: %v", mc.clientType, master, err)
return err
}
if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.clientType, GrpcPort: mc.grpcPort}); err != nil {
- glog.V(0).Infof("%s failed to send to %s: %v", mc.clientType, master, err)
+ glog.V(0).Infof("%s masterClient failed to send to %s: %v", mc.clientType, master, err)
return err
}
- glog.V(1).Infof("%s Connected to %v", mc.clientType, master)
+ glog.V(1).Infof("%s masterClient Connected to %v", mc.clientType, master)
mc.currentMaster = master
for {
volumeLocation, err := stream.Recv()
if err != nil {
- glog.V(0).Infof("%s failed to receive from %s: %v", mc.clientType, master, err)
+ glog.V(0).Infof("%s masterClient failed to receive from %s: %v", mc.clientType, master, err)
return err
}
@@ -104,18 +133,18 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
PublicUrl: volumeLocation.PublicUrl,
}
for _, newVid := range volumeLocation.NewVids {
- glog.V(1).Infof("%s: %s adds volume %d", mc.clientType, loc.Url, newVid)
+ glog.V(1).Infof("%s: %s masterClient adds volume %d", mc.clientType, loc.Url, newVid)
mc.addLocation(newVid, loc)
}
for _, deletedVid := range volumeLocation.DeletedVids {
- glog.V(1).Infof("%s: %s removes volume %d", mc.clientType, loc.Url, deletedVid)
+ glog.V(1).Infof("%s: %s masterClient removes volume %d", mc.clientType, loc.Url, deletedVid)
mc.deleteLocation(deletedVid, loc)
}
}
})
if gprcErr != nil {
- glog.V(0).Infof("%s failed to connect with master %v: %v", mc.clientType, master, gprcErr)
+ glog.V(1).Infof("%s masterClient failed to connect with master %v: %v", mc.clientType, master, gprcErr)
}
return
}
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 97df49cb6..cee2da6e1 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -44,38 +44,36 @@ func (vc *vidMap) getLocationIndex(length int) (int, error) {
return int(atomic.AddInt32(&vc.cursor, 1)) % length, nil
}
-func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) {
+func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
id, err := strconv.Atoi(vid)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)
- return "", err
+ return nil, err
}
- return vc.GetRandomLocation(uint32(id))
-}
-
-func (vc *vidMap) LookupFileId(fileId string) (fullUrl string, err error) {
- parts := strings.Split(fileId, ",")
- if len(parts) != 2 {
- return "", errors.New("Invalid fileId " + fileId)
+ locations, found := vc.GetLocations(uint32(id))
+ if !found {
+ return nil, fmt.Errorf("volume %d not found", id)
}
- serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0])
- if lookupError != nil {
- return "", lookupError
+ for _, loc := range locations {
+ serverUrls = append(serverUrls, loc.Url)
}
- return "http://" + serverUrl + "/" + fileId, nil
+ return
}
-func (vc *vidMap) LookupVolumeServer(fileId string) (volumeServer string, err error) {
+func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
- return "", errors.New("Invalid fileId " + fileId)
+ return nil, errors.New("Invalid fileId " + fileId)
}
- serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0])
+ serverUrls, lookupError := vc.LookupVolumeServerUrl(parts[0])
if lookupError != nil {
- return "", lookupError
+ return nil, lookupError
}
- return serverUrl, nil
+ for _, serverUrl := range serverUrls {
+ fullUrls = append(fullUrls, "http://"+serverUrl+"/"+fileId)
+ }
+ return
}
func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error) {
@@ -99,23 +97,6 @@ func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
return
}
-func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) {
- vc.RLock()
- defer vc.RUnlock()
-
- locations := vc.vid2Locations[vid]
- if len(locations) == 0 {
- return "", fmt.Errorf("volume %d not found", vid)
- }
-
- index, err := vc.getLocationIndex(len(locations))
- if err != nil {
- return "", fmt.Errorf("volume %d: %v", vid, err)
- }
-
- return locations[index].Url, nil
-}
-
func (vc *vidMap) addLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()