1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
package topology
import (
"context"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
)
func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold float64) bool {
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
go func(index int, url string, vid storage.VolumeId) {
err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumdId: uint32(vid),
})
if err != nil {
ch <- false
return err
}
isNeeded := resp.GarbageRatio > garbageThreshold
ch <- isNeeded
return nil
})
if err != nil {
glog.V(0).Infof("Checking vacuuming %d on %s: %v", vid, url, err)
}
}(index, dn.Url(), vid)
}
isCheckSuccess := true
for _ = range locationlist.list {
select {
case canVacuum := <-ch:
isCheckSuccess = isCheckSuccess && canVacuum
case <-time.After(30 * time.Minute):
isCheckSuccess = false
break
}
}
return isCheckSuccess
}
func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
vl.removeFromWritable(vid)
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
go func(index int, url string, vid storage.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
err := operation.WithVolumeServerClient(url, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
VolumdId: uint32(vid),
})
return err
})
if err != nil {
glog.Errorf("Error when vacuuming %d on %s: %v", vid, url, err)
ch <- false
} else {
glog.V(0).Infof("Complete vacuuming %d on %s", vid, url)
ch <- true
}
}(index, dn.Url(), vid)
}
isVacuumSuccess := true
for _ = range locationlist.list {
select {
case canCommit := <-ch:
isVacuumSuccess = isVacuumSuccess && canCommit
case <-time.After(30 * time.Minute):
isVacuumSuccess = false
break
}
}
return isVacuumSuccess
}
func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumdId: uint32(vid),
})
return err
})
if err != nil {
glog.Errorf("Error when committing vacuum %d on %s: %v", vid, dn.Url(), err)
isCommitSuccess = false
} else {
glog.V(0).Infof("Complete Commiting vacuum %d on %s", vid, dn.Url())
}
if isCommitSuccess {
vl.SetVolumeAvailable(dn, vid)
}
}
return isCommitSuccess
}
func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
err := operation.WithVolumeServerClient(dn.Url(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
VolumdId: uint32(vid),
})
return err
})
if err != nil {
glog.Errorf("Error when cleaning up vacuum %d on %s: %v", vid, dn.Url(), err)
} else {
glog.V(0).Infof("Complete cleaning up vacuum %d on %s", vid, dn.Url())
}
}
}
func (t *Topology) Vacuum(garbageThreshold float64, preallocate int64) int {
glog.V(0).Infof("Start vacuum on demand with threshold: %f", garbageThreshold)
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
for vid, locationlist := range volumeLayout.vid2location {
volumeLayout.accessLock.RLock()
isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid]
volumeLayout.accessLock.RUnlock()
if hasValue && isReadOnly {
continue
}
glog.V(0).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationlist, preallocate) {
batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
}
}
}
}
}
}
return 0
}
type VacuumVolumeResult struct {
Result bool
Error string
}
|