diff options
Diffstat (limited to 'weed/topology')
| -rw-r--r-- | weed/topology/data_node.go | 20 | ||||
| -rw-r--r-- | weed/topology/store_replicate.go | 3 | ||||
| -rw-r--r-- | weed/topology/topology.go | 27 | ||||
| -rw-r--r-- | weed/topology/topology_vacuum.go | 6 | ||||
| -rw-r--r-- | weed/topology/volume_layout.go | 118 | ||||
| -rw-r--r-- | weed/topology/volume_layout_test.go | 116 | ||||
| -rw-r--r-- | weed/topology/volume_location_list.go | 8 |
7 files changed, 261 insertions, 37 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index efdf5285b..0a4df63d0 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -44,6 +44,10 @@ func (dn *DataNode) String() string { func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { dn.Lock() defer dn.Unlock() + return dn.doAddOrUpdateVolume(v) +} + +func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { if oldV, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v dn.UpAdjustVolumeCountDelta(1) @@ -71,11 +75,15 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO } func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) { + actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } + dn.Lock() + defer dn.Unlock() + for vid, v := range dn.volumes { if _, ok := actualVolumeMap[vid]; !ok { glog.V(0).Infoln("Deleting volume id:", vid) @@ -90,9 +98,8 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume } } } - dn.Unlock() for _, v := range actualVolumes { - isNew, isChangedRO := dn.AddOrUpdateVolume(v) + isNew, isChangedRO := dn.doAddOrUpdateVolume(v) if isNew { newVolumes = append(newVolumes, v) } @@ -103,8 +110,10 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume return } -func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) { +func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) { dn.Lock() + defer dn.Unlock() + for _, v := range deletedVolumes { delete(dn.volumes, v.Id) dn.UpAdjustVolumeCountDelta(-1) @@ -115,9 +124,8 @@ func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.Vol dn.UpAdjustActiveVolumeCountDelta(-1) } } - dn.Unlock() - for _, v := range newlVolumes { - dn.AddOrUpdateVolume(v) + for _, v := range newVolumes { + dn.doAddOrUpdateVolume(v) } return } diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index 481e72fe0..faa16e2f6 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -14,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -92,7 +93,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.Volume func ReplicatedDelete(masterNode string, store *storage.Store, volumeId needle.VolumeId, n *needle.Needle, - r *http.Request) (size uint32, err error) { + r *http.Request) (size types.Size, err error) { //check JWT jwt := security.GetJwt(r) diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 993f444a7..a11a1bac6 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "sync" + "time" "github.com/chrislusf/raft" @@ -65,31 +66,29 @@ func (t *Topology) IsLeader() bool { if t.RaftServer.State() == raft.Leader { return true } - if t.RaftServer.Leader() == "" { - return true - } } return false } func (t *Topology) Leader() (string, error) { l := "" - if t.RaftServer != nil { - l = t.RaftServer.Leader() - } else { - return "", errors.New("Raft Server not ready yet!") - } - - if l == "" { - // We are a single node cluster, we are the leader - return t.RaftServer.Name(), nil + for count := 0; count < 3; count++ { + if t.RaftServer != nil { + l = t.RaftServer.Leader() + } else { + return "", errors.New("Raft Server not ready yet!") + } + if l != "" { + break + } else { + time.Sleep(time.Duration(5+count) * time.Second) + } } - return l, nil } func (t *Topology) Lookup(collection string, vid needle.VolumeId) (dataNodes []*DataNode) { - //maybe an issue if lots of collections? + // maybe an issue if lots of collections? if collection == "" { for _, c := range t.collectionMap.Items() { if list := c.(*Collection).Lookup(vid); list != nil { diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 789a01330..4f3d69627 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -165,17 +165,17 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL volumeLayout.accessLock.RLock() tmpMap := make(map[needle.VolumeId]*VolumeLocationList) for vid, locationList := range volumeLayout.vid2location { - tmpMap[vid] = locationList + tmpMap[vid] = locationList.Copy() } volumeLayout.accessLock.RUnlock() for vid, locationList := range tmpMap { volumeLayout.accessLock.RLock() - isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid] + isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid) volumeLayout.accessLock.RUnlock() - if hasValue && isReadOnly { + if isReadOnly { continue } diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 9e84fd2da..e7659e0eb 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -13,14 +13,100 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) +type copyState int + +const ( + noCopies copyState = 0 + iota + insufficientCopies + enoughCopies +) + +type volumeState string + +const ( + readOnlyState volumeState = "ReadOnly" + oversizedState = "Oversized" +) + +type stateIndicator func(copyState) bool + +func ExistCopies() stateIndicator { + return func(state copyState) bool { return state != noCopies } +} + +func NoCopies() stateIndicator { + return func(state copyState) bool { return state == noCopies } +} + +type volumesBinaryState struct { + rp *super_block.ReplicaPlacement + name volumeState // the name for volume state (eg. "Readonly", "Oversized") + indicator stateIndicator // indicate whether the volumes should be marked as `name` + copyMap map[needle.VolumeId]*VolumeLocationList +} + +func NewVolumesBinaryState(name volumeState, rp *super_block.ReplicaPlacement, indicator stateIndicator) *volumesBinaryState { + return &volumesBinaryState{ + rp: rp, + name: name, + indicator: indicator, + copyMap: make(map[needle.VolumeId]*VolumeLocationList), + } +} + +func (v *volumesBinaryState) Dump() (res []uint32) { + for vid, list := range v.copyMap { + if v.indicator(v.copyState(list)) { + res = append(res, uint32(vid)) + } + } + return +} + +func (v *volumesBinaryState) IsTrue(vid needle.VolumeId) bool { + list, _ := v.copyMap[vid] + return v.indicator(v.copyState(list)) +} + +func (v *volumesBinaryState) Add(vid needle.VolumeId, dn *DataNode) { + list, _ := v.copyMap[vid] + if list != nil { + list.Set(dn) + return + } + list = NewVolumeLocationList() + list.Set(dn) + v.copyMap[vid] = list +} + +func (v *volumesBinaryState) Remove(vid needle.VolumeId, dn *DataNode) { + list, _ := v.copyMap[vid] + if list != nil { + list.Remove(dn) + if list.Length() == 0 { + delete(v.copyMap, vid) + } + } +} + +func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState { + if list == nil { + return noCopies + } + if list.Length() < v.rp.GetCopyCount() { + return insufficientCopies + } + return enoughCopies +} + // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *super_block.ReplicaPlacement ttl *needle.TTL vid2location map[needle.VolumeId]*VolumeLocationList - writables []needle.VolumeId // transient array of writable volume id - readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes - oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes + writables []needle.VolumeId // transient array of writable volume id + readonlyVolumes *volumesBinaryState // readonly volumes + oversizedVolumes *volumesBinaryState // oversized volumes volumeSizeLimit uint64 replicationAsMin bool accessLock sync.RWMutex @@ -38,8 +124,8 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi ttl: ttl, vid2location: make(map[needle.VolumeId]*VolumeLocationList), writables: *new([]needle.VolumeId), - readonlyVolumes: make(map[needle.VolumeId]bool), - oversizedVolumes: make(map[needle.VolumeId]bool), + readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()), + oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()), volumeSizeLimit: volumeSizeLimit, replicationAsMin: replicationAsMin, } @@ -54,7 +140,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { defer vl.accessLock.Unlock() defer vl.ensureCorrectWritables(v) - defer vl.rememberOversizedVolume(v) + defer vl.rememberOversizedVolume(v, dn) if _, ok := vl.vid2location[v.Id]; !ok { vl.vid2location[v.Id] = NewVolumeLocationList() @@ -66,24 +152,26 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if vInfo.ReadOnly { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - vl.readonlyVolumes[v.Id] = true + vl.readonlyVolumes.Add(v.Id, dn) return } else { - delete(vl.readonlyVolumes, v.Id) + vl.readonlyVolumes.Remove(v.Id, dn) } } else { glog.V(1).Infof("vid %d removed from writable", v.Id) vl.removeFromWritable(v.Id) - delete(vl.readonlyVolumes, v.Id) + vl.readonlyVolumes.Remove(v.Id, dn) return } } } -func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) { +func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataNode) { if vl.isOversized(v) { - vl.oversizedVolumes[v.Id] = true + vl.oversizedVolumes.Add(v.Id, dn) + } else { + vl.oversizedVolumes.Remove(v.Id, dn) } } @@ -99,6 +187,8 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { if location.Remove(dn) { + vl.readonlyVolumes.Remove(v.Id, dn) + vl.oversizedVolumes.Remove(v.Id, dn) vl.ensureCorrectWritables(v) if location.Length() == 0 { @@ -110,7 +200,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { if vl.enoughCopies(v.Id) && vl.isWritable(v) { - if _, ok := vl.oversizedVolumes[v.Id]; !ok { + if !vl.oversizedVolumes.IsTrue(v.Id) { vl.setVolumeWritable(v.Id) } } else { @@ -251,6 +341,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId) if location, ok := vl.vid2location[vid]; ok { if location.Remove(dn) { + vl.readonlyVolumes.Remove(vid, dn) + vl.oversizedVolumes.Remove(vid, dn) if location.Length() < vl.rp.GetCopyCount() { glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount()) return vl.removeFromWritable(vid) @@ -315,7 +407,7 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats { size, fileCount := vll.Stats(vid, freshThreshold) ret.FileCount += uint64(fileCount) ret.UsedSize += size - if vl.readonlyVolumes[vid] { + if vl.readonlyVolumes.IsTrue(vid) { ret.TotalSize += size } else { ret.TotalSize += vl.volumeSizeLimit diff --git a/weed/topology/volume_layout_test.go b/weed/topology/volume_layout_test.go new file mode 100644 index 000000000..e148d6107 --- /dev/null +++ b/weed/topology/volume_layout_test.go @@ -0,0 +1,116 @@ +package topology + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" +) + +func TestVolumesBinaryState(t *testing.T) { + vids := []needle.VolumeId{ + needle.VolumeId(1), + needle.VolumeId(2), + needle.VolumeId(3), + needle.VolumeId(4), + needle.VolumeId(5), + } + + dns := []*DataNode{ + &DataNode{ + Ip: "127.0.0.1", + Port: 8081, + }, + &DataNode{ + Ip: "127.0.0.1", + Port: 8082, + }, + &DataNode{ + Ip: "127.0.0.1", + Port: 8083, + }, + } + + rp, _ := super_block.NewReplicaPlacementFromString("002") + + state_exist := NewVolumesBinaryState(readOnlyState, rp, ExistCopies()) + state_exist.Add(vids[0], dns[0]) + state_exist.Add(vids[0], dns[1]) + state_exist.Add(vids[1], dns[2]) + state_exist.Add(vids[2], dns[1]) + state_exist.Add(vids[4], dns[1]) + state_exist.Add(vids[4], dns[2]) + + state_no := NewVolumesBinaryState(readOnlyState, rp, NoCopies()) + state_no.Add(vids[0], dns[0]) + state_no.Add(vids[0], dns[1]) + state_no.Add(vids[3], dns[1]) + + tests := []struct { + name string + state *volumesBinaryState + expectResult []bool + update func() + expectResultAfterUpdate []bool + }{ + { + name: "mark true when exist copies", + state: state_exist, + expectResult: []bool{true, true, true, false, true}, + update: func() { + state_exist.Remove(vids[0], dns[2]) + state_exist.Remove(vids[1], dns[2]) + state_exist.Remove(vids[3], dns[2]) + state_exist.Remove(vids[4], dns[1]) + state_exist.Remove(vids[4], dns[2]) + }, + expectResultAfterUpdate: []bool{true, false, true, false, false}, + }, + { + name: "mark true when inexist copies", + state: state_no, + expectResult: []bool{false, true, true, false, true}, + update: func() { + state_no.Remove(vids[0], dns[2]) + state_no.Remove(vids[1], dns[2]) + state_no.Add(vids[2], dns[1]) + state_no.Remove(vids[3], dns[1]) + state_no.Remove(vids[4], dns[2]) + }, + expectResultAfterUpdate: []bool{false, true, false, true, true}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var result []bool + for index, _ := range vids { + result = append(result, test.state.IsTrue(vids[index])) + } + if len(result) != len(test.expectResult) { + t.Fatalf("len(result) != len(expectResult), got %d, expected %d\n", + len(result), len(test.expectResult)) + } + for index, val := range result { + if val != test.expectResult[index] { + t.Fatalf("result not matched, index %d, got %v, expect %v\n", + index, val, test.expectResult[index]) + } + } + test.update() + var updateResult []bool + for index, _ := range vids { + updateResult = append(updateResult, test.state.IsTrue(vids[index])) + } + if len(updateResult) != len(test.expectResultAfterUpdate) { + t.Fatalf("len(updateResult) != len(expectResultAfterUpdate), got %d, expected %d\n", + len(updateResult), len(test.expectResultAfterUpdate)) + } + for index, val := range updateResult { + if val != test.expectResultAfterUpdate[index] { + t.Fatalf("update result not matched, index %d, got %v, expect %v\n", + index, val, test.expectResultAfterUpdate[index]) + } + } + }) + } +} diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go index 8905c54b5..6acd70f2f 100644 --- a/weed/topology/volume_location_list.go +++ b/weed/topology/volume_location_list.go @@ -18,6 +18,14 @@ func (dnll *VolumeLocationList) String() string { return fmt.Sprintf("%v", dnll.list) } +func (dnll *VolumeLocationList) Copy() *VolumeLocationList { + list := make([]*DataNode, len(dnll.list)) + copy(list, dnll.list) + return &VolumeLocationList{ + list: list, + } +} + func (dnll *VolumeLocationList) Head() *DataNode { //mark first node as master volume return dnll.list[0] |
