Diffstat (limited to 'weed/wdclient/masterclient.go')
-rw-r--r--   weed/wdclient/masterclient.go   148
1 file changed, 123 insertions, 25 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index f3950bc37..320156294 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -35,10 +35,10 @@ type MasterClient struct {
 	masters        pb.ServerDiscovery
 	grpcDialOption grpc.DialOption
-	// TODO: CRITICAL - Data race: resetVidMap() writes to vidMap while other methods read concurrently
-	// This embedded *vidMap should be changed to a private field protected by sync.RWMutex
-	// See: https://github.com/seaweedfs/seaweedfs/issues/[ISSUE_NUMBER]
-	*vidMap
+	// vidMap stores volume location mappings
+	// Protected by vidMapLock to prevent race conditions during pointer swaps in resetVidMap
+	vidMap     *vidMap
+	vidMapLock sync.RWMutex
 	vidMapCacheSize  int
 	OnPeerUpdate     func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
 	OnPeerUpdateLock sync.RWMutex
@@ -71,8 +71,13 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
 }
 
 func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
-	// Try cache first using the fast path
-	fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId)
+	// Try cache first using the fast path - grab both vidMap and dataCenter in one lock
+	mc.vidMapLock.RLock()
+	vm := mc.vidMap
+	dataCenter := vm.DataCenter
+	mc.vidMapLock.RUnlock()
+
+	fullUrls, err = vm.LookupFileId(ctx, fileId)
 	if err == nil && len(fullUrls) > 0 {
 		return
 	}
@@ -99,7 +104,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str
 	var sameDcUrls, otherDcUrls []string
 	for _, loc := range locations {
 		httpUrl := "http://" + loc.Url + "/" + fileId
-		if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter {
+		if dataCenter != "" && dataCenter == loc.DataCenter {
 			sameDcUrls = append(sameDcUrls, httpUrl)
 		} else {
 			otherDcUrls = append(otherDcUrls, httpUrl)
@@ -120,6 +125,10 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
 
 	// Check cache first and parse volume IDs once
 	vidStringToUint := make(map[string]uint32, len(volumeIds))
+
+	// Get stable pointer to vidMap with minimal lock hold time
+	vm := mc.getStableVidMap()
+
 	for _, vidString := range volumeIds {
 		vid, err := strconv.ParseUint(vidString, 10, 32)
 		if err != nil {
@@ -127,7 +136,7 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
 		}
 		vidStringToUint[vidString] = uint32(vid)
 
-		locations, found := mc.GetLocations(uint32(vid))
+		locations, found := vm.GetLocations(uint32(vid))
 		if found && len(locations) > 0 {
 			result[vidString] = locations
 		} else {
@@ -149,9 +158,12 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
 		stillNeedLookup := make([]string, 0, len(needsLookup))
 		batchResult := make(map[string][]Location)
 
+		// Get stable pointer with minimal lock hold time
+		vm := mc.getStableVidMap()
+
 		for _, vidString := range needsLookup {
 			vid := vidStringToUint[vidString] // Use pre-parsed value
-			if locations, found := mc.GetLocations(vid); found && len(locations) > 0 {
+			if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
 				batchResult[vidString] = locations
 			} else {
 				stillNeedLookup = append(stillNeedLookup, vidString)
@@ -196,7 +208,7 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
 					GrpcPort:   int(masterLoc.GrpcPort),
 					DataCenter: masterLoc.DataCenter,
 				}
-				mc.vidMap.addLocation(uint32(vid), loc)
+				mc.addLocation(uint32(vid), loc)
 				locations = append(locations, loc)
 			}
 
@@ -351,7 +363,7 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
 	if err = stream.Send(&master_pb.KeepConnectedRequest{
 		FilerGroup:    mc.FilerGroup,
-		DataCenter:    mc.DataCenter,
+		DataCenter:    mc.GetDataCenter(),
 		Rack:          mc.rack,
 		ClientType:    mc.clientType,
 		ClientAddress: string(mc.clientHost),
@@ -482,24 +494,110 @@ func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAdd
 	})
 }
 
+// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
+// This is safe for read operations as the returned pointer is a stable snapshot,
+// and the underlying vidMap methods have their own internal locking.
+func (mc *MasterClient) getStableVidMap() *vidMap {
+	mc.vidMapLock.RLock()
+	vm := mc.vidMap
+	mc.vidMapLock.RUnlock()
+	return vm
+}
+
+// withCurrentVidMap executes a function with the current vidMap under a read lock.
+// This is for methods that modify vidMap's internal state, ensuring the pointer
+// is not swapped by resetVidMap during the operation. The actual map mutations
+// are protected by vidMap's internal mutex.
+func (mc *MasterClient) withCurrentVidMap(f func(vm *vidMap)) {
+	mc.vidMapLock.RLock()
+	defer mc.vidMapLock.RUnlock()
+	f(mc.vidMap)
+}
+
+// Public methods for external packages to access vidMap safely
+
+// GetLocations safely retrieves volume locations
+func (mc *MasterClient) GetLocations(vid uint32) (locations []Location, found bool) {
+	return mc.getStableVidMap().GetLocations(vid)
+}
+
+// GetLocationsClone safely retrieves a clone of volume locations
+func (mc *MasterClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
+	return mc.getStableVidMap().GetLocationsClone(vid)
+}
+
+// GetVidLocations safely retrieves volume locations by string ID
+func (mc *MasterClient) GetVidLocations(vid string) (locations []Location, err error) {
+	return mc.getStableVidMap().GetVidLocations(vid)
+}
+
+// LookupFileId safely looks up URLs for a file ID
+func (mc *MasterClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
+	return mc.getStableVidMap().LookupFileId(ctx, fileId)
+}
+
+// LookupVolumeServerUrl safely looks up volume server URLs
+func (mc *MasterClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
+	return mc.getStableVidMap().LookupVolumeServerUrl(vid)
+}
+
+// GetDataCenter safely retrieves the data center
+func (mc *MasterClient) GetDataCenter() string {
+	return mc.getStableVidMap().DataCenter
+}
+
+// Thread-safe helpers for vidMap operations
+
+// addLocation adds a volume location
+func (mc *MasterClient) addLocation(vid uint32, location Location) {
+	mc.withCurrentVidMap(func(vm *vidMap) {
+		vm.addLocation(vid, location)
+	})
+}
+
+// deleteLocation removes a volume location
+func (mc *MasterClient) deleteLocation(vid uint32, location Location) {
+	mc.withCurrentVidMap(func(vm *vidMap) {
+		vm.deleteLocation(vid, location)
+	})
+}
+
+// addEcLocation adds an EC volume location
+func (mc *MasterClient) addEcLocation(vid uint32, location Location) {
+	mc.withCurrentVidMap(func(vm *vidMap) {
+		vm.addEcLocation(vid, location)
+	})
+}
+
+// deleteEcLocation removes an EC volume location
+func (mc *MasterClient) deleteEcLocation(vid uint32, location Location) {
+	mc.withCurrentVidMap(func(vm *vidMap) {
+		vm.deleteEcLocation(vid, location)
+	})
+}
+
 func (mc *MasterClient) resetVidMap() {
-	tail := &vidMap{
-		vid2Locations:   mc.vid2Locations,
-		ecVid2Locations: mc.ecVid2Locations,
-		DataCenter:      mc.DataCenter,
-		cache:           mc.cache,
-	}
+	mc.vidMapLock.Lock()
+	defer mc.vidMapLock.Unlock()
 
-	nvm := newVidMap(mc.DataCenter)
-	nvm.cache = tail
+	// Preserve the existing vidMap in the cache chain
+	// No need to clone - the existing vidMap has its own mutex for thread safety
+	tail := mc.vidMap
+
+	nvm := newVidMap(tail.DataCenter)
+	nvm.cache.Store(tail)
 	mc.vidMap = nvm
 
-	//trim
-	for i := 0; i < mc.vidMapCacheSize && tail.cache != nil; i++ {
-		if i == mc.vidMapCacheSize-1 {
-			tail.cache = nil
-		} else {
-			tail = tail.cache
+	// Trim cache chain to vidMapCacheSize by traversing to the last node
+	// that should remain and cutting the chain after it
+	node := tail
+	for i := 0; i < mc.vidMapCacheSize-1; i++ {
+		if node.cache.Load() == nil {
+			return
 		}
+		node = node.cache.Load()
+	}
+	if node != nil {
+		node.cache.Store(nil)
 	}
 }
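
The core pattern in this change is an RWMutex-guarded pointer swap: readers take the read lock only long enough to snapshot the current *vidMap pointer, then run their lookups against that stable snapshot, while resetVidMap takes the write lock solely for the swap itself. A minimal standalone sketch of the idea; the names locationMap, store, snapshot, and reset are invented for illustration and are not part of the SeaweedFS codebase:

package main

import (
	"fmt"
	"sync"
)

// locationMap stands in for vidMap: it carries its own internal lock,
// so reads against an old snapshot remain safe after a swap.
type locationMap struct {
	mu   sync.RWMutex
	data map[uint32][]string
}

func (m *locationMap) get(vid uint32) []string {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.data[vid]
}

// store mimics MasterClient: the outer RWMutex protects only the pointer.
type store struct {
	lock    sync.RWMutex
	current *locationMap
}

// snapshot mirrors getStableVidMap: hold the read lock just long enough
// to copy the pointer, then read from the stable snapshot without it.
func (s *store) snapshot() *locationMap {
	s.lock.RLock()
	m := s.current
	s.lock.RUnlock()
	return m
}

// reset mirrors resetVidMap: the write lock serializes the pointer swap
// against concurrent snapshot() calls.
func (s *store) reset() {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.current = &locationMap{data: map[uint32][]string{}}
}

func main() {
	s := &store{current: &locationMap{data: map[uint32][]string{7: {"volume1:8080"}}}}
	m := s.snapshot() // stable view, valid even if reset runs next
	s.reset()
	fmt.Println(m.get(7)) // still sees the old snapshot: [volume1:8080]
}

This matches the comment on getStableVidMap: the outer lock guards only the pointer, and each vidMap generation has its own internal locking, so a reader holding a pre-swap snapshot keeps working correctly.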
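The rewritten resetVidMap also changes how the fallback cache chain is kept: instead of cloning the old map contents into a fresh tail node, it links the previous vidMap in whole (the Load/Store calls suggest cache is now an atomic pointer) and cuts the chain after vidMapCacheSize-1 hops. A hedged sketch of just that trim logic on a simplified node type; node, gen, and reset here are illustrative, and the assumption that cache is an atomic.Pointer is inferred from the diff:

package main

import (
	"fmt"
	"sync/atomic"
)

// node is a simplified stand-in for vidMap: each generation links to the
// previous one so cache misses can fall back to older lookups.
type node struct {
	gen   int
	cache atomic.Pointer[node]
}

// reset prepends a fresh node and trims the chain so that at most
// cacheSize old generations stay reachable, mirroring the PR's loop.
func reset(head *node, gen, cacheSize int) *node {
	tail := head // preserve the whole previous generation, no cloning
	nvm := &node{gen: gen}
	nvm.cache.Store(tail)

	// Walk to the last node that should survive, then cut the chain.
	n := tail
	for i := 0; i < cacheSize-1; i++ {
		next := n.cache.Load()
		if next == nil {
			return nvm // chain already shorter than the limit
		}
		n = next
	}
	n.cache.Store(nil)
	return nvm
}

func main() {
	head := &node{gen: 0}
	for g := 1; g <= 5; g++ {
		head = reset(head, g, 2) // keep at most 2 old generations
	}
	for n := head; n != nil; n = n.cache.Load() {
		fmt.Println("gen", n.gen) // prints gen 5, 4, 3
	}
}

With cacheSize = 2 the loop leaves at most two old generations reachable, so repeated resets cannot grow the fallback chain without bound.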
