diff options
| author | Chris Lu <chris.lu@gmail.com> | 2013-11-12 02:21:22 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2013-11-12 02:21:22 -0800 |
| commit | 3b687111399fd08468e4a6232bcbe6068df961bf (patch) | |
| tree | cde07f3ba6c9ae411d55b25c331bca6827253c30 /go/topology | |
| parent | 8f0e2f31afad1fcb2f06ef3ae55866313b7b4b02 (diff) | |
| download | seaweedfs-3b687111399fd08468e4a6232bcbe6068df961bf.tar.xz seaweedfs-3b687111399fd08468e4a6232bcbe6068df961bf.zip | |
support for collections!
Diffstat (limited to 'go/topology')
| -rw-r--r-- | go/topology/collection.go | 38 | ||||
| -rw-r--r-- | go/topology/topo_test.go | 7 | ||||
| -rw-r--r-- | go/topology/topology.go | 39 | ||||
| -rw-r--r-- | go/topology/topology_compact.go | 14 | ||||
| -rw-r--r-- | go/topology/topology_event_handling.go | 6 | ||||
| -rw-r--r-- | go/topology/topology_map.go | 10 |
6 files changed, 79 insertions, 35 deletions
diff --git a/go/topology/collection.go b/go/topology/collection.go new file mode 100644 index 000000000..0a7971424 --- /dev/null +++ b/go/topology/collection.go @@ -0,0 +1,38 @@ +package topology + +import ( + "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/storage" +) + +type Collection struct { + Name string + volumeSizeLimit uint64 + replicaType2VolumeLayout []*VolumeLayout +} + +func NewCollection(name string, volumeSizeLimit uint64) *Collection { + c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} + c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) + return c +} + +func (c *Collection) GetOrCreateVolumeLayout(repType storage.ReplicationType) *VolumeLayout { + replicationTypeIndex := repType.GetReplicationLevelIndex() + if c.replicaType2VolumeLayout[replicationTypeIndex] == nil { + glog.V(0).Infoln("collection", c.Name, "adding replication type", repType) + c.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, c.volumeSizeLimit) + } + return c.replicaType2VolumeLayout[replicationTypeIndex] +} + +func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { + for _, vl := range c.replicaType2VolumeLayout { + if vl != nil { + if list := vl.Lookup(vid); list != nil { + return list + } + } + } + return nil +} diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go index 36f4963db..c0edca7c1 100644 --- a/go/topology/topo_test.go +++ b/go/topology/topo_test.go @@ -99,9 +99,10 @@ func setup(topologyLayout string) *Topology { for _, v := range serverMap["volumes"].([]interface{}) { m := v.(map[string]interface{}) vi := storage.VolumeInfo{ - Id: storage.VolumeId(int64(m["id"].(float64))), - Size: uint64(m["size"].(float64)), - Version: storage.CurrentVersion} + Id: storage.VolumeId(int64(m["id"].(float64))), + Size: uint64(m["size"].(float64)), + Collection: "testingCollection", + Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) } server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) diff --git a/go/topology/topology.go b/go/topology/topology.go index b21601210..5b3d29e0b 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -12,8 +12,7 @@ import ( type Topology struct { NodeImpl - //transient vid~servers mapping for each replication type - replicaType2VolumeLayout []*VolumeLayout + collectionMap map[string]*Collection pulse int64 @@ -34,7 +33,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.nodeType = "Topology" t.NodeImpl.value = t t.children = make(map[NodeId]Node) - t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) + t.collectionMap = make(map[string]*Collection) t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit @@ -60,13 +59,18 @@ func (t *Topology) loadConfiguration(configurationFile string) error { return nil } -func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range t.replicaType2VolumeLayout { - if vl != nil { - if list := vl.Lookup(vid); list != nil { +func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { + //maybe an issue if lots of collections? + if collection == "" { + for _, c := range t.collectionMap { + if list := c.Lookup(vid); list != nil { return list } } + } else { + if c, ok := t.collectionMap[collection]; ok { + return c.Lookup(vid) + } } return nil } @@ -86,12 +90,8 @@ func (t *Topology) NextVolumeId() storage.VolumeId { return vid.Next() } -func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) { - replicationTypeIndex := repType.GetReplicationLevelIndex() - if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { - t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) - } - vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count, dataCenter) +func (t *Topology) PickForWrite(collectionName string, repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) { + vid, count, datanodes, err := t.GetVolumeLayout(collectionName, repType).PickForWrite(count, dataCenter) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } @@ -99,17 +99,16 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, data return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { - replicationTypeIndex := repType.GetReplicationLevelIndex() - if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { - glog.V(0).Infoln("adding replication type", repType) - t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) +func (t *Topology) GetVolumeLayout(collectionName string, repType storage.ReplicationType) *VolumeLayout { + _, ok := t.collectionMap[collectionName] + if !ok { + t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) } - return t.replicaType2VolumeLayout[replicationTypeIndex] + return t.collectionMap[collectionName].GetOrCreateVolumeLayout(repType) } func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn) + t.GetVolumeLayout(v.Collection, v.RepType).RegisterVolume(v, dn) } func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) { diff --git a/go/topology/topology_compact.go b/go/topology/topology_compact.go index 4ba77a4a5..a1d6d2564 100644 --- a/go/topology/topology_compact.go +++ b/go/topology/topology_compact.go @@ -79,12 +79,14 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis return isCommitSuccess } func (t *Topology) Vacuum(garbageThreshold string) int { - for _, vl := range t.replicaType2VolumeLayout { - if vl != nil { - for vid, locationlist := range vl.vid2location { - if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(vl, vid, locationlist) { - batchVacuumVolumeCommit(vl, vid, locationlist) + for _, c := range t.collectionMap { + for _, vl := range c.replicaType2VolumeLayout { + if vl != nil { + for vid, locationlist := range vl.vid2location { + if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { + if batchVacuumVolumeCompact(vl, vid, locationlist) { + batchVacuumVolumeCommit(vl, vid, locationlist) + } } } } diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index f3b09c649..7f81d8184 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -37,7 +37,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { }() } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.RepType) + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.RepType) if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } @@ -49,7 +49,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.volumes { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) - vl := t.GetVolumeLayout(v.RepType) + vl := t.GetVolumeLayout(v.Collection, v.RepType) vl.SetVolumeUnavailable(dn, v.Id) } dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) @@ -59,7 +59,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.RepType) + vl := t.GetVolumeLayout(v.Collection, v.RepType) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go index b416ee943..f66d4c251 100644 --- a/go/topology/topology_map.go +++ b/go/topology/topology_map.go @@ -13,9 +13,13 @@ func (t *Topology) ToMap() interface{} { } m["DataCenters"] = dcs var layouts []interface{} - for _, layout := range t.replicaType2VolumeLayout { - if layout != nil { - layouts = append(layouts, layout.ToMap()) + for _, c := range t.collectionMap { + for _, layout := range c.replicaType2VolumeLayout { + if layout != nil { + tmp := layout.ToMap() + tmp["collection"] = c.Name + layouts = append(layouts, tmp) + } } } m["layouts"] = layouts |
