diff options
Diffstat (limited to 'go/topology')
| -rw-r--r-- | go/topology/allocate_volume.go | 11 | ||||
| -rw-r--r-- | go/topology/cluster_commands.go | 4 | ||||
| -rw-r--r-- | go/topology/collection.go | 25 | ||||
| -rw-r--r-- | go/topology/data_node.go | 10 | ||||
| -rw-r--r-- | go/topology/node.go | 4 | ||||
| -rw-r--r-- | go/topology/store_replicate.go | 8 | ||||
| -rw-r--r-- | go/topology/topology.go | 29 | ||||
| -rw-r--r-- | go/topology/topology_event_handling.go | 10 | ||||
| -rw-r--r-- | go/topology/topology_map.go | 2 | ||||
| -rw-r--r-- | go/topology/topology_vacuum.go | 8 | ||||
| -rw-r--r-- | go/topology/volume_growth.go | 16 | ||||
| -rw-r--r-- | go/topology/volume_growth_test.go | 4 | ||||
| -rw-r--r-- | go/topology/volume_layout.go | 17 |
13 files changed, 89 insertions, 59 deletions
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go index 77b4ac508..6562e9ac5 100644 --- a/go/topology/allocate_volume.go +++ b/go/topology/allocate_volume.go @@ -1,8 +1,8 @@ package topology import ( - "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "net/url" @@ -12,11 +12,12 @@ type AllocateVolumeResult struct { Error string } -func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error { +func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error { values := make(url.Values) values.Add("volume", vid.String()) - values.Add("collection", collection) - values.Add("replication", rp.String()) + values.Add("collection", option.Collection) + values.Add("replication", option.ReplicaPlacement.String()) + values.Add("ttl", option.Ttl.String()) jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values) if err != nil { return err diff --git a/go/topology/cluster_commands.go b/go/topology/cluster_commands.go index 703435173..cafc52c76 100644 --- a/go/topology/cluster_commands.go +++ b/go/topology/cluster_commands.go @@ -1,8 +1,8 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" "github.com/goraft/raft" ) diff --git a/go/topology/collection.go b/go/topology/collection.go index b21122d22..506f43fbf 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -1,33 +1,34 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/storage" ) type Collection struct { Name string volumeSizeLimit uint64 - replicaType2VolumeLayout []*VolumeLayout + storageType2VolumeLayout map[string]*VolumeLayout } func NewCollection(name string, volumeSizeLimit uint64) *Collection { c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} - c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount) + c.storageType2VolumeLayout = make(map[string]*VolumeLayout) return c } -func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout { - replicaPlacementIndex := rp.GetReplicationLevelIndex() - if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil { - glog.V(0).Infoln("collection", c.Name, "adding replication type", rp) - c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit) +func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { + keyString := rp.String() + if ttl != nil { + keyString += ttl.String() } - return c.replicaType2VolumeLayout[replicaPlacementIndex] + if c.storageType2VolumeLayout[keyString] == nil { + c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + } + return c.storageType2VolumeLayout[keyString] } func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range c.replicaType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout { if vl != nil { if list := vl.Lookup(vid); list != nil { return list @@ -38,7 +39,7 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { } func (c *Collection) ListVolumeServers() (nodes []*DataNode) { - for _, vl := range c.replicaType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout { if vl != nil { if list := vl.ListVolumeServers(); list != nil { nodes = append(nodes, list...) diff --git a/go/topology/data_node.go b/go/topology/data_node.go index ae80e08bb..c3b90470f 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -1,8 +1,8 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" "strconv" ) @@ -38,15 +38,16 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { } } -func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) { +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) { actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } - for vid, _ := range dn.volumes { + for vid, v := range dn.volumes { if _, ok := actualVolumeMap[vid]; !ok { glog.V(0).Infoln("Deleting volume id:", vid) delete(dn.volumes, vid) + deletedVolumes = append(deletedVolumes, v) dn.UpAdjustVolumeCountDelta(-1) dn.UpAdjustActiveVolumeCountDelta(-1) } @@ -54,6 +55,7 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) { for _, v := range actualVolumes { dn.AddOrUpdateVolume(v) } + return } func (dn *DataNode) GetDataCenter() *DataCenter { diff --git a/go/topology/node.go b/go/topology/node.go index c52414008..54118802e 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -1,8 +1,8 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" "errors" "math/rand" "strings" diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go index a982cebe5..6ea019bd8 100644 --- a/go/topology/store_replicate.go +++ b/go/topology/store_replicate.go @@ -2,10 +2,10 @@ package topology import ( "bytes" - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" "net/http" "strconv" ) diff --git a/go/topology/topology.go b/go/topology/topology.go index f1daffb53..c90e8de0b 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -1,10 +1,10 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/sequence" - "code.google.com/p/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/sequence" + "github.com/chrislusf/weed-fs/go/storage" "errors" "github.com/goraft/raft" "io/ioutil" @@ -110,12 +110,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId { } func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool { - vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option) + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } @@ -123,12 +123,12 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { _, ok := t.collectionMap[collectionName] if !ok { t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) } - return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp) + return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl) } func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) { @@ -141,10 +141,14 @@ func (t *Topology) DeleteCollection(collectionName string) { } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(&v, dn) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn) +} +func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { + glog.Infof("removing volume info:%+v", v) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn) } -func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { +func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { t.Sequence.SetMax(*joinMessage.MaxFileKey) dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) dc := t.GetOrCreateDataCenter(dcName) @@ -162,10 +166,13 @@ func (t *Topology) RegisterVolumes(joinMessage *operation.JoinMessage) { glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) } } - dn.UpdateVolumes(volumeInfos) + deletedVolumes := dn.UpdateVolumes(volumeInfos) for _, v := range volumeInfos { t.RegisterVolumeLayout(v, dn) } + for _, v := range deletedVolumes { + t.UnRegisterVolumeLayout(v, dn) + } } func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 7398ff9bf..eb4491484 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -1,8 +1,8 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" "math/rand" "time" ) @@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { }() } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement) + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl) if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } @@ -55,7 +55,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.volumes { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) vl.SetVolumeUnavailable(dn, v.Id) } dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) @@ -65,7 +65,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go index f66d4c251..d6400c988 100644 --- a/go/topology/topology_map.go +++ b/go/topology/topology_map.go @@ -14,7 +14,7 @@ func (t *Topology) ToMap() interface{} { m["DataCenters"] = dcs var layouts []interface{} for _, c := range t.collectionMap { - for _, layout := range c.replicaType2VolumeLayout { + for _, layout := range c.storageType2VolumeLayout { if layout != nil { tmp := layout.ToMap() tmp["collection"] = c.Name diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index a1d6d2564..72846f20b 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -1,9 +1,9 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/util" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "net/url" @@ -80,7 +80,7 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis } func (t *Topology) Vacuum(garbageThreshold string) int { for _, c := range t.collectionMap { - for _, vl := range c.replicaType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout { if vl != nil { for vid, locationlist := range vl.vid2location { if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index 4965e3ba0..2859d3992 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -1,8 +1,8 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" "fmt" "math/rand" "sync" @@ -19,6 +19,7 @@ This package is created to resolve these replica placement issues: type VolumeGrowOption struct { Collection string ReplicaPlacement *storage.ReplicaPlacement + Ttl *storage.TTL DataCenter string Rack string DataNode string @@ -184,8 +185,15 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { for _, server := range servers { - if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil { - vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion} + if err := AllocateVolume(server, vid, option); err == nil { + vi := storage.VolumeInfo{ + Id: vid, + Size: 0, + Collection: option.Collection, + ReplicaPlacement: option.ReplicaPlacement, + Ttl: option.Ttl, + Version: storage.CurrentVersion, + } server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(vi, server) glog.V(0).Infoln("Created Volume", vid, "on", server) diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go index d2913ef5c..5581c87ce 100644 --- a/go/topology/volume_growth_test.go +++ b/go/topology/volume_growth_test.go @@ -1,8 +1,8 @@ package topology import ( - "code.google.com/p/weed-fs/go/sequence" - "code.google.com/p/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/sequence" + "github.com/chrislusf/weed-fs/go/storage" "encoding/json" "fmt" "testing" diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 538acb54c..7bb0cf7e3 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -1,8 +1,8 @@ package topology import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" "errors" "math/rand" "sync" @@ -11,15 +11,17 @@ import ( // mapping from volume to its locations, inverted from server to volume type VolumeLayout struct { rp *storage.ReplicaPlacement + ttl *storage.TTL vid2location map[storage.VolumeId]*VolumeLocationList writables []storage.VolumeId // transient array of writable volume id volumeSizeLimit uint64 accessLock sync.Mutex } -func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { return &VolumeLayout{ rp: rp, + ttl: ttl, vid2location: make(map[storage.VolumeId]*VolumeLocationList), writables: *new([]storage.VolumeId), volumeSizeLimit: volumeSizeLimit, @@ -42,6 +44,14 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } } +func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + vl.removeFromWritable(v.Id) + delete(vl.vid2location, v.Id) +} + func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) { for _, id := range vl.writables { if vid == id { @@ -192,6 +202,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { func (vl *VolumeLayout) ToMap() map[string]interface{} { m := make(map[string]interface{}) m["replication"] = vl.rp.String() + m["ttl"] = vl.ttl.String() m["writables"] = vl.writables //m["locations"] = vl.vid2location return m |
