diff options
| author | chrislu <chris.lu@gmail.com> | 2025-10-30 18:05:49 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-10-30 18:05:49 -0700 |
| commit | 0667c4964e7890f790c2150732b5bfd5fe969c24 (patch) | |
| tree | 2ba2b3348aaf4b6eb962c7174c8c82b8f072afbe | |
| parent | 5737f77d5f041f2f8e00f350ca6d3339836850ac (diff) | |
| download | seaweedfs-0667c4964e7890f790c2150732b5bfd5fe969c24.tar.xz seaweedfs-0667c4964e7890f790c2150732b5bfd5fe969c24.zip | |
avoid dup lookup
| -rw-r--r-- | weed/wdclient/masterclient.go | 133 |
1 files changed, 87 insertions, 46 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 7b789eceb..db5ebee10 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "golang.org/x/sync/singleflight" + "github.com/seaweedfs/seaweedfs/weed/util/version" "github.com/seaweedfs/seaweedfs/weed/stats" @@ -35,18 +37,23 @@ type MasterClient struct { vidMapCacheSize int OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) OnPeerUpdateLock sync.RWMutex + + // Per-volume-ID in-flight tracking to prevent duplicate lookups + vidLookupLock sync.Mutex + vidLookupInFlight map[string]*singleflight.Group // volumeId -> singleflight group } func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient { return &MasterClient{ - FilerGroup: filerGroup, - clientType: clientType, - clientHost: clientHost, - rack: rack, - masters: masters, - grpcDialOption: grpcDialOption, - vidMap: newVidMap(clientDataCenter), - vidMapCacheSize: 5, + FilerGroup: filerGroup, + clientType: clientType, + clientHost: clientHost, + rack: rack, + masters: masters, + grpcDialOption: grpcDialOption, + vidMap: newVidMap(clientDataCenter), + vidMapCacheSize: 5, + vidLookupInFlight: make(map[string]*singleflight.Group), } } @@ -97,11 +104,12 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str } // LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache +// Uses per-volume-ID singleflight to prevent duplicate lookups of the same volume func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) { result := make(map[string][]Location) - var missingVids []string + var needsLookup []string - // Check cache first + // Check cache first and separate volumes that need lookup for _, vidString := range volumeIds { vid, err := strconv.ParseUint(vidString, 10, 32) if err != nil { @@ -111,58 +119,91 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI if found && len(locations) > 0 { result[vidString] = locations } else { - missingVids = append(missingVids, vidString) + needsLookup = append(needsLookup, vidString) } } - // Query master for missing volumes - if len(missingVids) > 0 { - glog.V(2).Infof("Looking up %d volumes from master: %v", len(missingVids), missingVids) - err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ - VolumeOrFileIds: missingVids, - }) - if err != nil { - return fmt.Errorf("master lookup failed: %v", err) + if len(needsLookup) == 0 { + return result, nil + } + + // For each volume that needs lookup, use per-volume singleflight + // to prevent duplicate master queries for the same volume ID + for _, vidString := range needsLookup { + // Get or create singleflight group for this volume ID + mc.vidLookupLock.Lock() + group, exists := mc.vidLookupInFlight[vidString] + if !exists { + group = &singleflight.Group{} + mc.vidLookupInFlight[vidString] = group + } + mc.vidLookupLock.Unlock() + + // Use singleflight to ensure only one lookup per volume ID + sfResult, err, _ := group.Do(vidString, func() (interface{}, error) { + // Double-check cache in case it was populated while we were waiting + vid, _ := strconv.ParseUint(vidString, 10, 32) + if locations, found := mc.GetLocations(uint32(vid)); found && len(locations) > 0 { + return locations, nil } - for _, vidLoc := range resp.VolumeIdLocations { - if vidLoc.Error != "" { - glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error) - continue - } + // Query master for this volume + glog.V(2).Infof("Looking up volume %s from master", vidString) + var locations []Location - vidString := vidLoc.VolumeOrFileId - // Parse volume ID from response (could be "123" or "123,abc") - parts := strings.Split(vidString, ",") - vidOnly := parts[0] - vid, err := strconv.ParseUint(vidOnly, 10, 32) + err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ + VolumeOrFileIds: []string{vidString}, + }) if err != nil { - glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidString, err) - continue + return fmt.Errorf("master lookup failed: %v", err) } - var locations []Location - for _, masterLoc := range vidLoc.Locations { - loc := Location{ - Url: masterLoc.Url, - PublicUrl: masterLoc.PublicUrl, - GrpcPort: int(masterLoc.GrpcPort), - DataCenter: masterLoc.DataCenter, + for _, vidLoc := range resp.VolumeIdLocations { + if vidLoc.Error != "" { + return fmt.Errorf("volume %s lookup error: %s", vidString, vidLoc.Error) + } + + // Parse volume ID from response + parts := strings.Split(vidLoc.VolumeOrFileId, ",") + vidOnly := parts[0] + vid, err := strconv.ParseUint(vidOnly, 10, 32) + if err != nil { + return fmt.Errorf("failed to parse volume id '%s': %v", vidOnly, err) } - mc.vidMap.addLocation(uint32(vid), loc) - locations = append(locations, loc) - } - if len(locations) > 0 { - result[vidOnly] = locations + for _, masterLoc := range vidLoc.Locations { + loc := Location{ + Url: masterLoc.Url, + PublicUrl: masterLoc.PublicUrl, + GrpcPort: int(masterLoc.GrpcPort), + DataCenter: masterLoc.DataCenter, + } + mc.vidMap.addLocation(uint32(vid), loc) + locations = append(locations, loc) + } } + return nil + }) + + if err != nil { + return nil, err } - return nil + return locations, nil }) + // Clean up the singleflight group for this volume + mc.vidLookupLock.Lock() + delete(mc.vidLookupInFlight, vidString) + mc.vidLookupLock.Unlock() + if err != nil { - return result, err + glog.Warningf("Failed to lookup volume %s: %v", vidString, err) + continue // Continue with other volumes + } + + if locations, ok := sfResult.([]Location); ok && len(locations) > 0 { + result[vidString] = locations } } |
