diff options
Diffstat (limited to 'weed/server/volume_grpc_erasure_coding.go')
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 69 |
1 files changed, 39 insertions, 30 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 55e0261c8..186c6eafc 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -8,7 +8,6 @@ import ( "math" "os" "path" - "path/filepath" "strings" "github.com/chrislusf/seaweedfs/weed/glog" @@ -44,7 +43,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ if v == nil { return nil, fmt.Errorf("volume %d not found", req.VolumeId) } - baseFileName := v.FileName() + baseFileName := v.DataFileName() if v.Collection != req.Collection { return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) @@ -56,8 +55,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ } // write .ecx file - if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil { - return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err) + if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil { + return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err) } // write .vif files @@ -78,17 +77,18 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s var rebuiltShardIds []uint32 for _, location := range vs.store.Locations { - if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) { + if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) { // write .ec00 ~ .ec13 files - baseFileName = path.Join(location.Directory, baseFileName) - if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil { - return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err) + dataBaseFileName := path.Join(location.Directory, baseFileName) + if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err) } else { rebuiltShardIds = generatedShardIds } - if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil { - return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err) + indexBaseFileName := path.Join(location.IdxDirectory, baseFileName) + if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcxFile %s: %v", dataBaseFileName, err) } break @@ -110,13 +110,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv return nil, fmt.Errorf("no space left") } - baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) + dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) + indexBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy ec data slices for _, shardId := range req.ShardIds { - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil { return err } } @@ -124,7 +125,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv if req.CopyEcxFile { // copy ecx file - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil { return err } return nil @@ -132,14 +133,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv if req.CopyEcjFile { // copy ecj file - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil { return err } } if req.CopyVifFile { // copy vif file - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil { return err } } @@ -157,17 +158,19 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv // the shard should not be mounted before calling this. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) { - baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds) found := false + var indexBaseFilename, dataBaseFilename string for _, location := range vs.store.Locations { - if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) { + if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) { found = true - baseFilename = path.Join(location.Directory, baseFilename) + indexBaseFilename = path.Join(location.IdxDirectory, bName) + dataBaseFilename = path.Join(location.Directory, bName) for _, shardId := range req.ShardIds { - os.Remove(baseFilename + erasure_coding.ToExt(int(shardId))) + os.Remove(dataBaseFilename + erasure_coding.ToExt(int(shardId))) } break } @@ -182,12 +185,18 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se hasIdxFile := false existingShardCount := 0 - bName := filepath.Base(baseFilename) for _, location := range vs.store.Locations { fileInfos, err := ioutil.ReadDir(location.Directory) if err != nil { continue } + if location.IdxDirectory != location.Directory { + idxFileInfos, err := ioutil.ReadDir(location.IdxDirectory) + if err != nil { + continue + } + fileInfos = append(fileInfos, idxFileInfos...) + } for _, fileInfo := range fileInfos { if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" { hasEcxFile = true @@ -204,14 +213,14 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se } if hasEcxFile && existingShardCount == 0 { - if err := os.Remove(baseFilename + ".ecx"); err != nil { + if err := os.Remove(indexBaseFilename + ".ecx"); err != nil { return nil, err } - os.Remove(baseFilename + ".ecj") + os.Remove(indexBaseFilename + ".ecj") } if !hasIdxFile { // .vif is used for ec volumes and normal volumes - os.Remove(baseFilename + ".vif") + os.Remove(dataBaseFilename + ".vif") } return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil @@ -365,26 +374,26 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ if !found { return nil, fmt.Errorf("ec volume %d not found", req.VolumeId) } - baseFileName := v.FileName() if v.Collection != req.Collection { return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) } + dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName() // calculate .dat file size - datFileSize, err := erasure_coding.FindDatFileSize(baseFileName) + datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName) if err != nil { - return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err) + return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err) } // write .dat file from .ec00 ~ .ec09 files - if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil { - return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) + if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize); err != nil { + return nil, fmt.Errorf("WriteEcFiles %s: %v", dataBaseFileName, err) } // write .idx file from .ecx and .ecj files - if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil { - return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err) + if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil { + return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err) } return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil |
