aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-06-25 00:01:53 -0700
committerChris Lu <chris.lu@gmail.com>2018-06-25 00:01:53 -0700
commit0d83c1b91eba8edb64b247984a2a02d3e774c9ce (patch)
treed5840a400aae31943a24e2dac180e584162b2d0b
parent018a9a20be26b704abb27b19a22a4377624a9d5c (diff)
downloadseaweedfs-0d83c1b91eba8edb64b247984a2a02d3e774c9ce.tar.xz
seaweedfs-0d83c1b91eba8edb64b247984a2a02d3e774c9ce.zip
refactoring
-rw-r--r--weed/server/master_grpc_server.go17
-rw-r--r--weed/topology/topology.go19
-rw-r--r--weed/topology/topology_test.go30
3 files changed, 22 insertions, 44 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 6c293fe95..f24cea619 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -6,7 +6,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/topology"
"google.golang.org/grpc/peer"
)
@@ -50,21 +49,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
}
- var volumeInfos []storage.VolumeInfo
- for _, v := range heartbeat.Volumes {
- if vi, err := storage.NewVolumeInfo(v); err == nil {
- volumeInfos = append(volumeInfos, vi)
- } else {
- glog.V(0).Infof("Fail to convert joined volume information: %v", err)
- }
- }
- deletedVolumes := dn.UpdateVolumes(volumeInfos)
- for _, v := range volumeInfos {
- t.RegisterVolumeLayout(v, dn)
- }
- for _, v := range deletedVolumes {
- t.UnRegisterVolumeLayout(v, dn)
- }
+ t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
} else {
return err
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 27d22ee7e..cee156dc1 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
type Topology struct {
@@ -145,3 +146,21 @@ func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
t.LinkChildNode(dc)
return dc
}
+
+func (t *Topology) SyncDataNodeRegistration(volumes []*master_pb.VolumeInformationMessage, dn *DataNode) {
+ var volumeInfos []storage.VolumeInfo
+ for _, v := range volumes {
+ if vi, err := storage.NewVolumeInfo(v); err == nil {
+ volumeInfos = append(volumeInfos, vi)
+ } else {
+ glog.V(0).Infof("Fail to convert joined volume information: %v", err)
+ }
+ }
+ deletedVolumes := dn.UpdateVolumes(volumeInfos)
+ for _, v := range volumeInfos {
+ t.RegisterVolumeLayout(v, dn)
+ }
+ for _, v := range deletedVolumes {
+ t.UnRegisterVolumeLayout(v, dn)
+ }
+}
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index 06f506c37..36aa07157 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -4,7 +4,6 @@ import (
"testing"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
)
func TestRemoveDataCenter(t *testing.T) {
@@ -44,20 +43,8 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
}
volumeMessages = append(volumeMessages, volumeMessage)
}
- var volumeInfos []storage.VolumeInfo
- for _, v := range volumeMessages {
- if vi, err := storage.NewVolumeInfo(v); err == nil {
- volumeInfos = append(volumeInfos, vi)
- }
- }
- deletedVolumes := dn.UpdateVolumes(volumeInfos)
- for _, v := range volumeInfos {
- topo.RegisterVolumeLayout(v, dn)
- }
- for _, v := range deletedVolumes {
- topo.UnRegisterVolumeLayout(v, dn)
- }
+ topo.SyncDataNodeRegistration(volumeMessages, dn)
assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
assert(t, "volumeCount", topo.volumeCount, volumeCount)
@@ -81,20 +68,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
}
volumeMessages = append(volumeMessages, volumeMessage)
}
- var volumeInfos []storage.VolumeInfo
- for _, v := range volumeMessages {
- if vi, err := storage.NewVolumeInfo(v); err == nil {
- volumeInfos = append(volumeInfos, vi)
- }
- }
-
- deletedVolumes := dn.UpdateVolumes(volumeInfos)
- for _, v := range volumeInfos {
- topo.RegisterVolumeLayout(v, dn)
- }
- for _, v := range deletedVolumes {
- topo.UnRegisterVolumeLayout(v, dn)
- }
+ topo.SyncDataNodeRegistration(volumeMessages, dn)
assert(t, "activeVolumeCount1", topo.activeVolumeCount, volumeCount)
assert(t, "volumeCount", topo.volumeCount, volumeCount)