aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_grpc_server.go')
-rw-r--r--weed/server/filer_grpc_server.go85
1 files changed, 72 insertions, 13 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index a18c55bb1..4606f8f6f 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"strconv"
+ "strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@@ -94,27 +95,85 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
LocationsMap: make(map[string]*filer_pb.Locations),
}
+ // Collect volume IDs that are not in cache for batch lookup
+ var vidsToLookup []string
+ vidMap := make(map[string]uint32)
+
for _, vidString := range req.VolumeIds {
vid, err := strconv.Atoi(vidString)
if err != nil {
- glog.V(1).InfofCtx(ctx, "Unknown volume id %d", vid)
+ glog.V(1).InfofCtx(ctx, "Unknown volume id %s", vidString)
return nil, err
}
- var locs []*filer_pb.Location
+ vidMap[vidString] = uint32(vid)
+
+ // Check cache first
locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
- if !found {
- continue
+ if found && len(locations) > 0 {
+ // Found in cache
+ var locs []*filer_pb.Location
+ for _, loc := range locations {
+ locs = append(locs, &filer_pb.Location{
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ GrpcPort: uint32(loc.GrpcPort),
+ DataCenter: loc.DataCenter,
+ })
+ }
+ resp.LocationsMap[vidString] = &filer_pb.Locations{
+ Locations: locs,
+ }
+ } else {
+ // Not in cache, need to query master
+ vidsToLookup = append(vidsToLookup, vidString)
}
- for _, loc := range locations {
- locs = append(locs, &filer_pb.Location{
- Url: loc.Url,
- PublicUrl: loc.PublicUrl,
- GrpcPort: uint32(loc.GrpcPort),
- DataCenter: loc.DataCenter,
+ }
+
+ // Query master for volumes not in cache
+ if len(vidsToLookup) > 0 {
+ glog.V(2).InfofCtx(ctx, "Looking up %d volumes from master: %v", len(vidsToLookup), vidsToLookup)
+ err := operation.WithMasterServerClient(false, fs.filer.GetMaster(ctx), fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ masterResp, err := masterClient.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: vidsToLookup,
})
- }
- resp.LocationsMap[vidString] = &filer_pb.Locations{
- Locations: locs,
+ if err != nil {
+ return fmt.Errorf("master lookup failed: %v", err)
+ }
+
+ // Process master response
+ for _, vidLoc := range masterResp.VolumeIdLocations {
+ if vidLoc.Error != "" {
+ glog.V(0).InfofCtx(ctx, "volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
+ continue
+ }
+
+ vidString := vidLoc.VolumeOrFileId
+ // Parse volume ID from response (could be "123" or "123,abc")
+ parts := strings.Split(vidString, ",")
+ vidOnly := parts[0]
+
+ var locs []*filer_pb.Location
+ for _, masterLoc := range vidLoc.Locations {
+ locs = append(locs, &filer_pb.Location{
+ Url: masterLoc.Url,
+ PublicUrl: masterLoc.PublicUrl,
+ GrpcPort: masterLoc.GrpcPort,
+ DataCenter: masterLoc.DataCenter,
+ })
+ }
+
+ if len(locs) > 0 {
+ resp.LocationsMap[vidOnly] = &filer_pb.Locations{
+ Locations: locs,
+ }
+ }
+ }
+ return nil
+ })
+
+ if err != nil {
+ glog.V(0).InfofCtx(ctx, "failed to lookup volumes from master: %v", err)
+ // Don't return error, return partial results
}
}