aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient/masterclient.go
diff options
context:
space:
mode:
authoryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
committeryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
commit46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch)
tree734125b48b6d96f8796a2b89b924312cd169ef0e /weed/wdclient/masterclient.go
parenta5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff)
parentdc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff)
downloadseaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz
seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip
Update tikv client version and add one PC support
Diffstat (limited to 'weed/wdclient/masterclient.go')
-rw-r--r--weed/wdclient/masterclient.go176
1 files changed, 133 insertions, 43 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index d76ae31e2..def4a9a71 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -2,6 +2,7 @@ package wdclient
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"math/rand"
"time"
@@ -14,32 +15,69 @@ import (
)
type MasterClient struct {
+ FilerGroup string
clientType string
- clientHost string
- grpcPort uint32
- currentMaster string
- masters []string
+ clientHost pb.ServerAddress
+ currentMaster pb.ServerAddress
+ masters map[string]pb.ServerAddress
grpcDialOption grpc.DialOption
vidMap
+
+ OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
}
-func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost string, clientGrpcPort uint32, clientDataCenter string, masters []string) *MasterClient {
+func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, masters map[string]pb.ServerAddress) *MasterClient {
return &MasterClient{
+ FilerGroup: filerGroup,
clientType: clientType,
clientHost: clientHost,
- grpcPort: clientGrpcPort,
masters: masters,
grpcDialOption: grpcDialOption,
vidMap: newVidMap(clientDataCenter),
}
}
-func (mc *MasterClient) GetMaster() string {
+func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ return mc.LookupFileIdWithFallback
+}
+
+func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []string, err error) {
+ fullUrls, err = mc.vidMap.LookupFileId(fileId)
+ err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: []string{fileId},
+ })
+ if err != nil {
+ return err
+ }
+ for vid, vidLocation := range resp.VolumeIdLocations {
+ for _, vidLoc := range vidLocation.Locations {
+ loc := Location{
+ Url: vidLoc.Url,
+ PublicUrl: vidLoc.PublicUrl,
+ GrpcPort: int(vidLoc.GrpcPort),
+ }
+ mc.vidMap.addLocation(uint32(vid), loc)
+ fullUrls = append(fullUrls, loc.Url)
+ }
+ }
+
+ return nil
+ })
+ return
+}
+
+func (mc *MasterClient) GetMaster() pb.ServerAddress {
mc.WaitUntilConnected()
return mc.currentMaster
}
+func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
+ mc.WaitUntilConnected()
+ return mc.masters
+}
+
func (mc *MasterClient) WaitUntilConnected() {
for mc.currentMaster == "" {
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
@@ -47,19 +85,19 @@ func (mc *MasterClient) WaitUntilConnected() {
}
func (mc *MasterClient) KeepConnectedToMaster() {
- glog.V(1).Infof("%s masterClient bootstraps with masters %v", mc.clientType, mc.masters)
+ glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
for {
mc.tryAllMasters()
time.Sleep(time.Second)
}
}
-func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress string) (leader string) {
+func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
for _, master := range mc.masters {
if master == myMasterAddress {
continue
}
- if grpcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
defer cancel()
resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
@@ -81,7 +119,7 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress string) (leader
}
func (mc *MasterClient) tryAllMasters() {
- nextHintedLeader := ""
+ var nextHintedLeader pb.ServerAddress
for _, master := range mc.masters {
nextHintedLeader = mc.tryConnectToMaster(master)
@@ -90,74 +128,126 @@ func (mc *MasterClient) tryAllMasters() {
}
mc.currentMaster = ""
- mc.vidMap = newVidMap("")
}
}
-func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader string) {
- glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master)
- gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
-
+func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
+ glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
+ stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
+ gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.KeepConnected(ctx)
if err != nil {
- glog.V(1).Infof("%s masterClient failed to keep connected to %s: %v", mc.clientType, master, err)
+ glog.V(1).Infof("%s.%s masterClient failed to keep connected to %s: %v", mc.FilerGroup, mc.clientType, master, err)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToKeepConnected).Inc()
return err
}
- if err = stream.Send(&master_pb.KeepConnectedRequest{Name: mc.clientType, GrpcPort: mc.grpcPort}); err != nil {
- glog.V(0).Infof("%s masterClient failed to send to %s: %v", mc.clientType, master, err)
+ if err = stream.Send(&master_pb.KeepConnectedRequest{
+ FilerGroup: mc.FilerGroup,
+ ClientType: mc.clientType,
+ ClientAddress: string(mc.clientHost),
+ Version: util.Version(),
+ }); err != nil {
+ glog.V(0).Infof("%s.%s masterClient failed to send to %s: %v", mc.FilerGroup, mc.clientType, master, err)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToSend).Inc()
+ return err
+ }
+ glog.V(1).Infof("%s.%s masterClient Connected to %v", mc.FilerGroup, mc.clientType, master)
+
+ resp, err := stream.Recv()
+ if err != nil {
+ glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
return err
}
- glog.V(1).Infof("%s masterClient Connected to %v", mc.clientType, master)
+ // check if it is the leader to determine whether to reset the vidMap
+ if resp.VolumeLocation != nil && resp.VolumeLocation.Leader != "" {
+ glog.V(0).Infof("redirected to leader %v", resp.VolumeLocation.Leader)
+ nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
+ return nil
+ }
+
mc.currentMaster = master
+ mc.vidMap = newVidMap("")
for {
- volumeLocation, err := stream.Recv()
+ resp, err := stream.Recv()
if err != nil {
- glog.V(0).Infof("%s masterClient failed to receive from %s: %v", mc.clientType, master, err)
+ glog.V(0).Infof("%s.%s masterClient failed to receive from %s: %v", mc.FilerGroup, mc.clientType, master, err)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.FailedToReceive).Inc()
return err
}
- // maybe the leader is changed
- if volumeLocation.Leader != "" {
- glog.V(0).Infof("redirected to leader %v", volumeLocation.Leader)
- nextHintedLeader = volumeLocation.Leader
- return nil
+ if resp.VolumeLocation != nil {
+ // maybe the leader is changed
+ if resp.VolumeLocation.Leader != "" {
+ glog.V(0).Infof("redirected to leader %v", resp.VolumeLocation.Leader)
+ nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToleader).Inc()
+ return nil
+ }
+
+ // process new volume location
+ loc := Location{
+ Url: resp.VolumeLocation.Url,
+ PublicUrl: resp.VolumeLocation.PublicUrl,
+ DataCenter: resp.VolumeLocation.DataCenter,
+ GrpcPort: int(resp.VolumeLocation.GrpcPort),
+ }
+ for _, newVid := range resp.VolumeLocation.NewVids {
+ glog.V(1).Infof("%s.%s: %s masterClient adds volume %d", mc.FilerGroup, mc.clientType, loc.Url, newVid)
+ mc.addLocation(newVid, loc)
+ }
+ for _, deletedVid := range resp.VolumeLocation.DeletedVids {
+ glog.V(1).Infof("%s.%s: %s masterClient removes volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedVid)
+ mc.deleteLocation(deletedVid, loc)
+ }
+ for _, newEcVid := range resp.VolumeLocation.NewEcVids {
+ glog.V(1).Infof("%s.%s: %s masterClient adds ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, newEcVid)
+ mc.addEcLocation(newEcVid, loc)
+ }
+ for _, deletedEcVid := range resp.VolumeLocation.DeletedEcVids {
+ glog.V(1).Infof("%s.%s: %s masterClient removes ec volume %d", mc.FilerGroup, mc.clientType, loc.Url, deletedEcVid)
+ mc.deleteEcLocation(deletedEcVid, loc)
+ }
}
- // process new volume location
- loc := Location{
- Url: volumeLocation.Url,
- PublicUrl: volumeLocation.PublicUrl,
- DataCenter: volumeLocation.DataCenter,
- }
- for _, newVid := range volumeLocation.NewVids {
- glog.V(1).Infof("%s: %s masterClient adds volume %d", mc.clientType, loc.Url, newVid)
- mc.addLocation(newVid, loc)
- }
- for _, deletedVid := range volumeLocation.DeletedVids {
- glog.V(1).Infof("%s: %s masterClient removes volume %d", mc.clientType, loc.Url, deletedVid)
- mc.deleteLocation(deletedVid, loc)
+ if resp.ClusterNodeUpdate != nil {
+ update := resp.ClusterNodeUpdate
+ if mc.OnPeerUpdate != nil {
+ if update.FilerGroup == mc.FilerGroup {
+ if update.IsAdd {
+ glog.V(0).Infof("+ %s.%s %s leader:%v\n", update.FilerGroup, update.NodeType, update.Address, update.IsLeader)
+ } else {
+ glog.V(0).Infof("- %s.%s %s leader:%v\n", update.FilerGroup, update.NodeType, update.Address, update.IsLeader)
+ }
+ stats.MasterClientConnectCounter.WithLabelValues(stats.OnPeerUpdate).Inc()
+ mc.OnPeerUpdate(update, time.Now())
+ }
+ }
}
+
}
})
if gprcErr != nil {
- glog.V(1).Infof("%s masterClient failed to connect with master %v: %v", mc.clientType, master, gprcErr)
+ stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc()
+ glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr)
}
return
}
-func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error {
+func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
for mc.currentMaster == "" {
time.Sleep(3 * time.Second)
}
- return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})