diff options
| -rw-r--r-- | weed/server/filer_grpc_server.go | 81 | ||||
| -rw-r--r-- | weed/wdclient/masterclient.go | 71 |
2 files changed, 80 insertions, 72 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index c90cdb3e6..c9fdc37e8 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -5,8 +5,6 @@ import ( "fmt" "os" "path/filepath" - "strconv" - "strings" "time" "github.com/seaweedfs/seaweedfs/weed/cluster" @@ -96,65 +94,17 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol LocationsMap: make(map[string]*filer_pb.Locations), } - // Collect volume IDs that are not in cache for batch lookup - var vidsToLookup []string - - for _, vidString := range req.VolumeIds { - vid, err := strconv.ParseUint(vidString, 10, 32) - if err != nil { - glog.V(1).InfofCtx(ctx, "Invalid volume id (must be uint32): %s", vidString) - return nil, err - } - - // Check cache first - locations, found := fs.filer.MasterClient.GetLocations(uint32(vid)) - if found && len(locations) > 0 { - // Found in cache - resp.LocationsMap[vidString] = &filer_pb.Locations{ - Locations: wdclientLocationsToPb(locations), - } - } else { - // Not in cache, need to query master - vidsToLookup = append(vidsToLookup, vidString) - } + // Use master client's lookup with fallback - it handles cache and master query + vidLocations, err := fs.filer.MasterClient.LookupVolumeIdsWithFallback(ctx, req.VolumeIds) + if err != nil { + // Return partial results even on error + glog.V(0).InfofCtx(ctx, "failed to lookup some volumes: %v", err) } - // Query master for volumes not in cache - if len(vidsToLookup) > 0 { - glog.V(2).InfofCtx(ctx, "Looking up %d volumes from master: %v", len(vidsToLookup), vidsToLookup) - err := operation.WithMasterServerClient(false, fs.filer.GetMaster(ctx), fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { - masterResp, err := masterClient.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ - VolumeOrFileIds: vidsToLookup, - }) - if err != nil { - return fmt.Errorf("master lookup failed: %v", err) - } - - // Process master response - for _, vidLoc := range masterResp.VolumeIdLocations { - if vidLoc.Error != "" { - glog.V(0).InfofCtx(ctx, "volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error) - continue - } - - vidString := vidLoc.VolumeOrFileId - // Parse volume ID from response (could be "123" or "123,abc") - parts := strings.Split(vidString, ",") - vidOnly := parts[0] - - locs := masterLocationsToPb(vidLoc.Locations) - if len(locs) > 0 { - resp.LocationsMap[vidOnly] = &filer_pb.Locations{ - Locations: locs, - } - } - } - return nil - }) - - if err != nil { - glog.V(0).InfofCtx(ctx, "failed to lookup volumes from master: %v", err) - // Don't return error, return partial results + // Convert wdclient.Location to filer_pb.Location + for vidString, locations := range vidLocations { + resp.LocationsMap[vidString] = &filer_pb.Locations{ + Locations: wdclientLocationsToPb(locations), } } @@ -174,19 +124,6 @@ func wdclientLocationsToPb(locations []wdclient.Location) []*filer_pb.Location { return locs } -func masterLocationsToPb(masterLocs []*master_pb.Location) []*filer_pb.Location { - var locs []*filer_pb.Location - for _, masterLoc := range masterLocs { - locs = append(locs, &filer_pb.Location{ - Url: masterLoc.Url, - PublicUrl: masterLoc.PublicUrl, - GrpcPort: masterLoc.GrpcPort, - DataCenter: masterLoc.DataCenter, - }) - } - return locs -} - func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) { fid, err := needle.ParseFileIdFromString(fileId) if err != nil { diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 11b58d861..9f3f3ba49 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "math/rand" + "strconv" + "strings" "sync" "time" @@ -94,6 +96,75 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str return } +// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache +func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) { + result := make(map[string][]Location) + var missingVids []string + + // Check cache first + for _, vidString := range volumeIds { + vid, err := strconv.ParseUint(vidString, 10, 32) + if err != nil { + return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err) + } + locations, found := mc.GetLocations(uint32(vid)) + if found && len(locations) > 0 { + result[vidString] = locations + } else { + missingVids = append(missingVids, vidString) + } + } + + // Query master for missing volumes + if len(missingVids) > 0 { + glog.V(2).Infof("Looking up %d volumes from master: %v", len(missingVids), missingVids) + err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ + VolumeOrFileIds: missingVids, + }) + if err != nil { + return fmt.Errorf("master lookup failed: %v", err) + } + + for _, vidLoc := range resp.VolumeIdLocations { + if vidLoc.Error != "" { + glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error) + continue + } + + vidString := vidLoc.VolumeOrFileId + // Parse volume ID from response (could be "123" or "123,abc") + parts := strings.Split(vidString, ",") + vidOnly := parts[0] + vid, _ := strconv.ParseUint(vidOnly, 10, 32) + + var locations []Location + for _, masterLoc := range vidLoc.Locations { + loc := Location{ + Url: masterLoc.Url, + PublicUrl: masterLoc.PublicUrl, + GrpcPort: int(masterLoc.GrpcPort), + DataCenter: masterLoc.DataCenter, + } + mc.vidMap.addLocation(uint32(vid), loc) + locations = append(locations, loc) + } + + if len(locations) > 0 { + result[vidOnly] = locations + } + } + return nil + }) + + if err != nil { + return result, err + } + } + + return result, nil +} + func (mc *MasterClient) getCurrentMaster() pb.ServerAddress { mc.currentMasterLock.RLock() defer mc.currentMasterLock.RUnlock() |
