Diffstat (limited to 'weed/server/volume_grpc_erasure_coding.go')
 weed/server/volume_grpc_erasure_coding.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 55 insertions(+), 9 deletions(-)
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 88e94115d..5d100bdda 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -50,20 +50,38 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
 		return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
 	}
 
+	// Create EC context - prefer existing .vif config if present (for regeneration scenarios)
+	ecCtx := erasure_coding.NewDefaultECContext(req.Collection, needle.VolumeId(req.VolumeId))
+	if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(baseFileName + ".vif"); found && volumeInfo.EcShardConfig != nil {
+		ds := int(volumeInfo.EcShardConfig.DataShards)
+		ps := int(volumeInfo.EcShardConfig.ParityShards)
+
+		// Validate and use existing EC config
+		if ds > 0 && ps > 0 && ds+ps <= erasure_coding.MaxShardCount {
+			ecCtx.DataShards = ds
+			ecCtx.ParityShards = ps
+			glog.V(0).Infof("Using existing EC config for volume %d: %s", req.VolumeId, ecCtx.String())
+		} else {
+			glog.Warningf("Invalid EC config in .vif for volume %d (data=%d, parity=%d), using defaults", req.VolumeId, ds, ps)
+		}
+	} else {
+		glog.V(0).Infof("Using default EC config for volume %d: %s", req.VolumeId, ecCtx.String())
+	}
+
 	shouldCleanup := true
 	defer func() {
 		if !shouldCleanup {
 			return
 		}
-		for i := 0; i < erasure_coding.TotalShardsCount; i++ {
-			os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i))
+		for i := 0; i < ecCtx.Total(); i++ {
+			os.Remove(baseFileName + ecCtx.ToExt(i))
 		}
 		os.Remove(v.IndexFileName() + ".ecx")
 	}()
 
-	// write .ec00 ~ .ec13 files
-	if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
-		return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+	// write .ec00 ~ .ec[TotalShards-1] files using context
+	if err := erasure_coding.WriteEcFilesWithContext(baseFileName, ecCtx); err != nil {
+		return nil, fmt.Errorf("WriteEcFilesWithContext %s: %v", baseFileName, err)
 	}
 
 	// write .ecx file
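For orientation, the ecCtx value threaded through this hunk can be pictured as below. This is a sketch inferred from the call sites above (DataShards, ParityShards, Total, ToExt, String), not the actual erasure_coding definition, which may differ:

package erasure_coding // placement assumed, for illustration only

import "fmt"

// ECContext is a sketch of the context type used in the diff above.
type ECContext struct {
	Collection   string
	VolumeId     uint32 // assumed; the handler passes a needle.VolumeId
	DataShards   int    // e.g. 10 in the default 10+4 layout
	ParityShards int    // e.g. 4
}

// Total bounds the generation and cleanup loops above.
func (c *ECContext) Total() int { return c.DataShards + c.ParityShards }

// ToExt maps a shard index to its file extension, e.g. 3 -> ".ec03".
func (c *ECContext) ToExt(shardIndex int) string {
	return fmt.Sprintf(".ec%02d", shardIndex)
}

// String renders the layout for logging, e.g. "10+4".
func (c *ECContext) String() string {
	return fmt.Sprintf("%d+%d", c.DataShards, c.ParityShards)
}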
@@ -84,6 +102,21 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
 
 	datSize, _, _ := v.FileStat()
 	volumeInfo.DatFileSize = int64(datSize)
+
+	// Validate EC configuration before saving to .vif
+	if ecCtx.DataShards <= 0 || ecCtx.ParityShards <= 0 || ecCtx.Total() > erasure_coding.MaxShardCount {
+		return nil, fmt.Errorf("invalid EC config before saving: data=%d, parity=%d, total=%d (max=%d)",
+			ecCtx.DataShards, ecCtx.ParityShards, ecCtx.Total(), erasure_coding.MaxShardCount)
+	}
+
+	// Save EC configuration to VolumeInfo
+	volumeInfo.EcShardConfig = &volume_server_pb.EcShardConfig{
+		DataShards:   uint32(ecCtx.DataShards),
+		ParityShards: uint32(ecCtx.ParityShards),
+	}
+	glog.V(1).Infof("Saving EC config to .vif for volume %d: %d+%d (total: %d)",
+		req.VolumeId, ecCtx.DataShards, ecCtx.ParityShards, ecCtx.Total())
+
 	if err := volume_info.SaveVolumeInfo(baseFileName+".vif", volumeInfo); err != nil {
 		return nil, fmt.Errorf("SaveVolumeInfo %s: %v", baseFileName, err)
 	}
@@ -442,9 +475,10 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
- // collect .ec00 ~ .ec09 files
- shardFileNames := make([]string, erasure_coding.DataShardsCount)
- v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames)
+ // Collect all EC shards (NewEcVolume will load EC config from .vif into v.ECContext)
+ // Use MaxShardCount (32) to support custom EC ratios up to 32 total shards
+ tempShards := make([]string, erasure_coding.MaxShardCount)
+ v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), tempShards)
if !found {
return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
}
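Sizing the scratch slice at MaxShardCount instead of the old fixed DataShardsCount matters because CollectEcShards appears to index the slice by shard id, and a custom ratio may use ids beyond the default 10+4 range. A minimal sketch of the pattern, with made-up paths and behavior assumed from the surrounding code:

package main

import "fmt"

const maxShardCount = 32 // stands in for erasure_coding.MaxShardCount

func main() {
	// Assumption: CollectEcShards fills entries by shard id and leaves
	// locally absent shards as "".
	tempShards := make([]string, maxShardCount)

	// Example: a custom 12+4 volume with all 12 data shards present locally.
	// Shard ids run 0..15 here, so a slice sized to the old default of 10
	// data shards could not hold them.
	for id := 0; id < 12; id++ {
		tempShards[id] = fmt.Sprintf("/data/vol123.ec%02d", id)
	}

	dataShards := 12 // in the handler this comes from v.ECContext.DataShards
	shardFileNames := tempShards[:dataShards]
	fmt.Println(len(shardFileNames), shardFileNames[11]) // 12 /data/vol123.ec11
}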
@@ -453,7 +487,19 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
 		return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
 	}
 
-	for shardId := 0; shardId < erasure_coding.DataShardsCount; shardId++ {
+	// Use EC context (already loaded from .vif) to determine data shard count
+	dataShards := v.ECContext.DataShards
+
+	// Defensive validation to prevent panics from corrupted ECContext
+	if dataShards <= 0 || dataShards > erasure_coding.MaxShardCount {
+		return nil, fmt.Errorf("invalid data shard count %d for volume %d (must be 1..%d)", dataShards, req.VolumeId, erasure_coding.MaxShardCount)
+	}
+
+	shardFileNames := tempShards[:dataShards]
+	glog.V(1).Infof("Using EC config from volume %d: %d data shards", req.VolumeId, dataShards)
+
+	// Verify all data shards are present
+	for shardId := 0; shardId < dataShards; shardId++ {
 		if shardFileNames[shardId] == "" {
 			return nil, fmt.Errorf("ec volume %d missing shard %d", req.VolumeId, shardId)
 		}
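Only the data shards are checked here: SeaweedFS EC uses systematic Reed-Solomon, so when every data shard survives, the original .dat contents can be rebuilt from them directly and the parity shards are not needed. The presence loop is equivalent to this standalone sketch (the helper name is illustrative, not from the codebase):

package main

import "fmt"

// firstMissingDataShard returns the lowest shard id whose file was not
// collected locally, or -1 if every data shard is present. The handler
// above inlines the same loop.
func firstMissingDataShard(shardFileNames []string) int {
	for id, name := range shardFileNames {
		if name == "" {
			return id
		}
	}
	return -1
}

func main() {
	fmt.Println(firstMissingDataShard([]string{"v.ec00", "", "v.ec02"})) // 1
}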