aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/filer_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/filer_util.go')
-rw-r--r--weed/s3api/filer_util.go55
1 files changed, 55 insertions, 0 deletions
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 91c34f0eb..3c11b032c 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -139,6 +139,61 @@ func (s3a *S3ApiServer) rm(ctx context.Context, parentDirectoryPath string, entr
}
+func (s3a *S3ApiServer) streamRemove(ctx context.Context, quiet bool,
+ fn func() (finished bool, parentDirectoryPath string, entryName string, isDeleteData, isRecursive bool),
+ respFn func(err string)) error {
+
+ return s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+
+ stream, err := client.StreamDeleteEntries(ctx)
+ if err != nil {
+ glog.V(0).Infof("stream delete entry: %v", err)
+ return fmt.Errorf("stream delete entry: %v", err)
+ }
+
+ waitc := make(chan struct{})
+ go func() {
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ // read done.
+ close(waitc)
+ return
+ }
+ if err != nil {
+ glog.V(0).Infof("streamRemove: %v", err)
+ return
+ }
+ respFn(resp.Error)
+ }
+ }()
+
+ for {
+ finished, parentDirectoryPath, entryName, isDeleteData, isRecursive := fn()
+ if finished {
+ break
+ }
+ err = stream.Send(&filer_pb.DeleteEntryRequest{
+ Directory: parentDirectoryPath,
+ Name: entryName,
+ IsDeleteData: isDeleteData,
+ IsRecursive: isRecursive,
+ IgnoreRecursiveError: quiet,
+ })
+ if err != nil {
+ glog.V(0).Infof("streamRemove: %v", err)
+ break
+ }
+
+ }
+ stream.CloseSend()
+ <-waitc
+ return err
+
+ })
+
+}
+
func (s3a *S3ApiServer) exists(ctx context.Context, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
err = s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {