diff options
| author | chrislu <chris.lu@gmail.com> | 2023-01-03 22:05:26 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-01-03 22:05:26 -0800 |
| commit | 7bdae5172e58e0b43d90f3f40df577857d004e40 (patch) | |
| tree | 6536b976371a694e15b29723613bb73904f1aa25 | |
| parent | 8aec430df720742bc7dce4bb30cb1757939d984b (diff) | |
| download | seaweedfs-7bdae5172e58e0b43d90f3f40df577857d004e40.tar.xz seaweedfs-7bdae5172e58e0b43d90f3f40df577857d004e40.zip | |
batch delete EC needles
fix https://github.com/seaweedfs/seaweedfs/issues/4107
| -rw-r--r-- | weed/server/volume_grpc_batch_delete.go | 64 |
1 files changed, 46 insertions, 18 deletions
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index 25780ec75..8deb96a80 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -28,6 +28,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B n := new(needle.Needle) volumeId, _ := needle.NewVolumeId(vid) + ecVolume, isEcVolume := vs.store.FindEcVolume(volumeId) if req.SkipCookieCheck { n.Id, _, err = needle.ParseNeedleIdCookie(id_cookie) if err != nil { @@ -40,13 +41,24 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B } else { n.ParsePath(id_cookie) cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil { - resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ - FileId: fid, - Status: http.StatusNotFound, - Error: err.Error(), - }) - continue + if !isEcVolume { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil { + resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ + FileId: fid, + Status: http.StatusNotFound, + Error: err.Error(), + }) + continue + } + } else { + if _, err := vs.store.ReadEcShardNeedle(volumeId, n, nil); err != nil { + resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ + FileId: fid, + Status: http.StatusNotFound, + Error: err.Error(), + }) + continue + } } if n.Cookie != cookie { resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ @@ -68,18 +80,34 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B } n.LastModified = now - if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil { - resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ - FileId: fid, - Status: http.StatusInternalServerError, - Error: err.Error()}, - ) + if !isEcVolume { + if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil { + resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ + FileId: fid, + Status: http.StatusInternalServerError, + Error: err.Error()}, + ) + } else { + resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ + FileId: fid, + Status: http.StatusAccepted, + Size: uint32(size)}, + ) + } } else { - resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ - FileId: fid, - Status: http.StatusAccepted, - Size: uint32(size)}, - ) + if size, err := vs.store.DeleteEcShardNeedle(ecVolume, n, n.Cookie); err != nil { + resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ + FileId: fid, + Status: http.StatusInternalServerError, + Error: err.Error()}, + ) + } else { + resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ + FileId: fid, + Status: http.StatusAccepted, + Size: uint32(size)}, + ) + } } } |
