aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-01-03 22:05:26 -0800
committerchrislu <chris.lu@gmail.com>2023-01-03 22:05:26 -0800
commit7bdae5172e58e0b43d90f3f40df577857d004e40 (patch)
tree6536b976371a694e15b29723613bb73904f1aa25
parent8aec430df720742bc7dce4bb30cb1757939d984b (diff)
downloadseaweedfs-7bdae5172e58e0b43d90f3f40df577857d004e40.tar.xz
seaweedfs-7bdae5172e58e0b43d90f3f40df577857d004e40.zip
batch delete EC needles
fix https://github.com/seaweedfs/seaweedfs/issues/4107
-rw-r--r--weed/server/volume_grpc_batch_delete.go64
1 files changed, 46 insertions, 18 deletions
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index 25780ec75..8deb96a80 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -28,6 +28,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
n := new(needle.Needle)
volumeId, _ := needle.NewVolumeId(vid)
+ ecVolume, isEcVolume := vs.store.FindEcVolume(volumeId)
if req.SkipCookieCheck {
n.Id, _, err = needle.ParseNeedleIdCookie(id_cookie)
if err != nil {
@@ -40,13 +41,24 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
} else {
n.ParsePath(id_cookie)
cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
- resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
- FileId: fid,
- Status: http.StatusNotFound,
- Error: err.Error(),
- })
- continue
+ if !isEcVolume {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusNotFound,
+ Error: err.Error(),
+ })
+ continue
+ }
+ } else {
+ if _, err := vs.store.ReadEcShardNeedle(volumeId, n, nil); err != nil {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusNotFound,
+ Error: err.Error(),
+ })
+ continue
+ }
}
if n.Cookie != cookie {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
@@ -68,18 +80,34 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
}
n.LastModified = now
- if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil {
- resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
- FileId: fid,
- Status: http.StatusInternalServerError,
- Error: err.Error()},
- )
+ if !isEcVolume {
+ if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusInternalServerError,
+ Error: err.Error()},
+ )
+ } else {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusAccepted,
+ Size: uint32(size)},
+ )
+ }
} else {
- resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
- FileId: fid,
- Status: http.StatusAccepted,
- Size: uint32(size)},
- )
+ if size, err := vs.store.DeleteEcShardNeedle(ecVolume, n, n.Cookie); err != nil {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusInternalServerError,
+ Error: err.Error()},
+ )
+ } else {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusAccepted,
+ Size: uint32(size)},
+ )
+ }
}
}