aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/topology_vacuum.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/topology_vacuum.go')
-rw-r--r--weed/topology/topology_vacuum.go158
1 files changed, 158 insertions, 0 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
new file mode 100644
index 000000000..8cf8dfbeb
--- /dev/null
+++ b/weed/topology/topology_vacuum.go
@@ -0,0 +1,158 @@
+package topology
+
+import (
+ "encoding/json"
+ "errors"
+ "net/url"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
+ ch := make(chan bool, locationlist.Length())
+ for index, dn := range locationlist.list {
+ go func(index int, url string, vid storage.VolumeId) {
+ //glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url())
+ if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
+ //glog.V(0).Infoln(index, "Error when checking vacuuming", vid, "on", url, e)
+ ch <- false
+ } else {
+ //glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
+ ch <- ret
+ }
+ }(index, dn.Url(), vid)
+ }
+ isCheckSuccess := true
+ for _ = range locationlist.list {
+ select {
+ case canVacuum := <-ch:
+ isCheckSuccess = isCheckSuccess && canVacuum
+ case <-time.After(30 * time.Minute):
+ isCheckSuccess = false
+ break
+ }
+ }
+ return isCheckSuccess
+}
+func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+ vl.removeFromWritable(vid)
+ ch := make(chan bool, locationlist.Length())
+ for index, dn := range locationlist.list {
+ go func(index int, url string, vid storage.VolumeId) {
+ glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
+ if e := vacuumVolume_Compact(url, vid); e != nil {
+ glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e)
+ ch <- false
+ } else {
+ glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
+ ch <- true
+ }
+ }(index, dn.Url(), vid)
+ }
+ isVacuumSuccess := true
+ for _ = range locationlist.list {
+ select {
+ case _ = <-ch:
+ case <-time.After(30 * time.Minute):
+ isVacuumSuccess = false
+ break
+ }
+ }
+ return isVacuumSuccess
+}
+func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+ isCommitSuccess := true
+ for _, dn := range locationlist.list {
+ glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
+ if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
+ glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
+ isCommitSuccess = false
+ } else {
+ glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url())
+ }
+ if isCommitSuccess {
+ vl.SetVolumeAvailable(dn, vid)
+ }
+ }
+ return isCommitSuccess
+}
+func (t *Topology) Vacuum(garbageThreshold string) int {
+ glog.V(0).Infoln("Start vacuum on demand")
+ for _, col := range t.collectionMap.Items() {
+ c := col.(*Collection)
+ glog.V(0).Infoln("vacuum on collection:", c.Name)
+ for _, vl := range c.storageType2VolumeLayout.Items() {
+ if vl != nil {
+ volumeLayout := vl.(*VolumeLayout)
+ for vid, locationlist := range volumeLayout.vid2location {
+ glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
+ if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
+ if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
+ batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
+ }
+ }
+ }
+ }
+ }
+ }
+ return 0
+}
+
+type VacuumVolumeResult struct {
+ Result bool
+ Error string
+}
+
+func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ values.Add("garbageThreshold", garbageThreshold)
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values)
+ if err != nil {
+ glog.V(0).Infoln("parameters:", values)
+ return err, false
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err, false
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error), false
+ }
+ return nil, ret.Result
+}
+func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values)
+ if err != nil {
+ return err
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}
+func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values)
+ if err != nil {
+ return err
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}