aboutsummaryrefslogtreecommitdiff
path: root/go/topology
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-11-12 02:21:22 -0800
committerChris Lu <chris.lu@gmail.com>2013-11-12 02:21:22 -0800
commit3b687111399fd08468e4a6232bcbe6068df961bf (patch)
treecde07f3ba6c9ae411d55b25c331bca6827253c30 /go/topology
parent8f0e2f31afad1fcb2f06ef3ae55866313b7b4b02 (diff)
downloadseaweedfs-3b687111399fd08468e4a6232bcbe6068df961bf.tar.xz
seaweedfs-3b687111399fd08468e4a6232bcbe6068df961bf.zip
support for collections!
Diffstat (limited to 'go/topology')
-rw-r--r--go/topology/collection.go38
-rw-r--r--go/topology/topo_test.go7
-rw-r--r--go/topology/topology.go39
-rw-r--r--go/topology/topology_compact.go14
-rw-r--r--go/topology/topology_event_handling.go6
-rw-r--r--go/topology/topology_map.go10
6 files changed, 79 insertions, 35 deletions
diff --git a/go/topology/collection.go b/go/topology/collection.go
new file mode 100644
index 000000000..0a7971424
--- /dev/null
+++ b/go/topology/collection.go
@@ -0,0 +1,38 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/storage"
+)
+
+type Collection struct {
+ Name string
+ volumeSizeLimit uint64
+ replicaType2VolumeLayout []*VolumeLayout
+}
+
+func NewCollection(name string, volumeSizeLimit uint64) *Collection {
+ c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
+ c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
+ return c
+}
+
+func (c *Collection) GetOrCreateVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
+ replicationTypeIndex := repType.GetReplicationLevelIndex()
+ if c.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+ glog.V(0).Infoln("collection", c.Name, "adding replication type", repType)
+ c.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, c.volumeSizeLimit)
+ }
+ return c.replicaType2VolumeLayout[replicationTypeIndex]
+}
+
+func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
+ for _, vl := range c.replicaType2VolumeLayout {
+ if vl != nil {
+ if list := vl.Lookup(vid); list != nil {
+ return list
+ }
+ }
+ }
+ return nil
+}
diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go
index 36f4963db..c0edca7c1 100644
--- a/go/topology/topo_test.go
+++ b/go/topology/topo_test.go
@@ -99,9 +99,10 @@ func setup(topologyLayout string) *Topology {
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
vi := storage.VolumeInfo{
- Id: storage.VolumeId(int64(m["id"].(float64))),
- Size: uint64(m["size"].(float64)),
- Version: storage.CurrentVersion}
+ Id: storage.VolumeId(int64(m["id"].(float64))),
+ Size: uint64(m["size"].(float64)),
+ Collection: "testingCollection",
+ Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
diff --git a/go/topology/topology.go b/go/topology/topology.go
index b21601210..5b3d29e0b 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -12,8 +12,7 @@ import (
type Topology struct {
NodeImpl
- //transient vid~servers mapping for each replication type
- replicaType2VolumeLayout []*VolumeLayout
+ collectionMap map[string]*Collection
pulse int64
@@ -34,7 +33,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.nodeType = "Topology"
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
- t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
+ t.collectionMap = make(map[string]*Collection)
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
@@ -60,13 +59,18 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
return nil
}
-func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range t.replicaType2VolumeLayout {
- if vl != nil {
- if list := vl.Lookup(vid); list != nil {
+func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
+ //maybe an issue if lots of collections?
+ if collection == "" {
+ for _, c := range t.collectionMap {
+ if list := c.Lookup(vid); list != nil {
return list
}
}
+ } else {
+ if c, ok := t.collectionMap[collection]; ok {
+ return c.Lookup(vid)
+ }
}
return nil
}
@@ -86,12 +90,8 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return vid.Next()
}
-func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) {
- replicationTypeIndex := repType.GetReplicationLevelIndex()
- if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
- t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
- }
- vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count, dataCenter)
+func (t *Topology) PickForWrite(collectionName string, repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) {
+ vid, count, datanodes, err := t.GetVolumeLayout(collectionName, repType).PickForWrite(count, dataCenter)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
@@ -99,17 +99,16 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, data
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
- replicationTypeIndex := repType.GetReplicationLevelIndex()
- if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
- glog.V(0).Infoln("adding replication type", repType)
- t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
+func (t *Topology) GetVolumeLayout(collectionName string, repType storage.ReplicationType) *VolumeLayout {
+ _, ok := t.collectionMap[collectionName]
+ if !ok {
+ t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
- return t.replicaType2VolumeLayout[replicationTypeIndex]
+ return t.collectionMap[collectionName].GetOrCreateVolumeLayout(repType)
}
func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
+ t.GetVolumeLayout(v.Collection, v.RepType).RegisterVolume(v, dn)
}
func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {
diff --git a/go/topology/topology_compact.go b/go/topology/topology_compact.go
index 4ba77a4a5..a1d6d2564 100644
--- a/go/topology/topology_compact.go
+++ b/go/topology/topology_compact.go
@@ -79,12 +79,14 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
return isCommitSuccess
}
func (t *Topology) Vacuum(garbageThreshold string) int {
- for _, vl := range t.replicaType2VolumeLayout {
- if vl != nil {
- for vid, locationlist := range vl.vid2location {
- if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
- if batchVacuumVolumeCompact(vl, vid, locationlist) {
- batchVacuumVolumeCommit(vl, vid, locationlist)
+ for _, c := range t.collectionMap {
+ for _, vl := range c.replicaType2VolumeLayout {
+ if vl != nil {
+ for vid, locationlist := range vl.vid2location {
+ if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
+ if batchVacuumVolumeCompact(vl, vid, locationlist) {
+ batchVacuumVolumeCommit(vl, vid, locationlist)
+ }
}
}
}
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index f3b09c649..7f81d8184 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -37,7 +37,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
- vl := t.GetVolumeLayout(volumeInfo.RepType)
+ vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.RepType)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
@@ -49,7 +49,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
- vl := t.GetVolumeLayout(v.RepType)
+ vl := t.GetVolumeLayout(v.Collection, v.RepType)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@@ -59,7 +59,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
- vl := t.GetVolumeLayout(v.RepType)
+ vl := t.GetVolumeLayout(v.Collection, v.RepType)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
index b416ee943..f66d4c251 100644
--- a/go/topology/topology_map.go
+++ b/go/topology/topology_map.go
@@ -13,9 +13,13 @@ func (t *Topology) ToMap() interface{} {
}
m["DataCenters"] = dcs
var layouts []interface{}
- for _, layout := range t.replicaType2VolumeLayout {
- if layout != nil {
- layouts = append(layouts, layout.ToMap())
+ for _, c := range t.collectionMap {
+ for _, layout := range c.replicaType2VolumeLayout {
+ if layout != nil {
+ tmp := layout.ToMap()
+ tmp["collection"] = c.Name
+ layouts = append(layouts, tmp)
+ }
}
}
m["layouts"] = layouts