diff options
Diffstat (limited to 'weed/server/volume_grpc_erasure_coding.go')
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 64 |
1 files changed, 55 insertions, 9 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 88e94115d..5d100bdda 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -50,20 +50,38 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) } + // Create EC context - prefer existing .vif config if present (for regeneration scenarios) + ecCtx := erasure_coding.NewDefaultECContext(req.Collection, needle.VolumeId(req.VolumeId)) + if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(baseFileName + ".vif"); found && volumeInfo.EcShardConfig != nil { + ds := int(volumeInfo.EcShardConfig.DataShards) + ps := int(volumeInfo.EcShardConfig.ParityShards) + + // Validate and use existing EC config + if ds > 0 && ps > 0 && ds+ps <= erasure_coding.MaxShardCount { + ecCtx.DataShards = ds + ecCtx.ParityShards = ps + glog.V(0).Infof("Using existing EC config for volume %d: %s", req.VolumeId, ecCtx.String()) + } else { + glog.Warningf("Invalid EC config in .vif for volume %d (data=%d, parity=%d), using defaults", req.VolumeId, ds, ps) + } + } else { + glog.V(0).Infof("Using default EC config for volume %d: %s", req.VolumeId, ecCtx.String()) + } + shouldCleanup := true defer func() { if !shouldCleanup { return } - for i := 0; i < erasure_coding.TotalShardsCount; i++ { - os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i)) + for i := 0; i < ecCtx.Total(); i++ { + os.Remove(baseFileName + ecCtx.ToExt(i)) } os.Remove(v.IndexFileName() + ".ecx") }() - // write .ec00 ~ .ec13 files - if err := erasure_coding.WriteEcFiles(baseFileName); err != nil { - return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) + // write .ec00 ~ .ec[TotalShards-1] files using context + if err := erasure_coding.WriteEcFilesWithContext(baseFileName, ecCtx); err != nil { + return nil, fmt.Errorf("WriteEcFilesWithContext %s: %v", baseFileName, err) } // write .ecx file @@ -84,6 +102,21 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ datSize, _, _ := v.FileStat() volumeInfo.DatFileSize = int64(datSize) + + // Validate EC configuration before saving to .vif + if ecCtx.DataShards <= 0 || ecCtx.ParityShards <= 0 || ecCtx.Total() > erasure_coding.MaxShardCount { + return nil, fmt.Errorf("invalid EC config before saving: data=%d, parity=%d, total=%d (max=%d)", + ecCtx.DataShards, ecCtx.ParityShards, ecCtx.Total(), erasure_coding.MaxShardCount) + } + + // Save EC configuration to VolumeInfo + volumeInfo.EcShardConfig = &volume_server_pb.EcShardConfig{ + DataShards: uint32(ecCtx.DataShards), + ParityShards: uint32(ecCtx.ParityShards), + } + glog.V(1).Infof("Saving EC config to .vif for volume %d: %d+%d (total: %d)", + req.VolumeId, ecCtx.DataShards, ecCtx.ParityShards, ecCtx.Total()) + if err := volume_info.SaveVolumeInfo(baseFileName+".vif", volumeInfo); err != nil { return nil, fmt.Errorf("SaveVolumeInfo %s: %v", baseFileName, err) } @@ -442,9 +475,10 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ glog.V(0).Infof("VolumeEcShardsToVolume: %v", req) - // collect .ec00 ~ .ec09 files - shardFileNames := make([]string, erasure_coding.DataShardsCount) - v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames) + // Collect all EC shards (NewEcVolume will load EC config from .vif into v.ECContext) + // Use MaxShardCount (32) to support custom EC ratios up to 32 total shards + tempShards := make([]string, erasure_coding.MaxShardCount) + v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), tempShards) if !found { return nil, fmt.Errorf("ec volume %d not found", req.VolumeId) } @@ -453,7 +487,19 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) } - for shardId := 0; shardId < erasure_coding.DataShardsCount; shardId++ { + // Use EC context (already loaded from .vif) to determine data shard count + dataShards := v.ECContext.DataShards + + // Defensive validation to prevent panics from corrupted ECContext + if dataShards <= 0 || dataShards > erasure_coding.MaxShardCount { + return nil, fmt.Errorf("invalid data shard count %d for volume %d (must be 1..%d)", dataShards, req.VolumeId, erasure_coding.MaxShardCount) + } + + shardFileNames := tempShards[:dataShards] + glog.V(1).Infof("Using EC config from volume %d: %d data shards", req.VolumeId, dataShards) + + // Verify all data shards are present + for shardId := 0; shardId < dataShards; shardId++ { if shardFileNames[shardId] == "" { return nil, fmt.Errorf("ec volume %d missing shard %d", req.VolumeId, shardId) } |
