aboutsummaryrefslogtreecommitdiff
path: root/go/topology
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2014-03-19 04:48:13 -0700
committerChris Lu <chris.lu@gmail.com>2014-03-19 04:48:13 -0700
commit0563773944aa96c1bb87a2e840f00c0e34a8175f (patch)
tree723610067336d0367e53ad01e7bd163640b32601 /go/topology
parent463589da016e9a56cee0300b50219947dbd2fc64 (diff)
downloadseaweedfs-0563773944aa96c1bb87a2e840f00c0e34a8175f.tar.xz
seaweedfs-0563773944aa96c1bb87a2e840f00c0e34a8175f.zip
switch to ReadAt() for thread-safe read
fix bugs during volume compaction
Diffstat (limited to 'go/topology')
-rw-r--r--go/topology/topology.go6
-rw-r--r--go/topology/volume_layout.go28
-rw-r--r--go/topology/volume_location_list.go10
3 files changed, 25 insertions, 19 deletions
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 055d273cd..d5af60cd8 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -129,8 +129,8 @@ func (t *Topology) DeleteCollection(collectionName string) {
delete(t.collectionMap, collectionName)
}
-func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(v, dn)
+func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
}
func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {
@@ -144,7 +144,7 @@ func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo,
dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
- t.RegisterVolumeLayout(&v, dn)
+ t.RegisterVolumeLayout(v, dn)
}
}
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 1a35faa5c..61121a498 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -33,15 +33,22 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
- if vl.vid2location[v.Id].Add(dn) {
- if len(vl.vid2location[v.Id].list) == v.ReplicaPlacement.GetCopyCount() {
- if vl.isWritable(v) {
- vl.writables = append(vl.writables, v.Id)
- } else {
- vl.removeFromWritable(v.Id)
- }
+ vl.vid2location[v.Id].Set(dn)
+ glog.V(3).Infoln("volume", v.Id, "added to dn", dn, "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
+ if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
+ vl.AddToWritable(v.Id)
+ } else {
+ vl.removeFromWritable(v.Id)
+ }
+}
+
+func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) {
+ for _, id := range vl.writables {
+ if vid == id {
+ return
}
}
+ vl.writables = append(vl.writables, vid)
}
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
@@ -156,10 +163,9 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- if vl.vid2location[vid].Add(dn) {
- if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
- return vl.setVolumeWritable(vid)
- }
+ vl.vid2location[vid].Set(dn)
+ if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
+ return vl.setVolumeWritable(vid)
}
return false
}
diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go
index 507a240b5..176f469b9 100644
--- a/go/topology/volume_location_list.go
+++ b/go/topology/volume_location_list.go
@@ -18,14 +18,14 @@ func (dnll *VolumeLocationList) Length() int {
return len(dnll.list)
}
-func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
- for _, dnl := range dnll.list {
- if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
- return false
+func (dnll *VolumeLocationList) Set(loc *DataNode) {
+ for i := 0; i < len(dnll.list); i++ {
+ if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port {
+ dnll.list[i] = loc
+ return
}
}
dnll.list = append(dnll.list, loc)
- return true
}
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {