aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
authorshibinbin <shibinbin@megvii.com>2020-10-28 11:36:42 +0800
committershibinbin <shibinbin@megvii.com>2020-10-28 11:36:42 +0800
commit7cc07655d493d11c967cfa978ddc5181d4b6b861 (patch)
tree5ae5bcf7ccc3cee3c55372674753d7c1ca48dff9 /weed/topology
parent29a4c3944eeb07434060df52dfb1d3cf4c59dc91 (diff)
parent53c3aad87528d57343afc5fdb3fb5107544af0fc (diff)
downloadseaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.tar.xz
seaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.zip
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/data_node.go22
-rw-r--r--weed/topology/store_replicate.go5
-rw-r--r--weed/topology/topology.go29
-rw-r--r--weed/topology/topology_vacuum.go18
-rw-r--r--weed/topology/volume_layout.go148
-rw-r--r--weed/topology/volume_layout_test.go116
-rw-r--r--weed/topology/volume_location_list.go11
7 files changed, 302 insertions, 47 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index d18dd6af0..0a4df63d0 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -44,6 +44,10 @@ func (dn *DataNode) String() string {
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
dn.Lock()
defer dn.Unlock()
+ return dn.doAddOrUpdateVolume(v)
+}
+
+func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
if oldV, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
@@ -71,11 +75,15 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO
}
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
+
actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
+
dn.Lock()
+ defer dn.Unlock()
+
for vid, v := range dn.volumes {
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
@@ -90,9 +98,8 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
}
}
}
- dn.Unlock()
for _, v := range actualVolumes {
- isNew, isChangedRO := dn.AddOrUpdateVolume(v)
+ isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
if isNew {
newVolumes = append(newVolumes, v)
}
@@ -103,8 +110,10 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
return
}
-func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) {
+func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
dn.Lock()
+ defer dn.Unlock()
+
for _, v := range deletedVolumes {
delete(dn.volumes, v.Id)
dn.UpAdjustVolumeCountDelta(-1)
@@ -115,9 +124,8 @@ func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.Vol
dn.UpAdjustActiveVolumeCountDelta(-1)
}
}
- dn.Unlock()
- for _, v := range newlVolumes {
- dn.AddOrUpdateVolume(v)
+ for _, v := range newVolumes {
+ dn.doAddOrUpdateVolume(v)
}
return
}
@@ -199,6 +207,8 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
// GetVolumeIds returns the human readable volume ids limited to count of max 100.
func (dn *DataNode) GetVolumeIds() string {
+ dn.RLock()
+ defer dn.RUnlock()
ids := make([]int, 0, len(dn.volumes))
for k := range dn.volumes {
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index 236f8d773..faa16e2f6 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -14,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -80,7 +81,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume
}
// volume server do not know about encryption
- _, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsGzipped(), string(n.Mime), pairMap, jwt)
+ _, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsCompressed(), string(n.Mime), pairMap, jwt)
return err
}); err != nil {
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
@@ -92,7 +93,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume
func ReplicatedDelete(masterNode string, store *storage.Store,
volumeId needle.VolumeId, n *needle.Needle,
- r *http.Request) (size uint32, err error) {
+ r *http.Request) (size types.Size, err error) {
//check JWT
jwt := security.GetJwt(r)
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 993f444a7..e217617e9 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"sync"
+ "time"
"github.com/chrislusf/raft"
@@ -65,31 +66,29 @@ func (t *Topology) IsLeader() bool {
if t.RaftServer.State() == raft.Leader {
return true
}
- if t.RaftServer.Leader() == "" {
- return true
- }
}
return false
}
func (t *Topology) Leader() (string, error) {
l := ""
- if t.RaftServer != nil {
- l = t.RaftServer.Leader()
- } else {
- return "", errors.New("Raft Server not ready yet!")
- }
-
- if l == "" {
- // We are a single node cluster, we are the leader
- return t.RaftServer.Name(), nil
+ for count := 0; count < 3; count++ {
+ if t.RaftServer != nil {
+ l = t.RaftServer.Leader()
+ } else {
+ return "", errors.New("Raft Server not ready yet!")
+ }
+ if l != "" {
+ break
+ } else {
+ time.Sleep(time.Duration(5+count) * time.Second)
+ }
}
-
return l, nil
}
func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) {
- //maybe an issue if lots of collections?
+ // maybe an issue if lots of collections?
if collection == "" {
for _, c := range t.collectionMap.Items() {
if list := c.(*Collection).Lookup(vid); list != nil {
@@ -222,7 +221,7 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
}
for _, v := range changedVolumes {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
- vl.ensureCorrectWritables(&v)
+ vl.EnsureCorrectWritables(&v)
}
return
}
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 789a01330..7bf55d131 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -42,13 +42,17 @@ func batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vl *VolumeLayout, vi
}(index, dn.Url(), vid)
}
vacuumLocationList := NewVolumeLocationList()
+
+ waitTimeout := time.NewTimer(30 * time.Minute)
+ defer waitTimeout.Stop()
+
for range locationlist.list {
select {
case index := <-ch:
if index != -1 {
vacuumLocationList.list = append(vacuumLocationList.list, locationlist.list[index])
}
- case <-time.After(30 * time.Minute):
+ case <-waitTimeout.C:
return vacuumLocationList, false
}
}
@@ -81,11 +85,15 @@ func batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout,
}(index, dn.Url(), vid)
}
isVacuumSuccess := true
+
+ waitTimeout := time.NewTimer(30 * time.Minute)
+ defer waitTimeout.Stop()
+
for range locationlist.list {
select {
case canCommit := <-ch:
isVacuumSuccess = isVacuumSuccess && canCommit
- case <-time.After(30 * time.Minute):
+ case <-waitTimeout.C:
return false
}
}
@@ -165,17 +173,17 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL
volumeLayout.accessLock.RLock()
tmpMap := make(map[needle.VolumeId]*VolumeLocationList)
for vid, locationList := range volumeLayout.vid2location {
- tmpMap[vid] = locationList
+ tmpMap[vid] = locationList.Copy()
}
volumeLayout.accessLock.RUnlock()
for vid, locationList := range tmpMap {
volumeLayout.accessLock.RLock()
- isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid]
+ isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid)
volumeLayout.accessLock.RUnlock()
- if hasValue && isReadOnly {
+ if isReadOnly {
continue
}
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 9e84fd2da..ffe36e95b 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -13,14 +13,100 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
+type copyState int
+
+const (
+ noCopies copyState = 0 + iota
+ insufficientCopies
+ enoughCopies
+)
+
+type volumeState string
+
+const (
+ readOnlyState volumeState = "ReadOnly"
+ oversizedState = "Oversized"
+)
+
+type stateIndicator func(copyState) bool
+
+func ExistCopies() stateIndicator {
+ return func(state copyState) bool { return state != noCopies }
+}
+
+func NoCopies() stateIndicator {
+ return func(state copyState) bool { return state == noCopies }
+}
+
+type volumesBinaryState struct {
+ rp *super_block.ReplicaPlacement
+ name volumeState // the name for volume state (eg. "Readonly", "Oversized")
+ indicator stateIndicator // indicate whether the volumes should be marked as `name`
+ copyMap map[needle.VolumeId]*VolumeLocationList
+}
+
+func NewVolumesBinaryState(name volumeState, rp *super_block.ReplicaPlacement, indicator stateIndicator) *volumesBinaryState {
+ return &volumesBinaryState{
+ rp: rp,
+ name: name,
+ indicator: indicator,
+ copyMap: make(map[needle.VolumeId]*VolumeLocationList),
+ }
+}
+
+func (v *volumesBinaryState) Dump() (res []uint32) {
+ for vid, list := range v.copyMap {
+ if v.indicator(v.copyState(list)) {
+ res = append(res, uint32(vid))
+ }
+ }
+ return
+}
+
+func (v *volumesBinaryState) IsTrue(vid needle.VolumeId) bool {
+ list, _ := v.copyMap[vid]
+ return v.indicator(v.copyState(list))
+}
+
+func (v *volumesBinaryState) Add(vid needle.VolumeId, dn *DataNode) {
+ list, _ := v.copyMap[vid]
+ if list != nil {
+ list.Set(dn)
+ return
+ }
+ list = NewVolumeLocationList()
+ list.Set(dn)
+ v.copyMap[vid] = list
+}
+
+func (v *volumesBinaryState) Remove(vid needle.VolumeId, dn *DataNode) {
+ list, _ := v.copyMap[vid]
+ if list != nil {
+ list.Remove(dn)
+ if list.Length() == 0 {
+ delete(v.copyMap, vid)
+ }
+ }
+}
+
+func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState {
+ if list == nil {
+ return noCopies
+ }
+ if list.Length() < v.rp.GetCopyCount() {
+ return insufficientCopies
+ }
+ return enoughCopies
+}
+
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
rp *super_block.ReplicaPlacement
ttl *needle.TTL
vid2location map[needle.VolumeId]*VolumeLocationList
- writables []needle.VolumeId // transient array of writable volume id
- readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes
- oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
+ writables []needle.VolumeId // transient array of writable volume id
+ readonlyVolumes *volumesBinaryState // readonly volumes
+ oversizedVolumes *volumesBinaryState // oversized volumes
volumeSizeLimit uint64
replicationAsMin bool
accessLock sync.RWMutex
@@ -38,8 +124,8 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi
ttl: ttl,
vid2location: make(map[needle.VolumeId]*VolumeLocationList),
writables: *new([]needle.VolumeId),
- readonlyVolumes: make(map[needle.VolumeId]bool),
- oversizedVolumes: make(map[needle.VolumeId]bool),
+ readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
+ oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
volumeSizeLimit: volumeSizeLimit,
replicationAsMin: replicationAsMin,
}
@@ -53,8 +139,8 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- defer vl.ensureCorrectWritables(v)
- defer vl.rememberOversizedVolume(v)
+ defer vl.ensureCorrectWritables(v.Id)
+ defer vl.rememberOversizedVolume(v, dn)
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
@@ -66,24 +152,26 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if vInfo.ReadOnly {
glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
- vl.readonlyVolumes[v.Id] = true
+ vl.readonlyVolumes.Add(v.Id, dn)
return
} else {
- delete(vl.readonlyVolumes, v.Id)
+ vl.readonlyVolumes.Remove(v.Id, dn)
}
} else {
glog.V(1).Infof("vid %d removed from writable", v.Id)
vl.removeFromWritable(v.Id)
- delete(vl.readonlyVolumes, v.Id)
+ vl.readonlyVolumes.Remove(v.Id, dn)
return
}
}
}
-func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) {
+func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataNode) {
if vl.isOversized(v) {
- vl.oversizedVolumes[v.Id] = true
+ vl.oversizedVolumes.Add(v.Id, dn)
+ } else {
+ vl.oversizedVolumes.Remove(v.Id, dn)
}
}
@@ -99,7 +187,9 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if location.Remove(dn) {
- vl.ensureCorrectWritables(v)
+ vl.readonlyVolumes.Remove(v.Id, dn)
+ vl.oversizedVolumes.Remove(v.Id, dn)
+ vl.ensureCorrectWritables(v.Id)
if location.Length() == 0 {
delete(vl.vid2location, v.Id)
@@ -108,16 +198,34 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
}
-func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
- if vl.enoughCopies(v.Id) && vl.isWritable(v) {
- if _, ok := vl.oversizedVolumes[v.Id]; !ok {
- vl.setVolumeWritable(v.Id)
+func (vl *VolumeLayout) EnsureCorrectWritables(v *storage.VolumeInfo) {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ vl.ensureCorrectWritables(v.Id)
+}
+
+func (vl *VolumeLayout) ensureCorrectWritables(vid needle.VolumeId) {
+ if vl.enoughCopies(vid) && vl.isAllWritable(vid) {
+ if !vl.oversizedVolumes.IsTrue(vid) {
+ vl.setVolumeWritable(vid)
}
} else {
- vl.removeFromWritable(v.Id)
+ vl.removeFromWritable(vid)
}
}
+func (vl *VolumeLayout) isAllWritable(vid needle.VolumeId) bool {
+ for _, dn := range vl.vid2location[vid].list {
+ if v, found := dn.volumes[vid]; found {
+ if v.ReadOnly {
+ return false
+ }
+ }
+ }
+ return true
+}
+
func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
return uint64(v.Size) >= vl.volumeSizeLimit
}
@@ -251,6 +359,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId)
if location, ok := vl.vid2location[vid]; ok {
if location.Remove(dn) {
+ vl.readonlyVolumes.Remove(vid, dn)
+ vl.oversizedVolumes.Remove(vid, dn)
if location.Length() < vl.rp.GetCopyCount() {
glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount())
return vl.removeFromWritable(vid)
@@ -315,7 +425,7 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats {
size, fileCount := vll.Stats(vid, freshThreshold)
ret.FileCount += uint64(fileCount)
ret.UsedSize += size
- if vl.readonlyVolumes[vid] {
+ if vl.readonlyVolumes.IsTrue(vid) {
ret.TotalSize += size
} else {
ret.TotalSize += vl.volumeSizeLimit
diff --git a/weed/topology/volume_layout_test.go b/weed/topology/volume_layout_test.go
new file mode 100644
index 000000000..e148d6107
--- /dev/null
+++ b/weed/topology/volume_layout_test.go
@@ -0,0 +1,116 @@
+package topology
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+func TestVolumesBinaryState(t *testing.T) {
+ vids := []needle.VolumeId{
+ needle.VolumeId(1),
+ needle.VolumeId(2),
+ needle.VolumeId(3),
+ needle.VolumeId(4),
+ needle.VolumeId(5),
+ }
+
+ dns := []*DataNode{
+ &DataNode{
+ Ip: "127.0.0.1",
+ Port: 8081,
+ },
+ &DataNode{
+ Ip: "127.0.0.1",
+ Port: 8082,
+ },
+ &DataNode{
+ Ip: "127.0.0.1",
+ Port: 8083,
+ },
+ }
+
+ rp, _ := super_block.NewReplicaPlacementFromString("002")
+
+ state_exist := NewVolumesBinaryState(readOnlyState, rp, ExistCopies())
+ state_exist.Add(vids[0], dns[0])
+ state_exist.Add(vids[0], dns[1])
+ state_exist.Add(vids[1], dns[2])
+ state_exist.Add(vids[2], dns[1])
+ state_exist.Add(vids[4], dns[1])
+ state_exist.Add(vids[4], dns[2])
+
+ state_no := NewVolumesBinaryState(readOnlyState, rp, NoCopies())
+ state_no.Add(vids[0], dns[0])
+ state_no.Add(vids[0], dns[1])
+ state_no.Add(vids[3], dns[1])
+
+ tests := []struct {
+ name string
+ state *volumesBinaryState
+ expectResult []bool
+ update func()
+ expectResultAfterUpdate []bool
+ }{
+ {
+ name: "mark true when exist copies",
+ state: state_exist,
+ expectResult: []bool{true, true, true, false, true},
+ update: func() {
+ state_exist.Remove(vids[0], dns[2])
+ state_exist.Remove(vids[1], dns[2])
+ state_exist.Remove(vids[3], dns[2])
+ state_exist.Remove(vids[4], dns[1])
+ state_exist.Remove(vids[4], dns[2])
+ },
+ expectResultAfterUpdate: []bool{true, false, true, false, false},
+ },
+ {
+ name: "mark true when inexist copies",
+ state: state_no,
+ expectResult: []bool{false, true, true, false, true},
+ update: func() {
+ state_no.Remove(vids[0], dns[2])
+ state_no.Remove(vids[1], dns[2])
+ state_no.Add(vids[2], dns[1])
+ state_no.Remove(vids[3], dns[1])
+ state_no.Remove(vids[4], dns[2])
+ },
+ expectResultAfterUpdate: []bool{false, true, false, true, true},
+ },
+ }
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ var result []bool
+ for index, _ := range vids {
+ result = append(result, test.state.IsTrue(vids[index]))
+ }
+ if len(result) != len(test.expectResult) {
+ t.Fatalf("len(result) != len(expectResult), got %d, expected %d\n",
+ len(result), len(test.expectResult))
+ }
+ for index, val := range result {
+ if val != test.expectResult[index] {
+ t.Fatalf("result not matched, index %d, got %v, expect %v\n",
+ index, val, test.expectResult[index])
+ }
+ }
+ test.update()
+ var updateResult []bool
+ for index, _ := range vids {
+ updateResult = append(updateResult, test.state.IsTrue(vids[index]))
+ }
+ if len(updateResult) != len(test.expectResultAfterUpdate) {
+ t.Fatalf("len(updateResult) != len(expectResultAfterUpdate), got %d, expected %d\n",
+ len(updateResult), len(test.expectResultAfterUpdate))
+ }
+ for index, val := range updateResult {
+ if val != test.expectResultAfterUpdate[index] {
+ t.Fatalf("update result not matched, index %d, got %v, expect %v\n",
+ index, val, test.expectResultAfterUpdate[index])
+ }
+ }
+ })
+ }
+}
diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go
index 8905c54b5..64c13ca52 100644
--- a/weed/topology/volume_location_list.go
+++ b/weed/topology/volume_location_list.go
@@ -18,12 +18,23 @@ func (dnll *VolumeLocationList) String() string {
return fmt.Sprintf("%v", dnll.list)
}
+func (dnll *VolumeLocationList) Copy() *VolumeLocationList {
+ list := make([]*DataNode, len(dnll.list))
+ copy(list, dnll.list)
+ return &VolumeLocationList{
+ list: list,
+ }
+}
+
func (dnll *VolumeLocationList) Head() *DataNode {
//mark first node as master volume
return dnll.list[0]
}
func (dnll *VolumeLocationList) Length() int {
+ if dnll == nil {
+ return 0
+ }
return len(dnll.list)
}