aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_erasure_coding.go
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2019-12-30 13:05:50 +0800
committerGitHub <noreply@github.com>2019-12-30 13:05:50 +0800
commit70da715d8d917527291b35fb069fac077d17b868 (patch)
treeb89bad02094cc7131bc2c9f64df13e15f9de9914 /weed/server/volume_grpc_erasure_coding.go
parent93a7df500ffeed766e395907e860b1733040ff23 (diff)
parent09043c8e5a3b43add589344d28d4f57e90c83f70 (diff)
downloadseaweedfs-70da715d8d917527291b35fb069fac077d17b868.tar.xz
seaweedfs-70da715d8d917527291b35fb069fac077d17b868.zip
Merge pull request #4 from chrislusf/master
Syncing to the original repository
Diffstat (limited to 'weed/server/volume_grpc_erasure_coding.go')
-rw-r--r--weed/server/volume_grpc_erasure_coding.go97
1 files changed, 77 insertions, 20 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 8140a06f6..4bca9948e 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -8,10 +8,12 @@ import (
"math"
"os"
"path"
+ "path/filepath"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@@ -24,7 +26,7 @@ import (
Steps to apply erasure coding to .dat .idx files
0. ensure the volume is readonly
-1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files
+1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
2. client ask master for possible servers to hold the ec files, at least 4 servers
3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
4. target servers report the new ec files to the master
@@ -33,7 +35,7 @@ Steps to apply erasure coding to .dat .idx files
*/
-// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files
+// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
@@ -47,19 +49,24 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
}
// write .ecx file
- if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
- return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
+ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
}
- // write .ec01 ~ .ec14 files
+ // write .ec00 ~ .ec13 files
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
+ // write .vif files
+ if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
}
-// VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files
+// VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
@@ -68,7 +75,7 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
- // write .ec01 ~ .ec14 files
+ // write .ec00 ~ .ec13 files
baseFileName = path.Join(location.Directory, baseFileName)
if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
@@ -103,23 +110,32 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil {
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
return err
}
}
- if !req.CopyEcxFile {
+ if req.CopyEcxFile {
+
+ // copy ecx file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
+ return err
+ }
return nil
}
- // copy ecx file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil {
- return err
+ if req.CopyEcjFile {
+ // copy ecj file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
+ return err
+ }
}
- // copy ecj file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil {
- return err
+ if req.CopyVifFile {
+ // copy vif file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
+ return err
+ }
}
return nil
@@ -137,6 +153,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+ glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
+
found := false
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
@@ -153,21 +171,22 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
return nil, nil
}
- // check whether to delete the ecx file also
+ // check whether to delete the .ecx and .ecj file also
hasEcxFile := false
existingShardCount := 0
+ bName := filepath.Base(baseFilename)
for _, location := range vs.store.Locations {
fileInfos, err := ioutil.ReadDir(location.Directory)
if err != nil {
continue
}
for _, fileInfo := range fileInfos {
- if fileInfo.Name() == baseFilename+".ecx" {
+ if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
hasEcxFile = true
continue
}
- if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") {
+ if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
existingShardCount++
}
}
@@ -252,9 +271,14 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
startOffset, bytesToRead := req.Offset, req.Size
for bytesToRead > 0 {
- bytesread, err := ecShard.ReadAt(buffer, startOffset)
+ // min of bytesToRead and bufSize
+ bufferSize := bufSize
+ if bufferSize > bytesToRead {
+ bufferSize = bytesToRead
+ }
+ bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
- // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
+ // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
if bytesread > 0 {
if int64(bytesread) > bytesToRead {
@@ -268,6 +292,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
return err
}
+ startOffset += int64(bytesread)
bytesToRead -= int64(bytesread)
}
@@ -311,3 +336,35 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
return resp, nil
}
+
+// VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
+func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
+
+ v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
+ if !found {
+ return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
+ }
+ baseFileName := v.FileName()
+
+ if v.Collection != req.Collection {
+ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // calculate .dat file size
+ datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
+ if err != nil {
+ return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
+ }
+
+ // write .dat file from .ec00 ~ .ec09 files
+ if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
+ // write .idx file from .ecx and .ecj files
+ if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
+ return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
+ }
+
+ return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
+}