aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/store_ec_delete.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-06-20 00:55:30 -0700
committerChris Lu <chris.lu@gmail.com>2019-06-20 00:55:30 -0700
commite63317fb08fd74a0728fe9463a3d80e2cbda2473 (patch)
treed20394d3b0cd9b5f2e2e351c71459b0eff3d9a70 /weed/storage/store_ec_delete.go
parent4cea8aefd035223d32b79593342b37aef1989a92 (diff)
downloadseaweedfs-e63317fb08fd74a0728fe9463a3d80e2cbda2473.tar.xz
seaweedfs-e63317fb08fd74a0728fe9463a3d80e2cbda2473.zip
ec deletion code complete, not tested yet
Diffstat (limited to 'weed/storage/store_ec_delete.go')
-rw-r--r--weed/storage/store_ec_delete.go105
1 files changed, 105 insertions, 0 deletions
diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go
new file mode 100644
index 000000000..e027d2887
--- /dev/null
+++ b/weed/storage/store_ec_delete.go
@@ -0,0 +1,105 @@
+package storage
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
+
+ count, err := s.ReadEcShardNeedle(ctx, ecVolume.VolumeId, n)
+
+ if err != nil {
+ return 0, err
+ }
+
+ if cookie != n.Cookie {
+ return 0, fmt.Errorf("unexpected cookie %x", cookie)
+ }
+
+ if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx, ecVolume, n.Id); err != nil {
+ return 0, err
+ }
+
+ return int64(count), nil
+
+}
+
+func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
+
+ _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
+
+ if len(intervals) == 0 {
+ return erasure_coding.NotFoundError
+ }
+
+ shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
+
+ hasDeletionSuccess := false
+ err = s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId)
+ if err == nil {
+ hasDeletionSuccess = true
+ }
+
+ for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ {
+ if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId); parityDeletionError == nil {
+ hasDeletionSuccess = true
+ }
+ }
+
+ if hasDeletionSuccess {
+ return nil
+ }
+
+ return err
+
+}
+
+func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
+
+ ecVolume.ShardLocationsLock.RLock()
+ sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId]
+ ecVolume.ShardLocationsLock.RUnlock()
+
+ if !hasShardLocations {
+ return fmt.Errorf("ec shard %d.%d not located", ecVolume.VolumeId, shardId)
+ }
+
+ for _, sourceDataNode := range sourceDataNodes {
+ glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode)
+ err := s.doDeleteNeedleFromRemoteEcShard(ctx, sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
+ if err != nil {
+ return err
+ }
+ glog.V(1).Infof("delete from remote ec shard %d.%d from %s: %v", ecVolume.VolumeId, shardId, sourceDataNode, err)
+ }
+
+ return nil
+
+}
+
+func (s *Store) doDeleteNeedleFromRemoteEcShard(ctx context.Context, sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
+
+ return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+
+ // copy data slice
+ _, err := client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{
+ VolumeId: uint32(vid),
+ Collection: collection,
+ FileKey: uint64(needleId),
+ Version: uint32(version),
+ })
+ if err != nil {
+ return fmt.Errorf("failed to delete from ec shard %d on %s: %v", vid, sourceDataNode, err)
+ }
+ return nil
+ })
+
+}