aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient/masterclient.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/wdclient/masterclient.go')
-rw-r--r-- weed/wdclient/masterclient.go 546
1 files changed, 178 insertions, 368 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 320156294..89218a8c7 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -5,328 +5,143 @@ import (
"errors"
"fmt"
"math/rand"
- "sort"
"strconv"
"strings"
"sync"
"time"
- "golang.org/x/sync/singleflight"
-
- "github.com/seaweedfs/seaweedfs/weed/util/version"
-
- "github.com/seaweedfs/seaweedfs/weed/stats"
-
- "github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/version"
)
-type MasterClient struct {
- FilerGroup string
- clientType string
- clientHost pb.ServerAddress
- rack string
- currentMaster pb.ServerAddress
- currentMasterLock sync.RWMutex
- masters pb.ServerDiscovery
- grpcDialOption grpc.DialOption
-
- // vidMap stores volume location mappings
- // Protected by vidMapLock to prevent race conditions during pointer swaps in resetVidMap
- vidMap *vidMap
- vidMapLock sync.RWMutex
- vidMapCacheSize int
- OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
- OnPeerUpdateLock sync.RWMutex
-
- // Per-batch in-flight tracking to prevent duplicate lookups for the same set of volumes
- vidLookupGroup singleflight.Group
-}
-
-func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
- return &MasterClient{
- FilerGroup: filerGroup,
- clientType: clientType,
- clientHost: clientHost,
- rack: rack,
- masters: masters,
- grpcDialOption: grpcDialOption,
- vidMap: newVidMap(clientDataCenter),
- vidMapCacheSize: 5,
- }
-}
-
-func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
- mc.OnPeerUpdateLock.Lock()
- mc.OnPeerUpdate = onPeerUpdate
- mc.OnPeerUpdateLock.Unlock()
-}
-
-func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
- return mc.LookupFileIdWithFallback
+// masterVolumeProvider implements VolumeLocationProvider by querying the master.
+// It is rarely called since the master pushes updates proactively via the KeepConnected stream.
+type masterVolumeProvider struct {
+ // masterClient supplies the master address, dial option, timeout and cache used by LookupVolumeIds.
+ masterClient *MasterClient
}
-func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
- // Try cache first using the fast path - grab both vidMap and dataCenter in one lock
- mc.vidMapLock.RLock()
- vm := mc.vidMap
- dataCenter := vm.DataCenter
- mc.vidMapLock.RUnlock()
-
- fullUrls, err = vm.LookupFileId(ctx, fileId)
- if err == nil && len(fullUrls) > 0 {
- return
- }
-
- // Extract volume ID from file ID (format: "volumeId,needle_id_cookie")
- parts := strings.Split(fileId, ",")
- if len(parts) != 2 {
- return nil, fmt.Errorf("invalid fileId %s", fileId)
- }
- volumeId := parts[0]
-
- // Use shared lookup logic with batching and singleflight
- vidLocations, err := mc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})
- if err != nil {
- return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
- }
-
- locations, found := vidLocations[volumeId]
- if !found || len(locations) == 0 {
- return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
- }
-
- // Build HTTP URLs from locations, preferring same data center
- var sameDcUrls, otherDcUrls []string
- for _, loc := range locations {
- httpUrl := "http://" + loc.Url + "/" + fileId
- if dataCenter != "" && dataCenter == loc.DataCenter {
- sameDcUrls = append(sameDcUrls, httpUrl)
- } else {
- otherDcUrls = append(otherDcUrls, httpUrl)
- }
- }
-
- // Prefer same data center
- fullUrls = append(sameDcUrls, otherDcUrls...)
- return fullUrls, nil
-}
-
-// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
-// Uses singleflight to coalesce concurrent requests for the same batch of volumes
-func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+// LookupVolumeIds queries the master for the locations of volumeIds (the fallback path on a cache miss).
+// Every location returned by the master is also added to the MasterClient cache via addLocation.
+// If the RPC itself fails, it returns a nil map and that (wrapped) error. Otherwise it returns the
+// locations found, together with an aggregated error when the master reported per-volume errors or
+// returned an unparsable volume ID, so callers should check both the map and the error.
+func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
result := make(map[string][]Location)
- var needsLookup []string
var lookupErrors []error
- // Check cache first and parse volume IDs once
- vidStringToUint := make(map[string]uint32, len(volumeIds))
+ glog.V(2).Infof("Looking up %d volumes from master: %v", len(volumeIds), volumeIds)
- // Get stable pointer to vidMap with minimal lock hold time
- vm := mc.getStableVidMap()
+ // Use a timeout for the master lookup to prevent indefinite blocking.
+ // The same bounded context is passed to GetMaster, which waits for a master connection and
+ // would otherwise block for as long as the caller's ctx allows.
+ timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout)
+ defer cancel()
- for _, vidString := range volumeIds {
- vid, err := strconv.ParseUint(vidString, 10, 32)
+ err := pb.WithMasterClient(false, p.masterClient.GetMaster(timeoutCtx), p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: volumeIds,
+ })
if err != nil {
- return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
- }
- vidStringToUint[vidString] = uint32(vid)
-
- locations, found := vm.GetLocations(uint32(vid))
- if found && len(locations) > 0 {
- result[vidString] = locations
- } else {
- needsLookup = append(needsLookup, vidString)
+ // Wrap with %w so callers can still inspect the underlying error.
+ return fmt.Errorf("master lookup failed: %w", err)
}
- }
-
- if len(needsLookup) == 0 {
- return result, nil
- }
- // Batch query all missing volumes using singleflight on the batch key
- // Sort for stable key to coalesce identical batches
- sort.Strings(needsLookup)
- batchKey := strings.Join(needsLookup, ",")
-
- sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
- // Double-check cache for volumes that might have been populated while waiting
- stillNeedLookup := make([]string, 0, len(needsLookup))
- batchResult := make(map[string][]Location)
-
- // Get stable pointer with minimal lock hold time
- vm := mc.getStableVidMap()
-
- for _, vidString := range needsLookup {
- vid := vidStringToUint[vidString] // Use pre-parsed value
- if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
- batchResult[vidString] = locations
- } else {
- stillNeedLookup = append(stillNeedLookup, vidString)
+ for _, vidLoc := range resp.VolumeIdLocations {
+ // Preserve per-volume errors from master response
+ // These could indicate misconfiguration, volume deletion, etc.
+ if vidLoc.Error != "" {
+ lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error))
+ glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
+ continue
}
- }
-
- if len(stillNeedLookup) == 0 {
- return batchResult, nil
- }
-
- // Query master with batched volume IDs
- glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup)
- err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
- VolumeOrFileIds: stillNeedLookup,
- })
+ // Parse volume ID from response (the part before the first comma)
+ parts := strings.Split(vidLoc.VolumeOrFileId, ",")
+ vidOnly := parts[0]
+ vid, err := strconv.ParseUint(vidOnly, 10, 32)
if err != nil {
- return fmt.Errorf("master lookup failed: %v", err)
+ lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err))
+ glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
+ continue
}
- for _, vidLoc := range resp.VolumeIdLocations {
- if vidLoc.Error != "" {
- glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
- continue
- }
-
- // Parse volume ID from response
- parts := strings.Split(vidLoc.VolumeOrFileId, ",")
- vidOnly := parts[0]
- vid, err := strconv.ParseUint(vidOnly, 10, 32)
- if err != nil {
- glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
- continue
- }
-
- var locations []Location
- for _, masterLoc := range vidLoc.Locations {
- loc := Location{
- Url: masterLoc.Url,
- PublicUrl: masterLoc.PublicUrl,
- GrpcPort: int(masterLoc.GrpcPort),
- DataCenter: masterLoc.DataCenter,
- }
- mc.addLocation(uint32(vid), loc)
- locations = append(locations, loc)
- }
-
- if len(locations) > 0 {
- batchResult[vidOnly] = locations
+ var locations []Location
+ for _, masterLoc := range vidLoc.Locations {
+ loc := Location{
+ Url: masterLoc.Url,
+ PublicUrl: masterLoc.PublicUrl,
+ GrpcPort: int(masterLoc.GrpcPort),
+ DataCenter: masterLoc.DataCenter,
}
+ // Update cache with the location
+ p.masterClient.addLocation(uint32(vid), loc)
+ locations = append(locations, loc)
}
- return nil
- })
- if err != nil {
- return batchResult, err
+ if len(locations) > 0 {
+ result[vidOnly] = locations
+ }
}
- return batchResult, nil
+ return nil
})
if err != nil {
- lookupErrors = append(lookupErrors, err)
+ return nil, err
}
- // Merge singleflight batch results
- if batchLocations, ok := sfResult.(map[string][]Location); ok {
- for vid, locs := range batchLocations {
- result[vid] = locs
- }
- }
-
- // Check for volumes that still weren't found
- for _, vidString := range needsLookup {
- if _, found := result[vidString]; !found {
- lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
- }
+ // Return partial results with detailed errors
+ // Callers should check both result map and error
+ if len(lookupErrors) > 0 {
+ glog.V(2).Infof("MasterClient: looked up %d volumes, found %d, %d errors", len(volumeIds), len(result), len(lookupErrors))
+ return result, fmt.Errorf("master volume lookup errors: %w", errors.Join(lookupErrors...))
}
- // Return aggregated errors using errors.Join to preserve error types
- return result, errors.Join(lookupErrors...)
+ glog.V(3).Infof("MasterClient: looked up %d volumes, found %d", len(volumeIds), len(result))
+ return result, nil
}
-func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
- mc.currentMasterLock.RLock()
- defer mc.currentMasterLock.RUnlock()
- return mc.currentMaster
-}
+// MasterClient connects to master servers and maintains a volume location cache.
+// It receives real-time updates via KeepConnected streaming and uses vidMapClient for caching.
+type MasterClient struct {
+ *vidMapClient // Embedded cache with shared logic
-func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
- mc.currentMasterLock.Lock()
- mc.currentMaster = master
- mc.currentMasterLock.Unlock()
+ FilerGroup string
+ clientType string
+ clientHost pb.ServerAddress
+ rack string
+ currentMaster pb.ServerAddress // Empty until connected; accessed under currentMasterLock
+ currentMasterLock sync.RWMutex
+ masters pb.ServerDiscovery
+ grpcDialOption grpc.DialOption
+ grpcTimeout time.Duration // Timeout for gRPC calls to master
+ OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) // Set under OnPeerUpdateLock
+ OnPeerUpdateLock sync.RWMutex
}
-func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
- mc.WaitUntilConnected(ctx)
- return mc.getCurrentMaster()
-}
+// NewMasterClient builds a MasterClient from the given settings, with a 5-second gRPC timeout
+// and an embedded vidMapClient whose lookups go through a masterVolumeProvider pointing back
+// at the new client. Nothing in this constructor opens a connection to a master.
+func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
+ mc := &MasterClient{
+ FilerGroup: filerGroup,
+ clientType: clientType,
+ clientHost: clientHost,
+ rack: rack,
+ masters: masters,
+ grpcDialOption: grpcDialOption,
+ grpcTimeout: 5 * time.Second, // Default: 5 seconds for gRPC calls to master
+ }
-func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
- mc.WaitUntilConnected(ctx)
- return mc.masters.GetInstances()
-}
+ // Create provider that references this MasterClient
+ provider := &masterVolumeProvider{masterClient: mc}
-func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
- attempts := 0
- for {
- select {
- case <-ctx.Done():
- return
- default:
- currentMaster := mc.getCurrentMaster()
- if currentMaster != "" {
- return
- }
- attempts++
- if attempts%100 == 0 { // Log every 100 attempts (roughly every 20 seconds)
- glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts)
- }
- time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
- }
- }
-}
+ // Initialize embedded vidMapClient with the provider and default cache size
+ mc.vidMapClient = newVidMapClient(provider, clientDataCenter, DefaultVidMapCacheSize)
-func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
- glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
- for {
- select {
- case <-ctx.Done():
- glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
- return
- default:
- mc.tryAllMasters(ctx)
- time.Sleep(time.Second)
- }
- }
+ return mc
}
-func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
- for _, master := range mc.masters.GetInstances() {
- if master == myMasterAddress {
- continue
- }
- if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
- defer cancel()
- resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
- if err != nil {
- return err
- }
- leader = resp.Leader
- return nil
- }); grpcErr != nil {
- glog.V(0).Infof("connect to %s: %v", master, grpcErr)
- }
- if leader != "" {
- glog.V(0).Infof("existing leader is %s", leader)
- return
- }
- }
- glog.V(0).Infof("No existing leader found!")
- return
+// SetOnPeerUpdateFn stores onPeerUpdate as the OnPeerUpdate callback while holding
+// OnPeerUpdateLock for writing.
+func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
+	mc.OnPeerUpdateLock.Lock()
+	defer mc.OnPeerUpdateLock.Unlock()
+	mc.OnPeerUpdate = onPeerUpdate
}
func (mc *MasterClient) tryAllMasters(ctx context.Context) {
@@ -393,6 +208,8 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
mc.resetVidMap()
mc.updateVidMap(resp)
} else {
+ // First message from master is not VolumeLocation (e.g., ClusterNodeUpdate)
+ // Still need to reset cache to ensure we don't use stale data from previous master
mc.resetVidMap()
}
mc.setCurrentMaster(master)
@@ -406,7 +223,8 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
}
if resp.VolumeLocation != nil {
- // maybe the leader is changed
+ // Check for leader change during the stream
+ // If master announces a new leader, reconnect to it
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
@@ -415,7 +233,6 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
}
mc.updateVidMap(resp)
}
-
if resp.ClusterNodeUpdate != nil {
update := resp.ClusterNodeUpdate
mc.OnPeerUpdateLock.RLock()
@@ -442,7 +259,7 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc()
glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr)
}
- return
+ return nextHintedLeader
}
func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
@@ -494,110 +311,103 @@ func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAdd
})
}
-// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
-// This is safe for read operations as the returned pointer is a stable snapshot,
-// and the underlying vidMap methods have their own internal locking.
-func (mc *MasterClient) getStableVidMap() *vidMap {
- mc.vidMapLock.RLock()
- vm := mc.vidMap
- mc.vidMapLock.RUnlock()
- return vm
-}
-
-// withCurrentVidMap executes a function with the current vidMap under a read lock.
-// This is for methods that modify vidMap's internal state, ensuring the pointer
-// is not swapped by resetVidMap during the operation. The actual map mutations
-// are protected by vidMap's internal mutex.
-func (mc *MasterClient) withCurrentVidMap(f func(vm *vidMap)) {
- mc.vidMapLock.RLock()
- defer mc.vidMapLock.RUnlock()
- f(mc.vidMap)
-}
-
-// Public methods for external packages to access vidMap safely
-
-// GetLocations safely retrieves volume locations
-func (mc *MasterClient) GetLocations(vid uint32) (locations []Location, found bool) {
- return mc.getStableVidMap().GetLocations(vid)
-}
-
-// GetLocationsClone safely retrieves a clone of volume locations
-func (mc *MasterClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
- return mc.getStableVidMap().GetLocationsClone(vid)
-}
-
-// GetVidLocations safely retrieves volume locations by string ID
-func (mc *MasterClient) GetVidLocations(vid string) (locations []Location, err error) {
- return mc.getStableVidMap().GetVidLocations(vid)
-}
-
-// LookupFileId safely looks up URLs for a file ID
-func (mc *MasterClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
- return mc.getStableVidMap().LookupFileId(ctx, fileId)
-}
-
-// LookupVolumeServerUrl safely looks up volume server URLs
-func (mc *MasterClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
- return mc.getStableVidMap().LookupVolumeServerUrl(vid)
+// getCurrentMaster reads the current master address under the read lock.
+func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
+	mc.currentMasterLock.RLock()
+	master := mc.currentMaster
+	mc.currentMasterLock.RUnlock()
+	return master
}
-// GetDataCenter safely retrieves the data center
-func (mc *MasterClient) GetDataCenter() string {
- return mc.getStableVidMap().DataCenter
+// setCurrentMaster records master as the current master address under the write lock.
+func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
+	mc.currentMasterLock.Lock()
+	defer mc.currentMasterLock.Unlock()
+	mc.currentMaster = master
}
-// Thread-safe helpers for vidMap operations
-
-// addLocation adds a volume location
-func (mc *MasterClient) addLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.addLocation(vid, location)
- })
+// GetMaster waits for a master connection and then returns the current master address.
+//
+// IMPORTANT: the wait only ends once KeepConnectedToMaster has recorded a master, or
+// once ctx is done. If KeepConnectedToMaster is not running in a background goroutine,
+// the call blocks for as long as ctx allows (forever for a context that never ends).
+//
+// Typical initialization pattern:
+//
+//	mc := wdclient.NewMasterClient(...)
+//	go mc.KeepConnectedToMaster(ctx) // Start connection management
+//	// ... later ...
+//	master := mc.GetMaster(ctx) // Will block until connected
+//
+// Calling it before a connection exists may surface as unexpected timeouts in
+// LookupVolumeIds and other operations that depend on it.
+func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
+	// Block first; the address is read only after the wait has ended.
+	mc.WaitUntilConnected(ctx)
+	current := mc.getCurrentMaster()
+	return current
}
-// deleteLocation removes a volume location
-func (mc *MasterClient) deleteLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.deleteLocation(vid, location)
- })
+// GetMasters waits for a master connection (or for ctx to be done) and then returns the
+// instances reported by the configured master discovery.
+// The same initialization contract as GetMaster applies: KeepConnectedToMaster must be running.
+func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
+	mc.WaitUntilConnected(ctx)
+	instances := mc.masters.GetInstances()
+	return instances
}
-// addEcLocation adds an EC volume location
-func (mc *MasterClient) addEcLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.addEcLocation(vid, location)
- })
+// WaitUntilConnected blocks until a master connection is established or ctx is done.
+// This does NOT initiate connections - it only polls the current master address, so it
+// relies on KeepConnectedToMaster succeeding elsewhere.
+// Between polls it waits a random delay below 200ms; the delay is interrupted when ctx
+// is done, so the call returns promptly on cancellation.
+func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
+	for attempts := 1; ; attempts++ {
+		if ctx.Err() != nil {
+			return
+		}
+		if mc.getCurrentMaster() != "" {
+			return
+		}
+		// Delays are uniform in [0, 200ms), so 100 attempts take about 10 seconds on average.
+		if attempts%100 == 0 {
+			glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts)
+		}
+		// Wait on a timer instead of time.Sleep so cancellation is not delayed by the backoff.
+		delay := time.NewTimer(time.Duration(rand.Int31n(200)) * time.Millisecond)
+		select {
+		case <-ctx.Done():
+			delay.Stop()
+			return
+		case <-delay.C:
+		}
+	}
}
-// deleteEcLocation removes an EC volume location
-func (mc *MasterClient) deleteEcLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.deleteEcLocation(vid, location)
- })
+// KeepConnectedToMaster runs the master connection loop until ctx is done: each round calls
+// tryAllMasters and then pauses one second before the next round. It blocks for the whole
+// lifetime of ctx, so it is meant to be run in its own goroutine.
+// The pause is interrupted as soon as ctx is done, so shutdown is not delayed by it.
+func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
+	glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
+	for ctx.Err() == nil {
+		mc.tryAllMasters(ctx)
+		// Wait on a timer instead of time.Sleep so cancellation ends the pause early.
+		pause := time.NewTimer(time.Second)
+		select {
+		case <-ctx.Done():
+			pause.Stop()
+		case <-pause.C:
+		}
+	}
+	glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
}
-func (mc *MasterClient) resetVidMap() {
- mc.vidMapLock.Lock()
- defer mc.vidMapLock.Unlock()
-
- // Preserve the existing vidMap in the cache chain
- // No need to clone - the existing vidMap has its own mutex for thread safety
- tail := mc.vidMap
-
- nvm := newVidMap(tail.DataCenter)
- nvm.cache.Store(tail)
- mc.vidMap = nvm
-
- // Trim cache chain to vidMapCacheSize by traversing to the last node
- // that should remain and cutting the chain after it
- node := tail
- for i := 0; i < mc.vidMapCacheSize-1; i++ {
- if node.cache.Load() == nil {
+// FindLeaderFromOtherPeers asks every configured master except myMasterAddress for its
+// master configuration and returns the first non-empty leader reported.
+// It returns "" when no peer reports a leader; per-peer failures are only logged.
+func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
+ for _, master := range mc.masters.GetInstances() {
+ if master == myMasterAddress {
+ continue
+ }
+ if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ // Each probe gets its own 120ms deadline, independent of any caller context.
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
+ defer cancel()
+ resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ leader = resp.Leader
+ return nil
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", master, grpcErr)
+ }
+ if leader != "" {
+ glog.V(0).Infof("existing leader is %s", leader)
return
}
- node = node.cache.Load()
- }
- if node != nil {
- node.cache.Store(nil)
}
+ glog.V(0).Infof("No existing leader found!")
+ return
}