aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-10-30 20:18:21 -0700
committerGitHub <noreply@github.com>2025-10-30 20:18:21 -0700
commit5810aba763cc269440483d12dcc6e026ec010fe6 (patch)
tree4c16da32daa9fb34f4b9b171f0d4230308b23c00
parentba07b3e4c68f34219f3ca10f8f3ddcc059ffb972 (diff)
downloadseaweedfs-5810aba763cc269440483d12dcc6e026ec010fe6.tar.xz
seaweedfs-5810aba763cc269440483d12dcc6e026ec010fe6.zip
Filer: fallback to check master (#7411)
* fallback to check master * clean up * parsing * refactor * handle parse error * return error * avoid dup lookup * use batch key * dedup lookup logic * address comments * errors.Join(lookupErrors...) * add a comment
-rw-r--r--weed/server/filer_grpc_server.go44
-rw-r--r--weed/wdclient/masterclient.go183
2 files changed, 184 insertions, 43 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index a18c55bb1..02eceebde 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -5,7 +5,6 @@ import (
"fmt"
"os"
"path/filepath"
- "strconv"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@@ -17,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
)
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
@@ -94,31 +94,31 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
LocationsMap: make(map[string]*filer_pb.Locations),
}
- for _, vidString := range req.VolumeIds {
- vid, err := strconv.Atoi(vidString)
- if err != nil {
- glog.V(1).InfofCtx(ctx, "Unknown volume id %d", vid)
- return nil, err
- }
- var locs []*filer_pb.Location
- locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
- if !found {
- continue
- }
- for _, loc := range locations {
- locs = append(locs, &filer_pb.Location{
- Url: loc.Url,
- PublicUrl: loc.PublicUrl,
- GrpcPort: uint32(loc.GrpcPort),
- DataCenter: loc.DataCenter,
- })
- }
+ // Use master client's lookup with fallback - it handles cache and master query
+ vidLocations, err := fs.filer.MasterClient.LookupVolumeIdsWithFallback(ctx, req.VolumeIds)
+
+ // Convert wdclient.Location to filer_pb.Location
+ // Return partial results even if there was an error
+ for vidString, locations := range vidLocations {
resp.LocationsMap[vidString] = &filer_pb.Locations{
- Locations: locs,
+ Locations: wdclientLocationsToPb(locations),
}
}
- return resp, nil
+ return resp, err
+}
+
+func wdclientLocationsToPb(locations []wdclient.Location) []*filer_pb.Location {
+ locs := make([]*filer_pb.Location, 0, len(locations))
+ for _, loc := range locations {
+ locs = append(locs, &filer_pb.Location{
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ GrpcPort: uint32(loc.GrpcPort),
+ DataCenter: loc.DataCenter,
+ })
+ }
+ return locs
}
func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 11b58d861..f3950bc37 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -2,11 +2,17 @@ package wdclient
import (
"context"
+ "errors"
"fmt"
"math/rand"
+ "sort"
+ "strconv"
+ "strings"
"sync"
"time"
+ "golang.org/x/sync/singleflight"
+
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -29,10 +35,16 @@ type MasterClient struct {
masters pb.ServerDiscovery
grpcDialOption grpc.DialOption
+ // TODO: CRITICAL - Data race: resetVidMap() writes to vidMap while other methods read concurrently
+ // This embedded *vidMap should be changed to a private field protected by sync.RWMutex
+ // See: https://github.com/seaweedfs/seaweedfs/issues/[ISSUE_NUMBER]
*vidMap
vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
+
+ // Per-batch in-flight tracking to prevent duplicate lookups for the same set of volumes
+ vidLookupGroup singleflight.Group
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
@@ -59,39 +71,168 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
}
func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
+ // Try cache first using the fast path
fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId)
if err == nil && len(fullUrls) > 0 {
return
}
- err = pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
- VolumeOrFileIds: []string{fileId},
- })
+ // Extract volume ID from file ID (format: "volumeId,needle_id_cookie")
+ parts := strings.Split(fileId, ",")
+ if len(parts) != 2 {
+ return nil, fmt.Errorf("invalid fileId %s", fileId)
+ }
+ volumeId := parts[0]
+
+ // Use shared lookup logic with batching and singleflight
+ vidLocations, err := mc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})
+ if err != nil {
+ return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
+ }
+
+ locations, found := vidLocations[volumeId]
+ if !found || len(locations) == 0 {
+ return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
+ }
+
+ // Build HTTP URLs from locations, preferring same data center
+ var sameDcUrls, otherDcUrls []string
+ for _, loc := range locations {
+ httpUrl := "http://" + loc.Url + "/" + fileId
+ if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter {
+ sameDcUrls = append(sameDcUrls, httpUrl)
+ } else {
+ otherDcUrls = append(otherDcUrls, httpUrl)
+ }
+ }
+
+ // Prefer same data center
+ fullUrls = append(sameDcUrls, otherDcUrls...)
+ return fullUrls, nil
+}
+
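+// LookupVolumeIdsWithFallback returns the locations of the given volume IDs, serving them from the local cache and querying the master for any that are missing. Volumes that still cannot be resolved are reported through the returned (joined) error while the map keeps the ones that were found; an unparsable volume ID aborts the call with a nil map.
+// Concurrent calls whose sorted set of cache-missing volume IDs is identical are coalesced into a single master query via singleflight.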
+func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+ result := make(map[string][]Location)
+ var needsLookup []string
+ var lookupErrors []error
+
+ // Check cache first and parse volume IDs once
+ vidStringToUint := make(map[string]uint32, len(volumeIds))
+ for _, vidString := range volumeIds {
+ vid, err := strconv.ParseUint(vidString, 10, 32)
if err != nil {
- return fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
+ return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
+ }
+ vidStringToUint[vidString] = uint32(vid)
+
+ locations, found := mc.GetLocations(uint32(vid))
+ if found && len(locations) > 0 {
+ result[vidString] = locations
+ } else {
+ needsLookup = append(needsLookup, vidString)
+ }
+ }
+
+ if len(needsLookup) == 0 {
+ return result, nil
+ }
+
+ // Batch query all missing volumes using singleflight on the batch key
+ // Sort for stable key to coalesce identical batches
+ sort.Strings(needsLookup)
+ batchKey := strings.Join(needsLookup, ",")
+
+ sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
+ // Double-check cache for volumes that might have been populated while waiting
+ stillNeedLookup := make([]string, 0, len(needsLookup))
+ batchResult := make(map[string][]Location)
+
+ for _, vidString := range needsLookup {
+ vid := vidStringToUint[vidString] // Use pre-parsed value
+ if locations, found := mc.GetLocations(vid); found && len(locations) > 0 {
+ batchResult[vidString] = locations
+ } else {
+ stillNeedLookup = append(stillNeedLookup, vidString)
+ }
+ }
+
+ if len(stillNeedLookup) == 0 {
+ return batchResult, nil
}
- for vid, vidLocation := range resp.VolumeIdLocations {
- for _, vidLoc := range vidLocation.Locations {
- loc := Location{
- Url: vidLoc.Url,
- PublicUrl: vidLoc.PublicUrl,
- GrpcPort: int(vidLoc.GrpcPort),
- DataCenter: vidLoc.DataCenter,
+
+ // Query master with batched volume IDs
+ glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup)
+
+ err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: stillNeedLookup,
+ })
+ if err != nil {
+ return fmt.Errorf("master lookup failed: %v", err)
+ }
+
+ for _, vidLoc := range resp.VolumeIdLocations {
+ if vidLoc.Error != "" {
+ glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
+ continue
}
- mc.vidMap.addLocation(uint32(vid), loc)
- httpUrl := "http://" + loc.Url + "/" + fileId
- // Prefer same data center
- if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter {
- fullUrls = append([]string{httpUrl}, fullUrls...)
- } else {
- fullUrls = append(fullUrls, httpUrl)
+
+ // Parse volume ID from response
+ parts := strings.Split(vidLoc.VolumeOrFileId, ",")
+ vidOnly := parts[0]
+ vid, err := strconv.ParseUint(vidOnly, 10, 32)
+ if err != nil {
+ glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
+ continue
+ }
+
+ var locations []Location
+ for _, masterLoc := range vidLoc.Locations {
+ loc := Location{
+ Url: masterLoc.Url,
+ PublicUrl: masterLoc.PublicUrl,
+ GrpcPort: int(masterLoc.GrpcPort),
+ DataCenter: masterLoc.DataCenter,
+ }
+ mc.vidMap.addLocation(uint32(vid), loc)
+ locations = append(locations, loc)
+ }
+
+ if len(locations) > 0 {
+ batchResult[vidOnly] = locations
}
}
+ return nil
+ })
+
+ if err != nil {
+ return batchResult, err
}
- return nil
+ return batchResult, nil
})
- return
+
+ if err != nil {
+ lookupErrors = append(lookupErrors, err)
+ }
+
+ // Merge singleflight batch results
+ if batchLocations, ok := sfResult.(map[string][]Location); ok {
+ for vid, locs := range batchLocations {
+ result[vid] = locs
+ }
+ }
+
+ // Check for volumes that still weren't found
+ for _, vidString := range needsLookup {
+ if _, found := result[vidString]; !found {
+ lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
+ }
+ }
+
+ // Aggregate all lookup errors with errors.Join; it returns nil when there are none, so a fully successful lookup yields a nil error alongside the result
+ return result, errors.Join(lookupErrors...)
}
func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {