aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-10-30 18:05:49 -0700
committerchrislu <chris.lu@gmail.com>2025-10-30 18:05:49 -0700
commit0667c4964e7890f790c2150732b5bfd5fe969c24 (patch)
tree2ba2b3348aaf4b6eb962c7174c8c82b8f072afbe
parent5737f77d5f041f2f8e00f350ca6d3339836850ac (diff)
downloadseaweedfs-0667c4964e7890f790c2150732b5bfd5fe969c24.tar.xz
seaweedfs-0667c4964e7890f790c2150732b5bfd5fe969c24.zip
avoid dup lookup
-rw-r--r--weed/wdclient/masterclient.go133
1 files changed, 87 insertions, 46 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 7b789eceb..db5ebee10 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -9,6 +9,8 @@ import (
"sync"
"time"
+ "golang.org/x/sync/singleflight"
+
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -35,18 +37,23 @@ type MasterClient struct {
vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
+
+ // Per-volume-ID in-flight tracking to prevent duplicate lookups
+ vidLookupLock sync.Mutex
+ vidLookupInFlight map[string]*singleflight.Group // volumeId -> singleflight group
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
return &MasterClient{
- FilerGroup: filerGroup,
- clientType: clientType,
- clientHost: clientHost,
- rack: rack,
- masters: masters,
- grpcDialOption: grpcDialOption,
- vidMap: newVidMap(clientDataCenter),
- vidMapCacheSize: 5,
+ FilerGroup: filerGroup,
+ clientType: clientType,
+ clientHost: clientHost,
+ rack: rack,
+ masters: masters,
+ grpcDialOption: grpcDialOption,
+ vidMap: newVidMap(clientDataCenter),
+ vidMapCacheSize: 5,
+ vidLookupInFlight: make(map[string]*singleflight.Group),
}
}
@@ -97,11 +104,12 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str
}
// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
+// Uses per-volume-ID singleflight to prevent duplicate lookups of the same volume
func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
result := make(map[string][]Location)
- var missingVids []string
+ var needsLookup []string
- // Check cache first
+ // Check cache first and separate volumes that need lookup
for _, vidString := range volumeIds {
vid, err := strconv.ParseUint(vidString, 10, 32)
if err != nil {
@@ -111,58 +119,91 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
if found && len(locations) > 0 {
result[vidString] = locations
} else {
- missingVids = append(missingVids, vidString)
+ needsLookup = append(needsLookup, vidString)
}
}
- // Query master for missing volumes
- if len(missingVids) > 0 {
- glog.V(2).Infof("Looking up %d volumes from master: %v", len(missingVids), missingVids)
- err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
- VolumeOrFileIds: missingVids,
- })
- if err != nil {
- return fmt.Errorf("master lookup failed: %v", err)
+ if len(needsLookup) == 0 {
+ return result, nil
+ }
+
+ // For each volume that needs lookup, use per-volume singleflight
+ // to prevent duplicate master queries for the same volume ID
+ for _, vidString := range needsLookup {
+ // Get or create singleflight group for this volume ID
+ mc.vidLookupLock.Lock()
+ group, exists := mc.vidLookupInFlight[vidString]
+ if !exists {
+ group = &singleflight.Group{}
+ mc.vidLookupInFlight[vidString] = group
+ }
+ mc.vidLookupLock.Unlock()
+
+ // Use singleflight to ensure only one lookup per volume ID
+ sfResult, err, _ := group.Do(vidString, func() (interface{}, error) {
+ // Double-check cache in case it was populated while we were waiting
+ vid, _ := strconv.ParseUint(vidString, 10, 32)
+ if locations, found := mc.GetLocations(uint32(vid)); found && len(locations) > 0 {
+ return locations, nil
}
- for _, vidLoc := range resp.VolumeIdLocations {
- if vidLoc.Error != "" {
- glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
- continue
- }
+ // Query master for this volume
+ glog.V(2).Infof("Looking up volume %s from master", vidString)
+ var locations []Location
- vidString := vidLoc.VolumeOrFileId
- // Parse volume ID from response (could be "123" or "123,abc")
- parts := strings.Split(vidString, ",")
- vidOnly := parts[0]
- vid, err := strconv.ParseUint(vidOnly, 10, 32)
+ err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: []string{vidString},
+ })
if err != nil {
- glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidString, err)
- continue
+ return fmt.Errorf("master lookup failed: %v", err)
}
- var locations []Location
- for _, masterLoc := range vidLoc.Locations {
- loc := Location{
- Url: masterLoc.Url,
- PublicUrl: masterLoc.PublicUrl,
- GrpcPort: int(masterLoc.GrpcPort),
- DataCenter: masterLoc.DataCenter,
+ for _, vidLoc := range resp.VolumeIdLocations {
+ if vidLoc.Error != "" {
+ return fmt.Errorf("volume %s lookup error: %s", vidString, vidLoc.Error)
+ }
+
+ // Parse volume ID from response
+ parts := strings.Split(vidLoc.VolumeOrFileId, ",")
+ vidOnly := parts[0]
+ vid, err := strconv.ParseUint(vidOnly, 10, 32)
+ if err != nil {
+ return fmt.Errorf("failed to parse volume id '%s': %v", vidOnly, err)
}
- mc.vidMap.addLocation(uint32(vid), loc)
- locations = append(locations, loc)
- }
- if len(locations) > 0 {
- result[vidOnly] = locations
+ for _, masterLoc := range vidLoc.Locations {
+ loc := Location{
+ Url: masterLoc.Url,
+ PublicUrl: masterLoc.PublicUrl,
+ GrpcPort: int(masterLoc.GrpcPort),
+ DataCenter: masterLoc.DataCenter,
+ }
+ mc.vidMap.addLocation(uint32(vid), loc)
+ locations = append(locations, loc)
+ }
}
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
}
- return nil
+ return locations, nil
})
+ // Clean up the singleflight group for this volume
+ mc.vidLookupLock.Lock()
+ delete(mc.vidLookupInFlight, vidString)
+ mc.vidLookupLock.Unlock()
+
if err != nil {
- return result, err
+ glog.Warningf("Failed to lookup volume %s: %v", vidString, err)
+ continue // Continue with other volumes
+ }
+
+ if locations, ok := sfResult.([]Location); ok && len(locations) > 0 {
+ result[vidString] = locations
}
}