aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_erasure_coding.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_grpc_erasure_coding.go')
-rw-r--r--weed/server/volume_grpc_erasure_coding.go64
1 files changed, 51 insertions, 13 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 23cc29e0d..5981c5efe 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -141,20 +141,31 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
var location *storage.DiskLocation
- if req.CopyEcxFile {
- location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
- return location.DiskType == types.HardDriveType
- })
+
+ // Use disk_id if provided (disk-aware storage)
+ if req.DiskId > 0 || (req.DiskId == 0 && len(vs.store.Locations) > 0) {
+ // Validate disk ID is within bounds
+ if int(req.DiskId) >= len(vs.store.Locations) {
+ return nil, fmt.Errorf("invalid disk_id %d: only have %d disks", req.DiskId, len(vs.store.Locations))
+ }
+
+ // Use the specific disk location
+ location = vs.store.Locations[req.DiskId]
+ glog.V(1).Infof("Using disk %d for EC shard copy: %s", req.DiskId, location.Directory)
} else {
- location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
- //(location.FindEcVolume) This method is error, will cause location is nil, redundant judgment
- // _, found := location.FindEcVolume(needle.VolumeId(req.VolumeId))
- // return found
- return true
- })
- }
- if location == nil {
- return nil, fmt.Errorf("no space left")
+ // Fallback to old behavior for backward compatibility
+ if req.CopyEcxFile {
+ location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
+ return location.DiskType == types.HardDriveType
+ })
+ } else {
+ location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
+ return true
+ })
+ }
+ if location == nil {
+ return nil, fmt.Errorf("no space left")
+ }
}
dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
@@ -467,3 +478,30 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
}
+
+func (vs *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_server_pb.VolumeEcShardsInfoRequest) (*volume_server_pb.VolumeEcShardsInfoResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsInfo: volume %d", req.VolumeId)
+
+ var ecShardInfos []*volume_server_pb.EcShardInfo
+
+ // Find the EC volume
+ for _, location := range vs.store.Locations {
+ if v, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
+ // Get shard details from the EC volume
+ shardDetails := v.ShardDetails()
+ for _, shardDetail := range shardDetails {
+ ecShardInfo := &volume_server_pb.EcShardInfo{
+ ShardId: uint32(shardDetail.ShardId),
+ Size: shardDetail.Size,
+ Collection: v.Collection,
+ }
+ ecShardInfos = append(ecShardInfos, ecShardInfo)
+ }
+ break
+ }
+ }
+
+ return &volume_server_pb.VolumeEcShardsInfoResponse{
+ EcShardInfos: ecShardInfos,
+ }, nil
+}