aboutsummaryrefslogtreecommitdiff
path: root/weed/pb
diff options
context:
space:
mode:
authorNico D'Cotta <45274424+Cottand@users.noreply.github.com>2023-08-24 16:08:56 +0200
committerGitHub <noreply@github.com>2023-08-24 07:08:56 -0700
commit796b7508f33916f7fa734e3df2ceea9a80415ade (patch)
tree4bc3913d93e1e328fd8f7275b11452e61d8175d2 /weed/pb
parent5251b4d50ea55c5f0a2fbc60206785bf80775bac (diff)
downloadseaweedfs-796b7508f33916f7fa734e3df2ceea9a80415ade.tar.xz
seaweedfs-796b7508f33916f7fa734e3df2ceea9a80415ade.zip
Implement SRV lookups for filer (#4767)
Diffstat (limited to 'weed/pb')
-rw-r--r--weed/pb/server_address.go37
-rw-r--r--weed/pb/server_address_test.go36
-rw-r--r--weed/pb/server_discovery.go62
3 files changed, 135 insertions, 0 deletions
diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go
index 56d0dba24..a0aa79ae4 100644
--- a/weed/pb/server_address.go
+++ b/weed/pb/server_address.go
@@ -11,6 +11,7 @@ import (
type ServerAddress string
type ServerAddresses string
+type ServerSrvAddress string
func NewServerAddress(host string, port int, grpcPort int) ServerAddress {
if grpcPort == 0 || grpcPort == port+10000 {
@@ -76,6 +77,42 @@ func (sa ServerAddress) ToGrpcAddress() string {
return ServerToGrpcAddress(string(sa))
}
+// LookUp may return an error for some records along with successful lookups - make sure you do not
+// discard `addresses` even if `err == nil`
+func (r ServerSrvAddress) LookUp() (addresses []ServerAddress, err error) {
+ _, records, lookupErr := net.LookupSRV("", "", string(r))
+ if lookupErr != nil {
+ err = fmt.Errorf("lookup SRV address %s: %v", r, lookupErr)
+ }
+ for _, srv := range records {
+ address := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
+ addresses = append(addresses, ServerAddress(address))
+ }
+ return
+}
+
+// ToServiceDiscovery expects one of: a comma-separated list of ip:port, like
+//
+// 10.0.0.1:9999,10.0.0.2:24:9999
+//
+// OR an SRV Record prepended with 'dnssrv+', like:
+//
+// dnssrv+_grpc._tcp.master.consul
+// dnssrv+_grpc._tcp.headless.default.svc.cluster.local
+// dnssrv+seaweed-master.master.consul
+func (sa ServerAddresses) ToServiceDiscovery() (sd *ServerDiscovery) {
+ sd = &ServerDiscovery{}
+ prefix := "dnssrv+"
+ if strings.HasPrefix(string(sa), prefix) {
+ trimmed := strings.TrimPrefix(string(sa), prefix)
+ srv := ServerSrvAddress(trimmed)
+ sd.srvRecord = &srv
+ } else {
+ sd.list = sa.ToAddresses()
+ }
+ return
+}
+
func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) {
parts := strings.Split(string(sa), ",")
for _, address := range parts {
diff --git a/weed/pb/server_address_test.go b/weed/pb/server_address_test.go
new file mode 100644
index 000000000..f5a12427a
--- /dev/null
+++ b/weed/pb/server_address_test.go
@@ -0,0 +1,36 @@
+package pb
+
+import (
+ "reflect"
+ "testing"
+)
+
+func TestServerAddresses_ToAddressMapOrSrv_shouldRemovePrefix(t *testing.T) {
+ str := ServerAddresses("dnssrv+hello.srv.consul")
+
+ d := str.ToServiceDiscovery()
+
+ expected := ServerSrvAddress("hello.srv.consul")
+ if *d.srvRecord != expected {
+ t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected %s`, *d.srvRecord, expected)
+ }
+}
+
+func TestServerAddresses_ToAddressMapOrSrv_shouldHandleIPPortList(t *testing.T) {
+ str := ServerAddresses("10.0.0.1:23,10.0.0.2:24")
+
+ d := str.ToServiceDiscovery()
+
+ if d.srvRecord != nil {
+ t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected nil`, *d.srvRecord)
+ }
+
+ expected := []ServerAddress{
+ ServerAddress("10.0.0.1:23"),
+ ServerAddress("10.0.0.2:24"),
+ }
+
+ if !reflect.DeepEqual(d.list, expected) {
+ t.Fatalf(`Expected %q, got %q`, expected, d.list)
+ }
+}
diff --git a/weed/pb/server_discovery.go b/weed/pb/server_discovery.go
new file mode 100644
index 000000000..25c0360c5
--- /dev/null
+++ b/weed/pb/server_discovery.go
@@ -0,0 +1,62 @@
+package pb
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "reflect"
+)
+
+// ServerDiscovery encodes a way to find at least 1 instance of a service,
+// and provides utility functions to refresh the instance list
+type ServerDiscovery struct {
+ list []ServerAddress
+ srvRecord *ServerSrvAddress
+}
+
+func NewServiceDiscoveryFromMap(m map[string]ServerAddress) (sd *ServerDiscovery) {
+ sd = &ServerDiscovery{}
+ for _, s := range m {
+ sd.list = append(sd.list, s)
+ }
+ return sd
+}
+
+// RefreshBySrvIfAvailable performs a DNS SRV lookup and updates list with the results
+// of the lookup
+func (sd *ServerDiscovery) RefreshBySrvIfAvailable() {
+ if sd.srvRecord == nil {
+ return
+ }
+ newList, err := sd.srvRecord.LookUp()
+ if err != nil {
+ glog.V(0).Infof("failed to lookup SRV for %s: %v", *sd.srvRecord, err)
+ }
+ if newList == nil || len(newList) == 0 {
+ glog.V(0).Infof("looked up SRV for %s, but found no well-formed names", *sd.srvRecord)
+ return
+ }
+ if !reflect.DeepEqual(sd.list, newList) {
+ sd.list = newList
+ }
+}
+
+// GetInstances returns a copy of the latest known list of addresses
+// call RefreshBySrvIfAvailable prior to this in order to get a more up-to-date view
+func (sd *ServerDiscovery) GetInstances() (addresses []ServerAddress) {
+ for _, a := range sd.list {
+ addresses = append(addresses, a)
+ }
+ return addresses
+}
+func (sd *ServerDiscovery) GetInstancesAsStrings() (addresses []string) {
+ for _, i := range sd.list {
+ addresses = append(addresses, string(i))
+ }
+ return addresses
+}
+func (sd *ServerDiscovery) GetInstancesAsMap() (addresses map[string]ServerAddress) {
+ addresses = make(map[string]ServerAddress)
+ for _, i := range sd.list {
+ addresses[string(i)] = i
+ }
+ return addresses
+}