aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/topology/topology_vacuum.go43
1 files changed, 37 insertions, 6 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 063fba5f7..d302afe8c 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,11 +2,12 @@ package topology
import (
"context"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"sync/atomic"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@@ -119,10 +120,10 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
return isVacuumSuccess
}
-func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) bool {
+func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, vacuumLocationList, locationList *VolumeLocationList) bool {
isCommitSuccess := true
isReadOnly := false
- for _, dn := range locationlist.list {
+ for _, dn := range vacuumLocationList.list {
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
@@ -140,8 +141,38 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url())
}
}
+
+ //we should check the status of all replicas
+ if len(locationList.list) > len(vacuumLocationList.list) {
+ for _, dn := range locationList.list {
+ isFound := false
+ for _, dnVaccum := range vacuumLocationList.list {
+ if dn.id == dnVaccum.id {
+ isFound = true
+ break
+ }
+ }
+ if !isFound {
+ err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, err := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
+ VolumeId: uint32(vid),
+ })
+ if resp != nil && resp.IsReadOnly {
+ isReadOnly = true
+ }
+ return err
+ })
+ if err != nil {
+ glog.Errorf("Error when checking volume %d status on %s: %v", vid, dn.Url(), err)
+ //we mark volume read-only, since the volume state is unknown
+ isReadOnly = true
+ }
+ }
+ }
+ }
+
if isCommitSuccess {
- for _, dn := range locationlist.list {
+ for _, dn := range vacuumLocationList.list {
vl.SetVolumeAvailable(dn, vid, isReadOnly)
}
}
@@ -226,11 +257,11 @@ func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayou
return
}
- glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
+ glog.V(1).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck(
grpcDialOption, vid, locationList, garbageThreshold); needVacuum {
if t.batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) {
- t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList)
+ t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList, locationList)
} else {
t.batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList)
}