aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-03-13 15:41:24 -0700
committerChris Lu <chris.lu@gmail.com>2020-03-13 15:41:27 -0700
commitc90eb0da1f4cc479b85d9adec975c58f954e9dab (patch)
tree1e9c4f075e1a82d468cdd9d643f5260cc02013e8
parent025e586c9137da376b8a759075ce951cb195acf3 (diff)
downloadseaweedfs-c90eb0da1f4cc479b85d9adec975c58f954e9dab.tar.xz
seaweedfs-c90eb0da1f4cc479b85d9adec975c58f954e9dab.zip
volume: handling readonly volumes after compaction
ensure readonly volumes are not added as writable
-rw-r--r--weed/topology/topology_vacuum.go7
-rw-r--r--weed/topology/volume_layout.go27
2 files changed, 19 insertions, 15 deletions
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index ca626e973..9d964489d 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -5,9 +5,10 @@ import (
"sync/atomic"
"time"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -105,7 +106,9 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v
} else {
glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url())
}
- if isCommitSuccess {
+ }
+ if isCommitSuccess {
+ for _, dn := range locationlist.list {
vl.SetVolumeAvailable(dn, vid)
}
}
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 7633b28be..a0c66516d 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -51,6 +51,9 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
+ defer vl.ensureCorrectWritables(v)
+ defer vl.rememberOversizedVolume(v)
+
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
@@ -74,9 +77,6 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
}
- vl.rememberOversizedVolume(v)
- vl.ensureCorrectWritables(v)
-
}
func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) {
@@ -109,22 +109,13 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
if _, ok := vl.oversizedVolumes[v.Id]; !ok {
- vl.addToWritable(v.Id)
+ vl.setVolumeWritable(v.Id)
}
} else {
vl.removeFromWritable(v.Id)
}
}
-func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) {
- for _, id := range vl.writables {
- if vid == id {
- return
- }
- }
- vl.writables = append(vl.writables, vid)
-}
-
func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
return uint64(v.Size) >= vl.volumeSizeLimit
}
@@ -270,7 +261,17 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bo
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
+ vInfo, err := dn.GetVolumesById(v.Id)
+ if err != nil {
+ return false
+ }
+
vl.vid2location[vid].Set(dn)
+
+ if vInfo.ReadOnly {
+ return false
+ }
+
if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() {
return vl.setVolumeWritable(vid)
}