aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_erasure_coding.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-11-27 03:17:10 -0800
committerChris Lu <chris.lu@gmail.com>2020-11-27 03:17:10 -0800
commit6d30b21b10260c8c0f2c9f665a3d907494566092 (patch)
treeb2509c87ee0cb5eb373d8c3fabc4fd1ac53c7af5 /weed/server/volume_grpc_erasure_coding.go
parent3c229eb677a9aedd4e59f511f10aa4ba552f821b (diff)
downloadseaweedfs-6d30b21b10260c8c0f2c9f665a3d907494566092.tar.xz
seaweedfs-6d30b21b10260c8c0f2c9f665a3d907494566092.zip
volume: add "-dir.idx" option for separate index storage
fix https://github.com/chrislusf/seaweedfs/issues/1265
Diffstat (limited to 'weed/server/volume_grpc_erasure_coding.go')
-rw-r--r--weed/server/volume_grpc_erasure_coding.go69
1 files changed, 39 insertions, 30 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 55e0261c8..186c6eafc 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -8,7 +8,6 @@ import (
"math"
"os"
"path"
- "path/filepath"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -44,7 +43,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
if v == nil {
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
}
- baseFileName := v.FileName()
+ baseFileName := v.DataFileName()
if v.Collection != req.Collection {
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
@@ -56,8 +55,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
}
// write .ecx file
- if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
- return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil {
+ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err)
}
// write .vif files
@@ -78,17 +77,18 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
var rebuiltShardIds []uint32
for _, location := range vs.store.Locations {
- if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
+ if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) {
// write .ec00 ~ .ec13 files
- baseFileName = path.Join(location.Directory, baseFileName)
- if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
- return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
+ dataBaseFileName := path.Join(location.Directory, baseFileName)
+ if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err)
} else {
rebuiltShardIds = generatedShardIds
}
- if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil {
- return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err)
+ indexBaseFileName := path.Join(location.IdxDirectory, baseFileName)
+ if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcxFile %s: %v", dataBaseFileName, err)
}
break
@@ -110,13 +110,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
return nil, fmt.Errorf("no space left")
}
- baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
+ dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
+ indexBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
return err
}
}
@@ -124,7 +125,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if req.CopyEcxFile {
// copy ecx file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil {
return err
}
return nil
@@ -132,14 +133,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if req.CopyEcjFile {
// copy ecj file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil {
return err
}
}
if req.CopyVifFile {
// copy vif file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil {
return err
}
}
@@ -157,17 +158,19 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
// the shard should not be mounted before calling this.
func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
- baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+ bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
found := false
+ var indexBaseFilename, dataBaseFilename string
for _, location := range vs.store.Locations {
- if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
+ if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
found = true
- baseFilename = path.Join(location.Directory, baseFilename)
+ indexBaseFilename = path.Join(location.IdxDirectory, bName)
+ dataBaseFilename = path.Join(location.Directory, bName)
for _, shardId := range req.ShardIds {
- os.Remove(baseFilename + erasure_coding.ToExt(int(shardId)))
+ os.Remove(dataBaseFilename + erasure_coding.ToExt(int(shardId)))
}
break
}
@@ -182,12 +185,18 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
hasIdxFile := false
existingShardCount := 0
- bName := filepath.Base(baseFilename)
for _, location := range vs.store.Locations {
fileInfos, err := ioutil.ReadDir(location.Directory)
if err != nil {
continue
}
+ if location.IdxDirectory != location.Directory {
+ idxFileInfos, err := ioutil.ReadDir(location.IdxDirectory)
+ if err != nil {
+ continue
+ }
+ fileInfos = append(fileInfos, idxFileInfos...)
+ }
for _, fileInfo := range fileInfos {
if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
hasEcxFile = true
@@ -204,14 +213,14 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
}
if hasEcxFile && existingShardCount == 0 {
- if err := os.Remove(baseFilename + ".ecx"); err != nil {
+ if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
return nil, err
}
- os.Remove(baseFilename + ".ecj")
+ os.Remove(indexBaseFilename + ".ecj")
}
if !hasIdxFile {
// .vif is used for ec volumes and normal volumes
- os.Remove(baseFilename + ".vif")
+ os.Remove(dataBaseFilename + ".vif")
}
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
@@ -365,26 +374,26 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
if !found {
return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
}
- baseFileName := v.FileName()
if v.Collection != req.Collection {
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
}
+ dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
// calculate .dat file size
- datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
+ datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
if err != nil {
- return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
+ return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err)
}
// write .dat file from .ec00 ~ .ec09 files
- if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
- return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", dataBaseFileName, err)
}
// write .idx file from .ecx and .ecj files
- if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
- return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil {
+ return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err)
}
return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil