aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- weed/shell/commands.go       |   2
-rw-r--r-- weed/wdclient/masterclient.go | 148
-rw-r--r-- weed/wdclient/vid_map.go      |  17
3 files changed, 133 insertions(+), 34 deletions(-)
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index 62dcfd7f8..55a09e392 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -116,7 +116,7 @@ func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
}
func (ce *CommandEnv) GetDataCenter() string {
- return ce.MasterClient.DataCenter
+ return ce.MasterClient.GetDataCenter()
}
func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index f3950bc37..320156294 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -35,10 +35,10 @@ type MasterClient struct {
masters pb.ServerDiscovery
grpcDialOption grpc.DialOption
- // TODO: CRITICAL - Data race: resetVidMap() writes to vidMap while other methods read concurrently
- // This embedded *vidMap should be changed to a private field protected by sync.RWMutex
- // See: https://github.com/seaweedfs/seaweedfs/issues/[ISSUE_NUMBER]
- *vidMap
+ // vidMap stores volume location mappings
+ // Protected by vidMapLock to prevent race conditions during pointer swaps in resetVidMap
+ vidMap *vidMap
+ vidMapLock sync.RWMutex
vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
@@ -71,8 +71,13 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
}
func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
- // Try cache first using the fast path
- fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId)
+ // Try cache first using the fast path - grab both vidMap and dataCenter in one lock
+ mc.vidMapLock.RLock()
+ vm := mc.vidMap
+ dataCenter := vm.DataCenter
+ mc.vidMapLock.RUnlock()
+
+ fullUrls, err = vm.LookupFileId(ctx, fileId)
if err == nil && len(fullUrls) > 0 {
return
}
@@ -99,7 +104,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str
var sameDcUrls, otherDcUrls []string
for _, loc := range locations {
httpUrl := "http://" + loc.Url + "/" + fileId
- if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter {
+ if dataCenter != "" && dataCenter == loc.DataCenter {
sameDcUrls = append(sameDcUrls, httpUrl)
} else {
otherDcUrls = append(otherDcUrls, httpUrl)
@@ -120,6 +125,10 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
// Check cache first and parse volume IDs once
vidStringToUint := make(map[string]uint32, len(volumeIds))
+
+ // Get stable pointer to vidMap with minimal lock hold time
+ vm := mc.getStableVidMap()
+
for _, vidString := range volumeIds {
vid, err := strconv.ParseUint(vidString, 10, 32)
if err != nil {
@@ -127,7 +136,7 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
}
vidStringToUint[vidString] = uint32(vid)
- locations, found := mc.GetLocations(uint32(vid))
+ locations, found := vm.GetLocations(uint32(vid))
if found && len(locations) > 0 {
result[vidString] = locations
} else {
@@ -149,9 +158,12 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
stillNeedLookup := make([]string, 0, len(needsLookup))
batchResult := make(map[string][]Location)
+ // Get stable pointer with minimal lock hold time
+ vm := mc.getStableVidMap()
+
for _, vidString := range needsLookup {
vid := vidStringToUint[vidString] // Use pre-parsed value
- if locations, found := mc.GetLocations(vid); found && len(locations) > 0 {
+ if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
batchResult[vidString] = locations
} else {
stillNeedLookup = append(stillNeedLookup, vidString)
@@ -196,7 +208,7 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
GrpcPort: int(masterLoc.GrpcPort),
DataCenter: masterLoc.DataCenter,
}
- mc.vidMap.addLocation(uint32(vid), loc)
+ mc.addLocation(uint32(vid), loc)
locations = append(locations, loc)
}
@@ -351,7 +363,7 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
if err = stream.Send(&master_pb.KeepConnectedRequest{
FilerGroup: mc.FilerGroup,
- DataCenter: mc.DataCenter,
+ DataCenter: mc.GetDataCenter(),
Rack: mc.rack,
ClientType: mc.clientType,
ClientAddress: string(mc.clientHost),
@@ -482,24 +494,110 @@ func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAdd
})
}
+// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
+// This is safe for read operations as the returned pointer is a stable snapshot,
+// and the underlying vidMap methods have their own internal locking.
+func (mc *MasterClient) getStableVidMap() *vidMap {
+ mc.vidMapLock.RLock()
+ vm := mc.vidMap
+ mc.vidMapLock.RUnlock()
+ return vm
+}
+
+// withCurrentVidMap executes a function with the current vidMap under a read lock.
+// This is for methods that modify vidMap's internal state, ensuring the pointer
+// is not swapped by resetVidMap during the operation. The actual map mutations
+// are protected by vidMap's internal mutex.
+func (mc *MasterClient) withCurrentVidMap(f func(vm *vidMap)) {
+ mc.vidMapLock.RLock()
+ defer mc.vidMapLock.RUnlock()
+ f(mc.vidMap)
+}
+
+// Public methods for external packages to access vidMap safely
+
+// GetLocations safely retrieves volume locations
+func (mc *MasterClient) GetLocations(vid uint32) (locations []Location, found bool) {
+ return mc.getStableVidMap().GetLocations(vid)
+}
+
+// GetLocationsClone safely retrieves a clone of volume locations
+func (mc *MasterClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
+ return mc.getStableVidMap().GetLocationsClone(vid)
+}
+
+// GetVidLocations safely retrieves volume locations by string ID
+func (mc *MasterClient) GetVidLocations(vid string) (locations []Location, err error) {
+ return mc.getStableVidMap().GetVidLocations(vid)
+}
+
+// LookupFileId safely looks up URLs for a file ID
+func (mc *MasterClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
+ return mc.getStableVidMap().LookupFileId(ctx, fileId)
+}
+
+// LookupVolumeServerUrl safely looks up volume server URLs
+func (mc *MasterClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
+ return mc.getStableVidMap().LookupVolumeServerUrl(vid)
+}
+
+// GetDataCenter safely retrieves the data center
+func (mc *MasterClient) GetDataCenter() string {
+ return mc.getStableVidMap().DataCenter
+}
+
+// Thread-safe helpers for vidMap operations
+
+// addLocation adds a volume location
+func (mc *MasterClient) addLocation(vid uint32, location Location) {
+ mc.withCurrentVidMap(func(vm *vidMap) {
+ vm.addLocation(vid, location)
+ })
+}
+
+// deleteLocation removes a volume location
+func (mc *MasterClient) deleteLocation(vid uint32, location Location) {
+ mc.withCurrentVidMap(func(vm *vidMap) {
+ vm.deleteLocation(vid, location)
+ })
+}
+
+// addEcLocation adds an EC volume location
+func (mc *MasterClient) addEcLocation(vid uint32, location Location) {
+ mc.withCurrentVidMap(func(vm *vidMap) {
+ vm.addEcLocation(vid, location)
+ })
+}
+
+// deleteEcLocation removes an EC volume location
+func (mc *MasterClient) deleteEcLocation(vid uint32, location Location) {
+ mc.withCurrentVidMap(func(vm *vidMap) {
+ vm.deleteEcLocation(vid, location)
+ })
+}
+
func (mc *MasterClient) resetVidMap() {
- tail := &vidMap{
- vid2Locations: mc.vid2Locations,
- ecVid2Locations: mc.ecVid2Locations,
- DataCenter: mc.DataCenter,
- cache: mc.cache,
- }
+ mc.vidMapLock.Lock()
+ defer mc.vidMapLock.Unlock()
- nvm := newVidMap(mc.DataCenter)
- nvm.cache = tail
+ // Preserve the existing vidMap in the cache chain
+ // No need to clone - the existing vidMap has its own mutex for thread safety
+ tail := mc.vidMap
+
+ nvm := newVidMap(tail.DataCenter)
+ nvm.cache.Store(tail)
mc.vidMap = nvm
- //trim
- for i := 0; i < mc.vidMapCacheSize && tail.cache != nil; i++ {
- if i == mc.vidMapCacheSize-1 {
- tail.cache = nil
- } else {
- tail = tail.cache
+ // Trim cache chain to vidMapCacheSize by traversing to the last node
+ // that should remain and cutting the chain after it
+ node := tail
+ for i := 0; i < mc.vidMapCacheSize-1; i++ {
+ if node.cache.Load() == nil {
+ return
}
+ node = node.cache.Load()
+ }
+ if node != nil {
+ node.cache.Store(nil)
}
}
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 9d5e5d378..179381b0c 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -4,13 +4,14 @@ import (
"context"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
)
@@ -41,7 +42,7 @@ type vidMap struct {
ecVid2Locations map[uint32][]Location
DataCenter string
cursor int32
- cache *vidMap
+ cache atomic.Pointer[vidMap]
}
func newVidMap(dataCenter string) *vidMap {
@@ -135,8 +136,8 @@ func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
return locations, found
}
- if vc.cache != nil {
- return vc.cache.GetLocations(vid)
+ if cachedMap := vc.cache.Load(); cachedMap != nil {
+ return cachedMap.GetLocations(vid)
}
return nil, false
@@ -212,8 +213,8 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) {
}
func (vc *vidMap) deleteLocation(vid uint32, location Location) {
- if vc.cache != nil {
- vc.cache.deleteLocation(vid, location)
+ if cachedMap := vc.cache.Load(); cachedMap != nil {
+ cachedMap.deleteLocation(vid, location)
}
vc.Lock()
@@ -235,8 +236,8 @@ func (vc *vidMap) deleteLocation(vid uint32, location Location) {
}
func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
- if vc.cache != nil {
- vc.cache.deleteLocation(vid, location)
+ if cachedMap := vc.cache.Load(); cachedMap != nil {
+ cachedMap.deleteEcLocation(vid, location)
}
vc.Lock()