aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/benchmark.go2
-rw-r--r--weed/command/filer.go6
-rw-r--r--weed/command/server.go4
-rw-r--r--weed/filer/filer.go3
-rw-r--r--weed/filer/leveldb/leveldb_store_test.go7
-rw-r--r--weed/filer/leveldb2/leveldb2_store_test.go5
-rw-r--r--weed/filer/leveldb3/leveldb3_store_test.go5
-rw-r--r--weed/filer/rocksdb/rocksdb_store_test.go6
-rw-r--r--weed/iamapi/iamapi_server.go2
-rw-r--r--weed/mq/broker/broker_server.go2
-rw-r--r--weed/pb/server_address.go37
-rw-r--r--weed/pb/server_address_test.go36
-rw-r--r--weed/pb/server_discovery.go62
-rw-r--r--weed/server/filer_grpc_server_admin.go2
-rw-r--r--weed/server/filer_server.go10
-rw-r--r--weed/server/master_server.go2
-rw-r--r--weed/shell/commands.go2
-rw-r--r--weed/stats/metrics.go1
-rw-r--r--weed/wdclient/masterclient.go15
-rw-r--r--weed/wdclient/vid_map_test.go5
20 files changed, 177 insertions, 37 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 7f132892e..7f9a23cf8 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -127,7 +127,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", pb.ServerAddresses(*b.masters).ToAddressMap())
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery())
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 83e2abdac..7e636974f 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -33,7 +33,7 @@ var (
)
type FilerOptions struct {
- masters map[string]pb.ServerAddress
+ masters *pb.ServerDiscovery
mastersString *string
ip *string
bindIp *string
@@ -65,7 +65,7 @@ type FilerOptions struct {
func init() {
cmdFiler.Run = runFiler // break init cycle
- f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
+ f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers or a single DNS SRV record of at least 1 master server, prepended with dnssrv+")
f.filerGroup = cmdFiler.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection")
f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
@@ -208,7 +208,7 @@ func runFiler(cmd *Command, args []string) bool {
}(startDelay)
}
- f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
+ f.masters = pb.ServerAddresses(*f.mastersString).ToServiceDiscovery()
f.startFiler()
diff --git a/weed/command/server.go b/weed/command/server.go
index fecb1cad6..7fbb59676 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -203,7 +203,7 @@ func runServer(cmd *Command, args []string) bool {
// ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
- filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddressMap()
+ filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToServiceDiscovery()
filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp
s3Options.bindIp = serverBindIp
@@ -216,7 +216,7 @@ func runServer(cmd *Command, args []string) bool {
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
mqBrokerOptions.ip = serverIp
- mqBrokerOptions.masters = filerOptions.masters
+ mqBrokerOptions.masters = filerOptions.masters.GetInstancesAsMap()
mqBrokerOptions.filerGroup = filerOptions.filerGroup
// serverOptions.v.pulseSeconds = pulseSeconds
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 8570faa7a..fdc425f07 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -52,8 +52,7 @@ type Filer struct {
Dlm *lock_manager.DistributedLockManager
}
-func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress,
- filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
+func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{
MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, "", masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go
index 7013f67a7..c8e71a003 100644
--- a/weed/filer/leveldb/leveldb_store_test.go
+++ b/weed/filer/leveldb/leveldb_store_test.go
@@ -3,6 +3,7 @@ package leveldb
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"os"
"testing"
"time"
@@ -12,7 +13,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir()
store := &LevelDBStore{}
store.initialize(dir)
@@ -65,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir()
store := &LevelDBStore{}
store.initialize(dir)
@@ -87,7 +88,7 @@ func TestEmptyRoot(t *testing.T) {
}
func BenchmarkInsertEntry(b *testing.B) {
- testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := b.TempDir()
store := &LevelDBStore{}
store.initialize(dir)
diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go
index f7ec99e06..b25dcc7b8 100644
--- a/weed/filer/leveldb2/leveldb2_store_test.go
+++ b/weed/filer/leveldb2/leveldb2_store_test.go
@@ -2,6 +2,7 @@ package leveldb
import (
"context"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"testing"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -9,7 +10,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir()
store := &LevelDB2Store{}
store.initialize(dir, 2)
@@ -62,7 +63,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir()
store := &LevelDB2Store{}
store.initialize(dir, 2)
diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go
index e2e4d5099..a2d8dd8a3 100644
--- a/weed/filer/leveldb3/leveldb3_store_test.go
+++ b/weed/filer/leveldb3/leveldb3_store_test.go
@@ -2,6 +2,7 @@ package leveldb
import (
"context"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"testing"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -9,7 +10,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir()
store := &LevelDB3Store{}
store.initialize(dir)
@@ -62,7 +63,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir()
store := &LevelDB3Store{}
store.initialize(dir)
diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go
index e89327baa..e24274d2a 100644
--- a/weed/filer/rocksdb/rocksdb_store_test.go
+++ b/weed/filer/rocksdb/rocksdb_store_test.go
@@ -15,7 +15,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil)
dir := t.TempDir()
store := &RocksDBStore{}
store.initialize(dir)
@@ -68,7 +68,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil)
dir := t.TempDir()
store := &RocksDBStore{}
store.initialize(dir)
@@ -90,7 +90,7 @@ func TestEmptyRoot(t *testing.T) {
}
func BenchmarkInsertEntry(b *testing.B) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil)
dir := b.TempDir()
store := &RocksDBStore{}
store.initialize(dir)
diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go
index 223bcb296..63d2e7a75 100644
--- a/weed/iamapi/iamapi_server.go
+++ b/weed/iamapi/iamapi_server.go
@@ -50,7 +50,7 @@ var s3ApiConfigure IamS3ApiConfig
func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer *IamApiServer, err error) {
s3ApiConfigure = IamS3ApiConfigure{
option: option,
- masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", option.Masters),
+ masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)),
}
s3Option := s3api.S3ApiServerOption{Filer: option.Filer}
iamApiServer = &IamApiServer{
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 4f5b3c28d..da1284c80 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -41,7 +41,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
mqBroker = &MessageQueueBroker{
option: option,
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(),
}
diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go
index 56d0dba24..a0aa79ae4 100644
--- a/weed/pb/server_address.go
+++ b/weed/pb/server_address.go
@@ -11,6 +11,7 @@ import (
type ServerAddress string
type ServerAddresses string
+type ServerSrvAddress string
func NewServerAddress(host string, port int, grpcPort int) ServerAddress {
if grpcPort == 0 || grpcPort == port+10000 {
@@ -76,6 +77,42 @@ func (sa ServerAddress) ToGrpcAddress() string {
return ServerToGrpcAddress(string(sa))
}
+// LookUp may return an error for some records along with successful lookups - make sure you do not
+// discard `addresses` even if `err == nil`
+func (r ServerSrvAddress) LookUp() (addresses []ServerAddress, err error) {
+ _, records, lookupErr := net.LookupSRV("", "", string(r))
+ if lookupErr != nil {
+ err = fmt.Errorf("lookup SRV address %s: %v", r, lookupErr)
+ }
+ for _, srv := range records {
+ address := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
+ addresses = append(addresses, ServerAddress(address))
+ }
+ return
+}
+
+// ToServiceDiscovery expects one of: a comma-separated list of ip:port, like
+//
+// 10.0.0.1:9999,10.0.0.2:24:9999
+//
+// OR an SRV Record prepended with 'dnssrv+', like:
+//
+// dnssrv+_grpc._tcp.master.consul
+// dnssrv+_grpc._tcp.headless.default.svc.cluster.local
+// dnssrv+seaweed-master.master.consul
+func (sa ServerAddresses) ToServiceDiscovery() (sd *ServerDiscovery) {
+ sd = &ServerDiscovery{}
+ prefix := "dnssrv+"
+ if strings.HasPrefix(string(sa), prefix) {
+ trimmed := strings.TrimPrefix(string(sa), prefix)
+ srv := ServerSrvAddress(trimmed)
+ sd.srvRecord = &srv
+ } else {
+ sd.list = sa.ToAddresses()
+ }
+ return
+}
+
func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) {
parts := strings.Split(string(sa), ",")
for _, address := range parts {
diff --git a/weed/pb/server_address_test.go b/weed/pb/server_address_test.go
new file mode 100644
index 000000000..f5a12427a
--- /dev/null
+++ b/weed/pb/server_address_test.go
@@ -0,0 +1,36 @@
+package pb
+
+import (
+ "reflect"
+ "testing"
+)
+
+func TestServerAddresses_ToAddressMapOrSrv_shouldRemovePrefix(t *testing.T) {
+ str := ServerAddresses("dnssrv+hello.srv.consul")
+
+ d := str.ToServiceDiscovery()
+
+ expected := ServerSrvAddress("hello.srv.consul")
+ if *d.srvRecord != expected {
+ t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected %s`, *d.srvRecord, expected)
+ }
+}
+
+func TestServerAddresses_ToAddressMapOrSrv_shouldHandleIPPortList(t *testing.T) {
+ str := ServerAddresses("10.0.0.1:23,10.0.0.2:24")
+
+ d := str.ToServiceDiscovery()
+
+ if d.srvRecord != nil {
+ t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected nil`, *d.srvRecord)
+ }
+
+ expected := []ServerAddress{
+ ServerAddress("10.0.0.1:23"),
+ ServerAddress("10.0.0.2:24"),
+ }
+
+ if !reflect.DeepEqual(d.list, expected) {
+ t.Fatalf(`Expected %q, got %q`, expected, d.list)
+ }
+}
diff --git a/weed/pb/server_discovery.go b/weed/pb/server_discovery.go
new file mode 100644
index 000000000..25c0360c5
--- /dev/null
+++ b/weed/pb/server_discovery.go
@@ -0,0 +1,62 @@
+package pb
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "reflect"
+)
+
+// ServerDiscovery encodes a way to find at least 1 instance of a service,
+// and provides utility functions to refresh the instance list
+type ServerDiscovery struct {
+ list []ServerAddress
+ srvRecord *ServerSrvAddress
+}
+
+func NewServiceDiscoveryFromMap(m map[string]ServerAddress) (sd *ServerDiscovery) {
+ sd = &ServerDiscovery{}
+ for _, s := range m {
+ sd.list = append(sd.list, s)
+ }
+ return sd
+}
+
+// RefreshBySrvIfAvailable performs a DNS SRV lookup and updates list with the results
+// of the lookup
+func (sd *ServerDiscovery) RefreshBySrvIfAvailable() {
+ if sd.srvRecord == nil {
+ return
+ }
+ newList, err := sd.srvRecord.LookUp()
+ if err != nil {
+ glog.V(0).Infof("failed to lookup SRV for %s: %v", *sd.srvRecord, err)
+ }
+ if newList == nil || len(newList) == 0 {
+ glog.V(0).Infof("looked up SRV for %s, but found no well-formed names", *sd.srvRecord)
+ return
+ }
+ if !reflect.DeepEqual(sd.list, newList) {
+ sd.list = newList
+ }
+}
+
+// GetInstances returns a copy of the latest known list of addresses
+// call RefreshBySrvIfAvailable prior to this in order to get a more up-to-date view
+func (sd *ServerDiscovery) GetInstances() (addresses []ServerAddress) {
+ for _, a := range sd.list {
+ addresses = append(addresses, a)
+ }
+ return addresses
+}
+func (sd *ServerDiscovery) GetInstancesAsStrings() (addresses []string) {
+ for _, i := range sd.list {
+ addresses = append(addresses, string(i))
+ }
+ return addresses
+}
+func (sd *ServerDiscovery) GetInstancesAsMap() (addresses map[string]ServerAddress) {
+ addresses = make(map[string]ServerAddress)
+ for _, i := range sd.list {
+ addresses[string(i)] = i
+ }
+ return addresses
+}
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go
index 58215a927..8a58e287c 100644
--- a/weed/server/filer_grpc_server_admin.go
+++ b/weed/server/filer_grpc_server_admin.go
@@ -87,7 +87,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
t := &filer_pb.GetFilerConfigurationResponse{
- Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
+ Masters: fs.option.Masters.GetInstancesAsStrings(),
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 8e40b2145..98784bce3 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -50,7 +50,7 @@ import (
)
type FilerOption struct {
- Masters map[string]pb.ServerAddress
+ Masters *pb.ServerDiscovery
FilerGroup string
Collection string
DefaultReplication string
@@ -118,11 +118,12 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
}
fs.listenersCond = sync.NewCond(&fs.listenersLock)
- if len(option.Masters) == 0 {
+ option.Masters.RefreshBySrvIfAvailable()
+ if len(option.Masters.GetInstances()) == 0 {
glog.Fatal("master list is required!")
}
- fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() {
+ fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast()
})
fs.filer.Cipher = option.Cipher
@@ -195,7 +196,8 @@ func (fs *FilerServer) checkWithMaster() {
isConnected := false
for !isConnected {
- for _, master := range fs.option.Masters {
+ fs.option.Masters.RefreshBySrvIfAvailable()
+ for _, master := range fs.option.Masters.GetInstances() {
readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 2489aaefd..9a5313a10 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -110,7 +110,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
clientChans: make(map[string]chan *master_pb.KeepConnectedResponse),
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", peers),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)),
adminLocks: NewAdminLocks(),
Cluster: cluster.NewCluster(),
}
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index b1722edfb..e6e582376 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -51,7 +51,7 @@ var (
func NewCommandEnv(options *ShellOptions) *CommandEnv {
ce := &CommandEnv{
env: make(map[string]string),
- MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", pb.ServerAddresses(*options.Masters).ToAddressMap()),
+ MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", *pb.ServerAddresses(*options.Masters).ToServiceDiscovery()),
option: options,
}
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "shell")
diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go
index dda4d95e5..3dda42423 100644
--- a/weed/stats/metrics.go
+++ b/weed/stats/metrics.go
@@ -299,7 +299,6 @@ func JoinHostPort(host string, port int) string {
return net.JoinHostPort(host, portStr)
}
-
func StartMetricsServer(ip string, port int) {
if port == 0 {
return
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index c693df582..a6ddf22f3 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -24,8 +24,8 @@ type MasterClient struct {
rack string
currentMaster pb.ServerAddress
currentMasterLock sync.RWMutex
- masters map[string]pb.ServerAddress
- grpcDialOption grpc.DialOption
+ masters pb.ServerDiscovery
+ grpcDialOption grpc.DialOption
*vidMap
vidMapCacheSize int
@@ -33,7 +33,7 @@ type MasterClient struct {
OnPeerUpdateLock sync.RWMutex
}
-func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient {
+func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
return &MasterClient{
FilerGroup: filerGroup,
clientType: clientType,
@@ -108,9 +108,9 @@ func (mc *MasterClient) GetMaster() pb.ServerAddress {
return mc.getCurrentMaster()
}
-func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
+func (mc *MasterClient) GetMasters() []pb.ServerAddress {
mc.WaitUntilConnected()
- return mc.masters
+ return mc.masters.GetInstances()
}
func (mc *MasterClient) WaitUntilConnected() {
@@ -132,7 +132,7 @@ func (mc *MasterClient) KeepConnectedToMaster() {
}
func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
- for _, master := range mc.masters {
+ for _, master := range mc.masters.GetInstances() {
if master == myMasterAddress {
continue
}
@@ -159,7 +159,8 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres
func (mc *MasterClient) tryAllMasters() {
var nextHintedLeader pb.ServerAddress
- for _, master := range mc.masters {
+ mc.masters.RefreshBySrvIfAvailable()
+ for _, master := range mc.masters.GetInstances() {
nextHintedLeader = mc.tryConnectToMaster(master)
for nextHintedLeader != "" {
nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)
diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go
index 980e5bd8c..a734c6b0c 100644
--- a/weed/wdclient/vid_map_test.go
+++ b/weed/wdclient/vid_map_test.go
@@ -3,6 +3,7 @@ package wdclient
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
"strconv"
"sync"
@@ -65,7 +66,7 @@ func TestLocationIndex(t *testing.T) {
}
func TestLookupFileId(t *testing.T) {
- mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", nil)
+ mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{})
length := 5
//Construct a cache linked list of length 5
@@ -135,7 +136,7 @@ func TestLookupFileId(t *testing.T) {
}
func TestConcurrentGetLocations(t *testing.T) {
- mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", nil)
+ mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{})
location := Location{Url: "TestDataRacing"}
mc.addLocation(1, location)