aboutsummaryrefslogtreecommitdiff
path: root/go/topology
diff options
context:
space:
mode:
Diffstat (limited to 'go/topology')
-rw-r--r--go/topology/allocate_volume.go7
-rw-r--r--go/topology/collection.go23
-rw-r--r--go/topology/data_node.go6
-rw-r--r--go/topology/topology.go21
-rw-r--r--go/topology/topology_event_handling.go6
-rw-r--r--go/topology/topology_map.go2
-rw-r--r--go/topology/topology_vacuum.go2
-rw-r--r--go/topology/volume_growth.go12
-rw-r--r--go/topology/volume_layout.go13
9 files changed, 61 insertions, 31 deletions
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
index 77b4ac508..4aeef35f7 100644
--- a/go/topology/allocate_volume.go
+++ b/go/topology/allocate_volume.go
@@ -12,11 +12,12 @@ type AllocateVolumeResult struct {
Error string
}
-func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
+func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
values := make(url.Values)
values.Add("volume", vid.String())
- values.Add("collection", collection)
- values.Add("replication", rp.String())
+ values.Add("collection", option.Collection)
+ values.Add("replication", option.ReplicaPlacement.String())
+ values.Add("ttl", option.Ttl.String())
jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
if err != nil {
return err
diff --git a/go/topology/collection.go b/go/topology/collection.go
index b21122d22..c014231af 100644
--- a/go/topology/collection.go
+++ b/go/topology/collection.go
@@ -1,33 +1,34 @@
package topology
import (
- "code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
)
type Collection struct {
Name string
volumeSizeLimit uint64
- replicaType2VolumeLayout []*VolumeLayout
+ storageType2VolumeLayout map[string]*VolumeLayout
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
- c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount)
+ c.storageType2VolumeLayout = make(map[string]*VolumeLayout)
return c
}
-func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout {
- replicaPlacementIndex := rp.GetReplicationLevelIndex()
- if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil {
- glog.V(0).Infoln("collection", c.Name, "adding replication type", rp)
- c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit)
+func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+ keyString := rp.String()
+ if ttl != nil {
+ keyString += ttl.String()
}
- return c.replicaType2VolumeLayout[replicaPlacementIndex]
+ if c.storageType2VolumeLayout[keyString] == nil {
+ c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
+ }
+ return c.storageType2VolumeLayout[keyString]
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range c.replicaType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
if list := vl.Lookup(vid); list != nil {
return list
@@ -38,7 +39,7 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
}
func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
- for _, vl := range c.replicaType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
if list := vl.ListVolumeServers(); list != nil {
nodes = append(nodes, list...)
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index ae80e08bb..c67c5c1c1 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -38,15 +38,16 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
}
}
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
- for vid, _ := range dn.volumes {
+ for vid, v := range dn.volumes {
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
delete(dn.volumes, vid)
+ deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
dn.UpAdjustActiveVolumeCountDelta(-1)
}
@@ -54,6 +55,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
for _, v := range actualVolumes {
dn.AddOrUpdateVolume(v)
}
+ return
}
func (dn *DataNode) GetDataCenter() *DataCenter {
diff --git a/go/topology/topology.go b/go/topology/topology.go
index f1daffb53..acdef5e36 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
}
func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
- vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option)
+ vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
@@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
_, ok := t.collectionMap[collectionName]
if !ok {
t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
- return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp)
+ return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl)
}
func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
@@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) {
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
+}
+func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
+ glog.Infof("removing volume info:%+v", v)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
}
-func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
+func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
t.Sequence.SetMax(*joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
@@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
- dn.UpdateVolumes(volumeInfos)
+ deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
+ for _, v := range deletedVolumes {
+ t.UnRegisterVolumeLayout(v, dn)
+ }
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 7398ff9bf..1e630e149 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
- vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement)
+ vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
@@ -55,7 +55,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
- vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@@ -65,7 +65,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
- vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
index f66d4c251..d6400c988 100644
--- a/go/topology/topology_map.go
+++ b/go/topology/topology_map.go
@@ -14,7 +14,7 @@ func (t *Topology) ToMap() interface{} {
m["DataCenters"] = dcs
var layouts []interface{}
for _, c := range t.collectionMap {
- for _, layout := range c.replicaType2VolumeLayout {
+ for _, layout := range c.storageType2VolumeLayout {
if layout != nil {
tmp := layout.ToMap()
tmp["collection"] = c.Name
diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go
index a1d6d2564..9eaca37d4 100644
--- a/go/topology/topology_vacuum.go
+++ b/go/topology/topology_vacuum.go
@@ -80,7 +80,7 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
}
func (t *Topology) Vacuum(garbageThreshold string) int {
for _, c := range t.collectionMap {
- for _, vl := range c.replicaType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout {
if vl != nil {
for vid, locationlist := range vl.vid2location {
if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
index 4965e3ba0..778aa038a 100644
--- a/go/topology/volume_growth.go
+++ b/go/topology/volume_growth.go
@@ -19,6 +19,7 @@ This package is created to resolve these replica placement issues:
type VolumeGrowOption struct {
Collection string
ReplicaPlacement *storage.ReplicaPlacement
+ Ttl *storage.TTL
DataCenter string
Rack string
DataNode string
@@ -184,8 +185,15 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum
func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
- if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil {
- vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion}
+ if err := AllocateVolume(server, vid, option); err == nil {
+ vi := storage.VolumeInfo{
+ Id: vid,
+ Size: 0,
+ Collection: option.Collection,
+ ReplicaPlacement: option.ReplicaPlacement,
+ Ttl: option.Ttl,
+ Version: storage.CurrentVersion,
+ }
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server)
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 538acb54c..1e55072a3 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -11,15 +11,17 @@ import (
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
rp *storage.ReplicaPlacement
+ ttl *storage.TTL
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64
accessLock sync.Mutex
}
-func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{
rp: rp,
+ ttl: ttl,
vid2location: make(map[storage.VolumeId]*VolumeLocationList),
writables: *new([]storage.VolumeId),
volumeSizeLimit: volumeSizeLimit,
@@ -42,6 +44,14 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
}
+func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ vl.removeFromWritable(v.Id)
+ delete(vl.vid2location, v.Id)
+}
+
func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) {
for _, id := range vl.writables {
if vid == id {
@@ -192,6 +202,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
func (vl *VolumeLayout) ToMap() map[string]interface{} {
m := make(map[string]interface{})
m["replication"] = vl.rp.String()
+ m["ttl"] = vl.ttl.String()
m["writables"] = vl.writables
//m["locations"] = vl.vid2location
return m