aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/server/filer_grpc_server.go81
-rw-r--r--weed/wdclient/masterclient.go71
2 files changed, 80 insertions, 72 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index c90cdb3e6..c9fdc37e8 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -5,8 +5,6 @@ import (
"fmt"
"os"
"path/filepath"
- "strconv"
- "strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@@ -96,65 +94,17 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
LocationsMap: make(map[string]*filer_pb.Locations),
}
- // Collect volume IDs that are not in cache for batch lookup
- var vidsToLookup []string
-
- for _, vidString := range req.VolumeIds {
- vid, err := strconv.ParseUint(vidString, 10, 32)
- if err != nil {
- glog.V(1).InfofCtx(ctx, "Invalid volume id (must be uint32): %s", vidString)
- return nil, err
- }
-
- // Check cache first
- locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
- if found && len(locations) > 0 {
- // Found in cache
- resp.LocationsMap[vidString] = &filer_pb.Locations{
- Locations: wdclientLocationsToPb(locations),
- }
- } else {
- // Not in cache, need to query master
- vidsToLookup = append(vidsToLookup, vidString)
- }
+ // Use master client's lookup with fallback - it handles cache and master query
+ vidLocations, err := fs.filer.MasterClient.LookupVolumeIdsWithFallback(ctx, req.VolumeIds)
+ if err != nil {
+ // Return partial results even on error
+ glog.V(0).InfofCtx(ctx, "failed to lookup some volumes: %v", err)
}
- // Query master for volumes not in cache
- if len(vidsToLookup) > 0 {
- glog.V(2).InfofCtx(ctx, "Looking up %d volumes from master: %v", len(vidsToLookup), vidsToLookup)
- err := operation.WithMasterServerClient(false, fs.filer.GetMaster(ctx), fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
- masterResp, err := masterClient.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
- VolumeOrFileIds: vidsToLookup,
- })
- if err != nil {
- return fmt.Errorf("master lookup failed: %v", err)
- }
-
- // Process master response
- for _, vidLoc := range masterResp.VolumeIdLocations {
- if vidLoc.Error != "" {
- glog.V(0).InfofCtx(ctx, "volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
- continue
- }
-
- vidString := vidLoc.VolumeOrFileId
- // Parse volume ID from response (could be "123" or "123,abc")
- parts := strings.Split(vidString, ",")
- vidOnly := parts[0]
-
- locs := masterLocationsToPb(vidLoc.Locations)
- if len(locs) > 0 {
- resp.LocationsMap[vidOnly] = &filer_pb.Locations{
- Locations: locs,
- }
- }
- }
- return nil
- })
-
- if err != nil {
- glog.V(0).InfofCtx(ctx, "failed to lookup volumes from master: %v", err)
- // Don't return error, return partial results
+ // Convert wdclient.Location to filer_pb.Location
+ for vidString, locations := range vidLocations {
+ resp.LocationsMap[vidString] = &filer_pb.Locations{
+ Locations: wdclientLocationsToPb(locations),
}
}
@@ -174,19 +124,6 @@ func wdclientLocationsToPb(locations []wdclient.Location) []*filer_pb.Location {
return locs
}
-func masterLocationsToPb(masterLocs []*master_pb.Location) []*filer_pb.Location {
- var locs []*filer_pb.Location
- for _, masterLoc := range masterLocs {
- locs = append(locs, &filer_pb.Location{
- Url: masterLoc.Url,
- PublicUrl: masterLoc.PublicUrl,
- GrpcPort: masterLoc.GrpcPort,
- DataCenter: masterLoc.DataCenter,
- })
- }
- return locs
-}
-
func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
fid, err := needle.ParseFileIdFromString(fileId)
if err != nil {
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 11b58d861..9f3f3ba49 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"math/rand"
+ "strconv"
+ "strings"
"sync"
"time"
@@ -94,6 +96,75 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str
return
}
+// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
+func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+ result := make(map[string][]Location)
+ var missingVids []string
+
+ // Check cache first
+ for _, vidString := range volumeIds {
+ vid, err := strconv.ParseUint(vidString, 10, 32)
+ if err != nil {
+ return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
+ }
+ locations, found := mc.GetLocations(uint32(vid))
+ if found && len(locations) > 0 {
+ result[vidString] = locations
+ } else {
+ missingVids = append(missingVids, vidString)
+ }
+ }
+
+ // Query master for missing volumes
+ if len(missingVids) > 0 {
+ glog.V(2).Infof("Looking up %d volumes from master: %v", len(missingVids), missingVids)
+ err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: missingVids,
+ })
+ if err != nil {
+ return fmt.Errorf("master lookup failed: %v", err)
+ }
+
+ for _, vidLoc := range resp.VolumeIdLocations {
+ if vidLoc.Error != "" {
+ glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
+ continue
+ }
+
+ vidString := vidLoc.VolumeOrFileId
+ // Parse volume ID from response (could be "123" or "123,abc")
+ parts := strings.Split(vidString, ",")
+ vidOnly := parts[0]
+ vid, _ := strconv.ParseUint(vidOnly, 10, 32)
+
+ var locations []Location
+ for _, masterLoc := range vidLoc.Locations {
+ loc := Location{
+ Url: masterLoc.Url,
+ PublicUrl: masterLoc.PublicUrl,
+ GrpcPort: int(masterLoc.GrpcPort),
+ DataCenter: masterLoc.DataCenter,
+ }
+ mc.vidMap.addLocation(uint32(vid), loc)
+ locations = append(locations, loc)
+ }
+
+ if len(locations) > 0 {
+ result[vidOnly] = locations
+ }
+ }
+ return nil
+ })
+
+ if err != nil {
+ return result, err
+ }
+ }
+
+ return result, nil
+}
+
func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
mc.currentMasterLock.RLock()
defer mc.currentMasterLock.RUnlock()