aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_ec_encode.go2
-rw-r--r--weed/shell/command_s3_clean_uploads.go92
-rw-r--r--weed/shell/command_volume_tier_move.go2
3 files changed, 94 insertions, 2 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index edacf22c6..634cb11e2 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -274,7 +274,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
- fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
+ fmt.Printf("collect volumes quiet for: %d seconds\n", quietSeconds)
vidMap := make(map[uint32]bool)
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go
new file mode 100644
index 000000000..1fe13d981
--- /dev/null
+++ b/weed/shell/command_s3_clean_uploads.go
@@ -0,0 +1,92 @@
+package shell
+
+import (
+ "flag"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "math"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func init() {
+ Commands = append(Commands, &commandS3CleanUploads{})
+}
+
+type commandS3CleanUploads struct {
+}
+
+func (c *commandS3CleanUploads) Name() string {
+ return "s3.clean.uploads"
+}
+
+func (c *commandS3CleanUploads) Help() string {
+ return `clean up stale multipart uploads
+
+ Example:
+ s3.clean.uploads -replication 001
+
+`
+}
+
+func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"")
+ if err = bucketCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ var filerBucketsPath string
+ filerBucketsPath, err = readFilerBucketsPath(commandEnv)
+ if err != nil {
+ return fmt.Errorf("read buckets: %v", err)
+ }
+
+ var buckets []string
+ err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
+ buckets = append(buckets, entry.Name)
+ return nil
+ }, "", false, math.MaxUint32)
+ if err != nil {
+ return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
+ }
+
+ for _, bucket:= range buckets {
+ c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo)
+ }
+
+ return err
+
+}
+
+func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration) error {
+ uploadsDir := filerBucketsPath+"/"+bucket+"/.uploads"
+ var staleUploads []string
+ now := time.Now()
+ err := filer_pb.List(commandEnv, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ ctime := time.Unix(entry.Attributes.Crtime, 0)
+ if ctime.Add(timeAgo).Before(now) {
+ staleUploads = append(staleUploads, entry.Name)
+ }
+ return nil
+ }, "", false, math.MaxUint32)
+ if err != nil {
+ return fmt.Errorf("list uploads under %v: %v", uploadsDir, err)
+ }
+
+ for _, staleUpload:= range staleUploads {
+ deleteUrl := fmt.Sprintf("http://%s:%d%s/%s?recursive=true&ignoreRecursiveError=true",commandEnv.option.FilerHost, commandEnv.option.FilerPort,uploadsDir, staleUpload)
+ fmt.Fprintf(writer, "purge %s\n", deleteUrl)
+
+ err = util.Delete(deleteUrl, "")
+ if err != nil {
+ return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
+ }
+ }
+
+ return nil
+
+} \ No newline at end of file
diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
index a15efd6b3..f7fa94031 100644
--- a/weed/shell/command_volume_tier_move.go
+++ b/weed/shell/command_volume_tier_move.go
@@ -102,7 +102,7 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin
keepDataNodesSorted(allLocations, toDiskType)
fn := capacityByFreeVolumeCount(toDiskType)
for _, dst := range allLocations {
- if fn(dst.dataNode) > 0 {
+ if fn(dst.dataNode) > 0 && !hasFoundTarget {
// ask the volume server to replicate the volume
if isOneOf(dst.dataNode.Id, locations) {
continue