diff options
| -rw-r--r-- | weed/shell/commands.go | 2 | ||||
| -rw-r--r-- | weed/wdclient/masterclient.go | 125 |
2 files changed, 110 insertions, 17 deletions
diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 62dcfd7f8..55a09e392 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -116,7 +116,7 @@ func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string { } func (ce *CommandEnv) GetDataCenter() string { - return ce.MasterClient.DataCenter + return ce.MasterClient.GetDataCenter() } func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) { diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index f3950bc37..44e24297a 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -35,10 +35,10 @@ type MasterClient struct { masters pb.ServerDiscovery grpcDialOption grpc.DialOption - // TODO: CRITICAL - Data race: resetVidMap() writes to vidMap while other methods read concurrently - // This embedded *vidMap should be changed to a private field protected by sync.RWMutex - // See: https://github.com/seaweedfs/seaweedfs/issues/[ISSUE_NUMBER] - *vidMap + // vidMap stores volume location mappings + // Protected by vidMapLock to prevent race conditions during pointer swaps in resetVidMap + vidMap *vidMap + vidMapLock sync.RWMutex vidMapCacheSize int OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) OnPeerUpdateLock sync.RWMutex @@ -71,8 +71,12 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { } func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) { - // Try cache first using the fast path - fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId) + // Try cache first using the fast path - need to protect vidMap pointer access + mc.vidMapLock.RLock() + vm := mc.vidMap + mc.vidMapLock.RUnlock() + + fullUrls, err = vm.LookupFileId(ctx, fileId) if err == nil && len(fullUrls) > 0 { return } @@ -96,10 +100,14 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str } // Build HTTP URLs from locations, preferring same data center + mc.vidMapLock.RLock() + dataCenter := mc.vidMap.DataCenter + mc.vidMapLock.RUnlock() + var sameDcUrls, otherDcUrls []string for _, loc := range locations { httpUrl := "http://" + loc.Url + "/" + fileId - if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter { + if dataCenter != "" && dataCenter == loc.DataCenter { sameDcUrls = append(sameDcUrls, httpUrl) } else { otherDcUrls = append(otherDcUrls, httpUrl) @@ -120,20 +128,23 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI // Check cache first and parse volume IDs once vidStringToUint := make(map[string]uint32, len(volumeIds)) + mc.vidMapLock.RLock() for _, vidString := range volumeIds { vid, err := strconv.ParseUint(vidString, 10, 32) if err != nil { + mc.vidMapLock.RUnlock() return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err) } vidStringToUint[vidString] = uint32(vid) - locations, found := mc.GetLocations(uint32(vid)) + locations, found := mc.vidMap.GetLocations(uint32(vid)) if found && len(locations) > 0 { result[vidString] = locations } else { needsLookup = append(needsLookup, vidString) } } + mc.vidMapLock.RUnlock() if len(needsLookup) == 0 { return result, nil @@ -149,14 +160,16 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI stillNeedLookup := make([]string, 0, len(needsLookup)) batchResult := make(map[string][]Location) + mc.vidMapLock.RLock() for _, vidString := range needsLookup { vid := vidStringToUint[vidString] // Use pre-parsed value - if locations, found := mc.GetLocations(vid); found && len(locations) > 0 { + if locations, found := mc.vidMap.GetLocations(vid); found && len(locations) > 0 { batchResult[vidString] = locations } else { stillNeedLookup = append(stillNeedLookup, vidString) } } + mc.vidMapLock.RUnlock() if len(stillNeedLookup) == 0 { return batchResult, nil @@ -196,7 +209,7 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI GrpcPort: int(masterLoc.GrpcPort), DataCenter: masterLoc.DataCenter, } - mc.vidMap.addLocation(uint32(vid), loc) + mc.addLocation(uint32(vid), loc) locations = append(locations, loc) } @@ -349,9 +362,13 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server return err } + mc.vidMapLock.RLock() + dataCenter := mc.vidMap.DataCenter + mc.vidMapLock.RUnlock() + if err = stream.Send(&master_pb.KeepConnectedRequest{ FilerGroup: mc.FilerGroup, - DataCenter: mc.DataCenter, + DataCenter: dataCenter, Rack: mc.rack, ClientType: mc.clientType, ClientAddress: string(mc.clientHost), @@ -482,15 +499,91 @@ func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAdd }) } +// Public methods for external packages to access vidMap safely + +// GetLocations safely retrieves volume locations +func (mc *MasterClient) GetLocations(vid uint32) (locations []Location, found bool) { + mc.vidMapLock.RLock() + defer mc.vidMapLock.RUnlock() + return mc.vidMap.GetLocations(vid) +} + +// GetLocationsClone safely retrieves a clone of volume locations +func (mc *MasterClient) GetLocationsClone(vid uint32) (locations []Location, found bool) { + mc.vidMapLock.RLock() + defer mc.vidMapLock.RUnlock() + return mc.vidMap.GetLocationsClone(vid) +} + +// GetVidLocations safely retrieves volume locations by string ID +func (mc *MasterClient) GetVidLocations(vid string) (locations []Location, err error) { + mc.vidMapLock.RLock() + defer mc.vidMapLock.RUnlock() + return mc.vidMap.GetVidLocations(vid) +} + +// LookupFileId safely looks up URLs for a file ID +func (mc *MasterClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) { + mc.vidMapLock.RLock() + vm := mc.vidMap + mc.vidMapLock.RUnlock() + return vm.LookupFileId(ctx, fileId) +} + +// LookupVolumeServerUrl safely looks up volume server URLs +func (mc *MasterClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) { + mc.vidMapLock.RLock() + defer mc.vidMapLock.RUnlock() + return mc.vidMap.LookupVolumeServerUrl(vid) +} + +// GetDataCenter safely retrieves the data center +func (mc *MasterClient) GetDataCenter() string { + mc.vidMapLock.RLock() + defer mc.vidMapLock.RUnlock() + return mc.vidMap.DataCenter +} + +// Thread-safe helpers for vidMap operations +// These methods acquire exclusive locks to protect both the vidMap pointer +// and the underlying map mutations from concurrent access + +func (mc *MasterClient) addLocation(vid uint32, location Location) { + mc.vidMapLock.Lock() + defer mc.vidMapLock.Unlock() + mc.vidMap.addLocation(vid, location) +} + +func (mc *MasterClient) deleteLocation(vid uint32, location Location) { + mc.vidMapLock.Lock() + defer mc.vidMapLock.Unlock() + mc.vidMap.deleteLocation(vid, location) +} + +func (mc *MasterClient) addEcLocation(vid uint32, location Location) { + mc.vidMapLock.Lock() + defer mc.vidMapLock.Unlock() + mc.vidMap.addEcLocation(vid, location) +} + +func (mc *MasterClient) deleteEcLocation(vid uint32, location Location) { + mc.vidMapLock.Lock() + defer mc.vidMapLock.Unlock() + mc.vidMap.deleteEcLocation(vid, location) +} + func (mc *MasterClient) resetVidMap() { + mc.vidMapLock.Lock() + defer mc.vidMapLock.Unlock() + tail := &vidMap{ - vid2Locations: mc.vid2Locations, - ecVid2Locations: mc.ecVid2Locations, - DataCenter: mc.DataCenter, - cache: mc.cache, + vid2Locations: mc.vidMap.vid2Locations, + ecVid2Locations: mc.vidMap.ecVid2Locations, + DataCenter: mc.vidMap.DataCenter, + cache: mc.vidMap.cache, } - nvm := newVidMap(mc.DataCenter) + nvm := newVidMap(mc.vidMap.DataCenter) nvm.cache = tail mc.vidMap = nvm |
