aboutsummaryrefslogtreecommitdiff
path: root/src/weed/topology/topology_compact.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/weed/topology/topology_compact.go')
-rw-r--r--src/weed/topology/topology_compact.go150
1 files changed, 150 insertions, 0 deletions
diff --git a/src/weed/topology/topology_compact.go b/src/weed/topology/topology_compact.go
new file mode 100644
index 000000000..c2b85fe63
--- /dev/null
+++ b/src/weed/topology/topology_compact.go
@@ -0,0 +1,150 @@
+package topology
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/url"
+ "weed/storage"
+ "weed/util"
+ "time"
+)
+
+func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
+ ch := make(chan bool, locationlist.Length())
+ for index, dn := range locationlist.list {
+ go func(index int, url string, vid storage.VolumeId) {
+ //fmt.Println(index, "Check vacuuming", vid, "on", dn.Url())
+ if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
+ //fmt.Println(index, "Error when checking vacuuming", vid, "on", url, e)
+ ch <- false
+ } else {
+ //fmt.Println(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
+ ch <- ret
+ }
+ }(index, dn.Url(), vid)
+ }
+ isCheckSuccess := true
+ for _ = range locationlist.list {
+ select {
+ case canVacuum := <-ch:
+ isCheckSuccess = isCheckSuccess && canVacuum
+ case <-time.After(30 * time.Minute):
+ isCheckSuccess = false
+ break
+ }
+ }
+ return isCheckSuccess
+}
+func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+ vl.removeFromWritable(vid)
+ ch := make(chan bool, locationlist.Length())
+ for index, dn := range locationlist.list {
+ go func(index int, url string, vid storage.VolumeId) {
+ fmt.Println(index, "Start vacuuming", vid, "on", dn.Url())
+ if e := vacuumVolume_Compact(url, vid); e != nil {
+ fmt.Println(index, "Error when vacuuming", vid, "on", url, e)
+ ch <- false
+ } else {
+ fmt.Println(index, "Complete vacuuming", vid, "on", url)
+ ch <- true
+ }
+ }(index, dn.Url(), vid)
+ }
+ isVacuumSuccess := true
+ for _ = range locationlist.list {
+ select {
+ case _ = <-ch:
+ case <-time.After(30 * time.Minute):
+ isVacuumSuccess = false
+ break
+ }
+ }
+ return isVacuumSuccess
+}
+func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+ isCommitSuccess := true
+ for _, dn := range locationlist.list {
+ fmt.Println("Start Commiting vacuum", vid, "on", dn.Url())
+ if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
+ fmt.Println("Error when committing vacuum", vid, "on", dn.Url(), e)
+ isCommitSuccess = false
+ } else {
+ fmt.Println("Complete Commiting vacuum", vid, "on", dn.Url())
+ }
+ }
+ if isCommitSuccess {
+ vl.setVolumeWritable(vid)
+ }
+ return isCommitSuccess
+}
+func (t *Topology) Vacuum(garbageThreshold string) int {
+ for _, vl := range t.replicaType2VolumeLayout {
+ if vl != nil {
+ for vid, locationlist := range vl.vid2location {
+ if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
+ if batchVacuumVolumeCompact(vl, vid, locationlist) {
+ batchVacuumVolumeCommit(vl, vid, locationlist)
+ }
+ }
+ }
+ }
+ }
+ return 0
+}
+
+type VacuumVolumeResult struct {
+ Result bool
+ Error string
+}
+
+func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ values.Add("garbageThreshold", garbageThreshold)
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values)
+ if err != nil {
+ fmt.Println("parameters:", values)
+ return err, false
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err, false
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error), false
+ }
+ return nil, ret.Result
+}
+func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values)
+ if err != nil {
+ return err
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}
+func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values)
+ if err != nil {
+ return err
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}