author    Chris Lu <chrislusf@users.noreply.github.com>  2020-04-01 16:48:58 -0700
committer GitHub <noreply@github.com>                    2020-04-01 16:48:58 -0700
commit    c446438ca5360b8e479e77dd0da32a80bd5644d6 (patch)
tree      9f984e23068124d2e37f465728fe82426f50c9aa
parent    d61bb60450e8fabc5c5961874fe2e69afa0200ec (diff)
parent    eae3f27c8021ef1903419de43c68d2dc62a09953 (diff)
Merge pull request #1255 from levenlabs/ignore
Added treat_replication_as_minimums master toml option
-rw-r--r--  weed/command/scaffold.go             |  8
-rw-r--r--  weed/server/master_server.go         |  5
-rw-r--r--  weed/topology/collection.go          | 11
-rw-r--r--  weed/topology/topology.go            |  8
-rw-r--r--  weed/topology/topology_test.go       |  4
-rw-r--r--  weed/topology/volume_growth_test.go  |  2
-rw-r--r--  weed/topology/volume_layout.go       | 14
7 files changed, 39 insertions, 13 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 9f119a638..e391c23ea 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -394,5 +394,13 @@ copy_2 = 6 # create 2 x 6 = 12 actual volumes
copy_3 = 3 # create 3 x 3 = 9 actual volumes
copy_other = 1 # create n x 1 = n actual volumes
+# configuration flags for replication
+[master.replication]
+# treat any replication count as a minimum rather than an exact target. With
+# replication 010, a volume that has copies on 3 different racks is then
+# still considered writable. Writes will still try to replicate to all
+# available volumes. Only use this option if you do your own replication or
+# periodically sync volumes yourself.
+treat_replication_as_minimums = false
+
`
)
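A note on usage: the scaffold above only documents the option and its default. To actually enable the behavior, set the key to true in the master.toml that the master loads (weed scaffold -config=master prints the template above; the snippet below is an illustrative edit, not part of this commit):

[master.replication]
treat_replication_as_minimums = true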
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index a9ae6b888..497990f29 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -77,6 +77,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
+ v.SetDefault("master.replication.treat_replication_as_minimums", false)
+ replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
+
var preallocateSize int64
if option.VolumePreallocate {
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
@@ -96,7 +99,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
if nil == seq {
glog.Fatalf("create sequencer failed.")
}
- ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
+ ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds, replicationAsMin)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
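For context, the new option is read through viper exactly like the surrounding jwt settings: a default is registered before the key is read, so an older master.toml without a [master.replication] section silently yields false. A minimal standalone sketch of that pattern (the v in the diff is SeaweedFS's shared viper instance; this sketch constructs its own):

package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	v.SetConfigName("master") // looks for master.toml (and other supported formats)
	v.AddConfigPath(".")

	// Registering a default first means a missing key is not an error:
	// masters running with an older master.toml simply get false.
	v.SetDefault("master.replication.treat_replication_as_minimums", false)

	// A missing config file is fine too; the defaults still apply.
	_ = v.ReadInConfig()

	replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
	fmt.Println("treat_replication_as_minimums =", replicationAsMin)
}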
diff --git a/weed/topology/collection.go b/weed/topology/collection.go
index 7a611d904..5b410d1eb 100644
--- a/weed/topology/collection.go
+++ b/weed/topology/collection.go
@@ -11,11 +11,16 @@ import (
type Collection struct {
Name string
volumeSizeLimit uint64
+ replicationAsMin bool
storageType2VolumeLayout *util.ConcurrentReadMap
}
-func NewCollection(name string, volumeSizeLimit uint64) *Collection {
- c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
+func NewCollection(name string, volumeSizeLimit uint64, replicationAsMin bool) *Collection {
+ c := &Collection{
+ Name: name,
+ volumeSizeLimit: volumeSizeLimit,
+ replicationAsMin: replicationAsMin,
+ }
c.storageType2VolumeLayout = util.NewConcurrentReadMap()
return c
}
@@ -30,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, t
keyString += ttl.String()
}
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
- return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
+ return NewVolumeLayout(rp, ttl, c.volumeSizeLimit, c.replicationAsMin)
})
return vl.(*VolumeLayout)
}
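Why store the flag on Collection at all? VolumeLayouts are created lazily, keyed by replica placement plus TTL, so whichever request first touches a given (rp, ttl) pair triggers NewVolumeLayout inside the closure above, long after configuration was read. A plausible sketch of the get-or-create idiom that util.ConcurrentReadMap supplies (an illustration of the pattern, not SeaweedFS's actual implementation):

package main

import (
	"fmt"
	"sync"
)

// lazyMap resolves a key to a value, building the value at most once.
type lazyMap struct {
	mu sync.RWMutex
	m  map[string]interface{}
}

func (l *lazyMap) Get(key string, newEntry func() interface{}) interface{} {
	// Fast path: most lookups find an existing entry under a read lock.
	l.mu.RLock()
	if v, ok := l.m[key]; ok {
		l.mu.RUnlock()
		return v
	}
	l.mu.RUnlock()

	// Slow path: re-check under the write lock so two racing callers
	// cannot both create the entry.
	l.mu.Lock()
	defer l.mu.Unlock()
	if v, ok := l.m[key]; ok {
		return v
	}
	v := newEntry()
	l.m[key] = v
	return v
}

func main() {
	lm := &lazyMap{m: make(map[string]interface{})}
	vl := lm.Get("010", func() interface{} { return "volume layout for 010" })
	fmt.Println(vl)
}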
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index fbf998707..c24cab9d6 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -27,7 +27,8 @@ type Topology struct {
pulse int64
- volumeSizeLimit uint64
+ volumeSizeLimit uint64
+ replicationAsMin bool
Sequence sequence.Sequencer
@@ -38,7 +39,7 @@ type Topology struct {
RaftServer raft.Server
}
-func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology {
+func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int, replicationAsMin bool) *Topology {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
@@ -48,6 +49,7 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) *Topology {
t.ecShardMap = make(map[needle.VolumeId]*EcShardLocations)
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
+ t.replicationAsMin = replicationAsMin
t.Sequence = seq
@@ -138,7 +140,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
- return NewCollection(collectionName, t.volumeSizeLimit)
+ return NewCollection(collectionName, t.volumeSizeLimit, t.replicationAsMin)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
}
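The flag rides through every constructor (NewTopology, NewCollection, NewVolumeLayout) rather than being read from configuration at the point of use, because by the time the closure above runs there is no config handle in scope; the choice is fixed once at topology construction. A hedged usage sketch, assuming the import paths of the 2020 tree and the helpers shown elsewhere in this diff (needle.EMPTY_TTL and the 30 GiB size limit are illustrative):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/sequence"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
	"github.com/chrislusf/seaweedfs/weed/topology"
)

func main() {
	seq := sequence.NewMemorySequencer()
	// The trailing argument is the new replicationAsMin flag: set once here,
	// then baked into every Collection and VolumeLayout created later.
	topo := topology.NewTopology("topo", seq, 30*1024*1024*1024, 5, true)

	rp, _ := super_block.NewReplicaPlacementFromString("010")
	vl := topo.GetVolumeLayout("my-collection", rp, needle.EMPTY_TTL)
	fmt.Println(vl)
}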
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index e7676ccf7..2fe381ca2 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -23,7 +23,7 @@ func TestRemoveDataCenter(t *testing.T) {
}
func TestHandlingVolumeServerHeartbeat(t *testing.T) {
- topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
@@ -140,7 +140,7 @@ func assert(t *testing.T, message string, actual, expected int) {
func TestAddRemoveVolume(t *testing.T) {
- topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go
index 6ff5be0eb..bc9083fd2 100644
--- a/weed/topology/volume_growth_test.go
+++ b/weed/topology/volume_growth_test.go
@@ -81,7 +81,7 @@ func setup(topologyLayout string) *Topology {
fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes
- topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
dc := NewDataCenter(dcKey)
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 908bbb9e9..9e84fd2da 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -22,6 +22,7 @@ type VolumeLayout struct {
readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes
oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
volumeSizeLimit uint64
+ replicationAsMin bool
accessLock sync.RWMutex
}
@@ -31,7 +32,7 @@ type VolumeLayoutStats struct {
FileCount uint64
}
-func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
@@ -40,6 +41,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout {
readonlyVolumes: make(map[needle.VolumeId]bool),
oversizedVolumes: make(map[needle.VolumeId]bool),
volumeSizeLimit: volumeSizeLimit,
+ replicationAsMin: replicationAsMin,
}
}
@@ -107,7 +109,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
- if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
+ if vl.enoughCopies(v.Id) && vl.isWritable(v) {
if _, ok := vl.oversizedVolumes[v.Id]; !ok {
vl.setVolumeWritable(v.Id)
}
@@ -272,12 +274,18 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, isReadOnly bool) bool {
return false
}
- if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() {
+ if vl.enoughCopies(vid) {
return vl.setVolumeWritable(vid)
}
return false
}
+func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool {
+ locations := vl.vid2location[vid].Length()
+ desired := vl.rp.GetCopyCount()
+ return locations == desired || (vl.replicationAsMin && locations > desired)
+}
+
func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
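The behavioral change is easiest to see in isolation. Below is a self-contained restatement of enoughCopies with plain ints (the logic is copied from the diff; the inputs are simplified), showing why a 010 volume whose copies landed on 3 racks flips from unwritable to writable once the option is on:

package main

import "fmt"

// enoughCopies restates the check from volume_layout.go: locations is the
// number of replicas currently registered for a volume, desired is
// ReplicaPlacement.GetCopyCount() (2 for "010": the original plus one copy
// on a different rack).
func enoughCopies(locations, desired int, replicationAsMin bool) bool {
	return locations == desired || (replicationAsMin && locations > desired)
}

func main() {
	const desired = 2 // replication "010"
	for _, asMin := range []bool{false, true} {
		for locations := 1; locations <= 3; locations++ {
			fmt.Printf("replicationAsMin=%-5v locations=%d desired=%d -> enough: %v\n",
				asMin, locations, desired, enoughCopies(locations, desired, asMin))
		}
	}
}

With the default (false), 3 copies of a 010 volume fail the equality check and the volume stays out of the writable list; with the option on, any count at or above the placement's copy count passes, which is exactly the "treat as minimums" semantics described in the scaffold comment.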