aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/data_node.go11
-rw-r--r--weed/topology/topology.go5
-rw-r--r--weed/topology/topology_map.go23
3 files changed, 34 insertions, 5 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 0ef8ae14e..6ee9a8a03 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -32,7 +32,7 @@ func (dn *DataNode) String() string {
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
}
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
+func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
dn.Lock()
defer dn.Unlock()
if _, ok := dn.volumes[v.Id]; !ok {
@@ -42,12 +42,14 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
dn.UpAdjustActiveVolumeCountDelta(1)
}
dn.UpAdjustMaxVolumeId(v.Id)
+ isNew = true
} else {
dn.volumes[v.Id] = v
}
+ return
}
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
@@ -64,7 +66,10 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVo
}
dn.Unlock()
for _, v := range actualVolumes {
- dn.AddOrUpdateVolume(v)
+ isNew := dn.AddOrUpdateVolume(v)
+ if isNew {
+ newVolumes = append(newVolumes, v)
+ }
}
return
}
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 177c2a181..4242bfa05 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -151,7 +151,7 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
return dc
}
-func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) {
+func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) (newVolumes, deletedVolumes []storage.VolumeInfo) {
var volumeInfos []storage.VolumeInfo
for _, v := range volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil {
@@ -160,11 +160,12 @@ func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformati
glog.V(0).Infof("Fail to convert joined volume information: %v", err)
}
}
- deletedVolumes := dn.UpdateVolumes(volumeInfos)
+ newVolumes, deletedVolumes = dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
+ return
}
diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go
index ce8e9e663..769ba0e2a 100644
--- a/weed/topology/topology_map.go
+++ b/weed/topology/topology_map.go
@@ -1,5 +1,7 @@
package topology
+import "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+
func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
@@ -51,3 +53,24 @@ func (t *Topology) ToVolumeMap() interface{} {
m["DataCenters"] = dcs
return m
}
+
+func (t *Topology) ToVolumeLocations() (volumeLocations []*master_pb.VolumeLocation) {
+ for _, c := range t.Children() {
+ dc := c.(*DataCenter)
+ for _, r := range dc.Children() {
+ rack := r.(*Rack)
+ for _, d := range rack.Children() {
+ dn := d.(*DataNode)
+ volumeLocation := &master_pb.VolumeLocation{
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ }
+ for _, v := range dn.GetVolumes() {
+ volumeLocation.NewVids = append(volumeLocation.NewVids, uint32(v.Id))
+ }
+ volumeLocations = append(volumeLocations, volumeLocation)
+ }
+ }
+ }
+ return
+}