diff options
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_ec_encode.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_s3_clean_uploads.go | 92 | ||||
| -rw-r--r-- | weed/shell/command_volume_tier_move.go | 2 |
3 files changed, 94 insertions, 2 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index edacf22c6..634cb11e2 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -274,7 +274,7 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri quietSeconds := int64(quietPeriod / time.Second) nowUnixSeconds := time.Now().Unix() - fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds) + fmt.Printf("collect volumes quiet for: %d seconds\n", quietSeconds) vidMap := make(map[uint32]bool) eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go new file mode 100644 index 000000000..1fe13d981 --- /dev/null +++ b/weed/shell/command_s3_clean_uploads.go @@ -0,0 +1,92 @@ +package shell + +import ( + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "math" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func init() { + Commands = append(Commands, &commandS3CleanUploads{}) +} + +type commandS3CleanUploads struct { +} + +func (c *commandS3CleanUploads) Name() string { + return "s3.clean.uploads" +} + +func (c *commandS3CleanUploads) Help() string { + return `clean up stale multipart uploads + + Example: + s3.clean.uploads -replication 001 + +` +} + +func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"") + if err = bucketCommand.Parse(args); err != nil { + return nil + } + + var filerBucketsPath string + filerBucketsPath, err = readFilerBucketsPath(commandEnv) + if err != nil { + return fmt.Errorf("read buckets: %v", err) + } + + var buckets []string + err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error { + buckets = append(buckets, entry.Name) + return nil + }, "", false, math.MaxUint32) + if err != nil { + return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err) + } + + for _, bucket:= range buckets { + c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo) + } + + return err + +} + +func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration) error { + uploadsDir := filerBucketsPath+"/"+bucket+"/.uploads" + var staleUploads []string + now := time.Now() + err := filer_pb.List(commandEnv, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error { + ctime := time.Unix(entry.Attributes.Crtime, 0) + if ctime.Add(timeAgo).Before(now) { + staleUploads = append(staleUploads, entry.Name) + } + return nil + }, "", false, math.MaxUint32) + if err != nil { + return fmt.Errorf("list uploads under %v: %v", uploadsDir, err) + } + + for _, staleUpload:= range staleUploads { + deleteUrl := fmt.Sprintf("http://%s:%d%s/%s?recursive=true&ignoreRecursiveError=true",commandEnv.option.FilerHost, commandEnv.option.FilerPort,uploadsDir, staleUpload) + fmt.Fprintf(writer, "purge %s\n", deleteUrl) + + err = util.Delete(deleteUrl, "") + if err != nil { + return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err) + } + } + + return nil + +}
\ No newline at end of file diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go index a15efd6b3..f7fa94031 100644 --- a/weed/shell/command_volume_tier_move.go +++ b/weed/shell/command_volume_tier_move.go @@ -102,7 +102,7 @@ func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection strin keepDataNodesSorted(allLocations, toDiskType) fn := capacityByFreeVolumeCount(toDiskType) for _, dst := range allLocations { - if fn(dst.dataNode) > 0 { + if fn(dst.dataNode) > 0 && !hasFoundTarget { // ask the volume server to replicate the volume if isOneOf(dst.dataNode.Id, locations) { continue |
