aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/allocate_volume.go2
-rw-r--r--weed/topology/data_node.go7
-rw-r--r--weed/topology/rack.go3
-rw-r--r--weed/topology/topology.go9
-rw-r--r--weed/topology/topology_ec.go7
-rw-r--r--weed/topology/topology_test.go4
-rw-r--r--weed/topology/topology_vacuum.go13
7 files changed, 28 insertions, 17 deletions
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go
index 7c7fae683..83043c23f 100644
--- a/weed/topology/allocate_volume.go
+++ b/weed/topology/allocate_volume.go
@@ -15,7 +15,7 @@ type AllocateVolumeResult struct {
func AllocateVolume(dn *DataNode, grpcDialOption grpc.DialOption, vid needle.VolumeId, option *VolumeGrowOption) error {
- return operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.AllocateVolume(context.Background(), &volume_server_pb.AllocateVolumeRequest{
VolumeId: uint32(vid),
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index cd3249c98..9f868681e 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -3,6 +3,7 @@ package topology
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -14,6 +15,7 @@ type DataNode struct {
NodeImpl
Ip string
Port int
+ GrpcPort int
PublicUrl string
LastSeen int64 // unix time in seconds
Counter int // in race condition, the previous dataNode was not dead
@@ -208,6 +210,10 @@ func (dn *DataNode) Url() string {
return util.JoinHostPort(dn.Ip, dn.Port)
}
+func (dn *DataNode) ServerAddress() pb.ServerAddress {
+ return pb.NewServerAddress(dn.Ip, dn.Port, dn.GrpcPort)
+}
+
func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{})
ret["Url"] = dn.Url()
@@ -239,6 +245,7 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
m := &master_pb.DataNodeInfo{
Id: string(dn.Id()),
DiskInfos: make(map[string]*master_pb.DiskInfo),
+ GrpcPort: uint32(dn.GrpcPort),
}
for _, c := range dn.Children() {
disk := c.(*Disk)
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index 9c77285c3..cd09746b2 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -30,7 +30,7 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode {
+func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
@@ -41,6 +41,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn := NewDataNode(util.JoinHostPort(ip, port))
dn.Ip = ip
dn.Port = port
+ dn.GrpcPort = grpcPort
dn.PublicUrl = publicUrl
dn.LastSeen = time.Now().Unix()
r.LinkChildNode(dn)
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 39fc7dcad..ad440e244 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -3,6 +3,7 @@ package topology
import (
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"math/rand"
"sync"
@@ -71,7 +72,7 @@ func (t *Topology) IsLeader() bool {
return true
}
if leader, err := t.Leader(); err == nil {
- if t.RaftServer.Name() == leader {
+ if pb.ServerAddress(t.RaftServer.Name()) == leader {
return true
}
}
@@ -79,11 +80,11 @@ func (t *Topology) IsLeader() bool {
return false
}
-func (t *Topology) Leader() (string, error) {
- l := ""
+func (t *Topology) Leader() (pb.ServerAddress, error) {
+ var l pb.ServerAddress
for count := 0; count < 3; count++ {
if t.RaftServer != nil {
- l = t.RaftServer.Leader()
+ l = pb.ServerAddress(t.RaftServer.Leader())
} else {
return "", errors.New("Raft Server not ready yet!")
}
diff --git a/weed/topology/topology_ec.go b/weed/topology/topology_ec.go
index 022eeb578..fdc4f274e 100644
--- a/weed/topology/topology_ec.go
+++ b/weed/topology/topology_ec.go
@@ -2,6 +2,7 @@ package topology
import (
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -135,16 +136,16 @@ func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocati
return
}
-func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string) {
+func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []pb.ServerAddress) {
t.ecShardMapLock.RLock()
defer t.ecShardMapLock.RUnlock()
- dateNodeMap := make(map[string]bool)
+ dateNodeMap := make(map[pb.ServerAddress]bool)
for _, ecVolumeLocation := range t.ecShardMap {
if ecVolumeLocation.Collection == collection {
for _, locations := range ecVolumeLocation.Locations {
for _, loc := range locations {
- dateNodeMap[string(loc.Id())] = true
+ dateNodeMap[loc.ServerAddress()] = true
}
}
}
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index ecfe9d8d1..bfe7faff7 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -31,7 +31,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
maxVolumeCounts["ssd"] = 12
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0,"127.0.0.1", maxVolumeCounts)
{
volumeCount := 7
@@ -177,7 +177,7 @@ func TestAddRemoveVolume(t *testing.T) {
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
maxVolumeCounts["ssd"] = 12
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0,"127.0.0.1", maxVolumeCounts)
v := storage.VolumeInfo{
Id: needle.VolumeId(1),
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 9feb55b73..03340c17f 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -2,6 +2,7 @@ package topology
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"sync/atomic"
"time"
@@ -19,7 +20,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
ch := make(chan int, locationlist.Length())
errCount := int32(0)
for index, dn := range locationlist.list {
- go func(index int, url string, vid needle.VolumeId) {
+ go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
VolumeId: uint32(vid),
@@ -39,7 +40,7 @@ func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid ne
if err != nil {
glog.V(0).Infof("Checking vacuuming %d on %s: %v", vid, url, err)
}
- }(index, dn.Url(), vid)
+ }(index, dn.ServerAddress(), vid)
}
vacuumLocationList := NewVolumeLocationList()
@@ -66,7 +67,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
- go func(index int, url string, vid needle.VolumeId) {
+ go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
err := operation.WithVolumeServerClient(url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
@@ -82,7 +83,7 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
glog.V(0).Infof("Complete vacuuming %d on %s", vid, url)
ch <- true
}
- }(index, dn.Url(), vid)
+ }(index, dn.ServerAddress(), vid)
}
isVacuumSuccess := true
@@ -104,7 +105,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
isReadOnly := false
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: uint32(vid),
})
@@ -130,7 +131,7 @@ func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *V
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
- err := operation.WithVolumeServerClient(dn.Url(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
VolumeId: uint32(vid),
})