diff options
| author | Nico D'Cotta <45274424+Cottand@users.noreply.github.com> | 2023-08-24 16:08:56 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-08-24 07:08:56 -0700 |
| commit | 796b7508f33916f7fa734e3df2ceea9a80415ade (patch) | |
| tree | 4bc3913d93e1e328fd8f7275b11452e61d8175d2 /weed/pb | |
| parent | 5251b4d50ea55c5f0a2fbc60206785bf80775bac (diff) | |
| download | seaweedfs-796b7508f33916f7fa734e3df2ceea9a80415ade.tar.xz seaweedfs-796b7508f33916f7fa734e3df2ceea9a80415ade.zip | |
Implement SRV lookups for filer (#4767)
Diffstat (limited to 'weed/pb')
| -rw-r--r-- | weed/pb/server_address.go | 37 | ||||
| -rw-r--r-- | weed/pb/server_address_test.go | 36 | ||||
| -rw-r--r-- | weed/pb/server_discovery.go | 62 |
3 files changed, 135 insertions, 0 deletions
diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go index 56d0dba24..a0aa79ae4 100644 --- a/weed/pb/server_address.go +++ b/weed/pb/server_address.go @@ -11,6 +11,7 @@ import ( type ServerAddress string type ServerAddresses string +type ServerSrvAddress string func NewServerAddress(host string, port int, grpcPort int) ServerAddress { if grpcPort == 0 || grpcPort == port+10000 { @@ -76,6 +77,42 @@ func (sa ServerAddress) ToGrpcAddress() string { return ServerToGrpcAddress(string(sa)) } +// LookUp may return an error for some records along with successful lookups - make sure you do not +// discard `addresses` even if `err == nil` +func (r ServerSrvAddress) LookUp() (addresses []ServerAddress, err error) { + _, records, lookupErr := net.LookupSRV("", "", string(r)) + if lookupErr != nil { + err = fmt.Errorf("lookup SRV address %s: %v", r, lookupErr) + } + for _, srv := range records { + address := fmt.Sprintf("%s:%d", srv.Target, srv.Port) + addresses = append(addresses, ServerAddress(address)) + } + return +} + +// ToServiceDiscovery expects one of: a comma-separated list of ip:port, like +// +// 10.0.0.1:9999,10.0.0.2:24:9999 +// +// OR an SRV Record prepended with 'dnssrv+', like: +// +// dnssrv+_grpc._tcp.master.consul +// dnssrv+_grpc._tcp.headless.default.svc.cluster.local +// dnssrv+seaweed-master.master.consul +func (sa ServerAddresses) ToServiceDiscovery() (sd *ServerDiscovery) { + sd = &ServerDiscovery{} + prefix := "dnssrv+" + if strings.HasPrefix(string(sa), prefix) { + trimmed := strings.TrimPrefix(string(sa), prefix) + srv := ServerSrvAddress(trimmed) + sd.srvRecord = &srv + } else { + sd.list = sa.ToAddresses() + } + return +} + func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) { parts := strings.Split(string(sa), ",") for _, address := range parts { diff --git a/weed/pb/server_address_test.go b/weed/pb/server_address_test.go new file mode 100644 index 000000000..f5a12427a --- /dev/null +++ b/weed/pb/server_address_test.go @@ -0,0 +1,36 @@ +package pb + +import ( + "reflect" + "testing" +) + +func TestServerAddresses_ToAddressMapOrSrv_shouldRemovePrefix(t *testing.T) { + str := ServerAddresses("dnssrv+hello.srv.consul") + + d := str.ToServiceDiscovery() + + expected := ServerSrvAddress("hello.srv.consul") + if *d.srvRecord != expected { + t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected %s`, *d.srvRecord, expected) + } +} + +func TestServerAddresses_ToAddressMapOrSrv_shouldHandleIPPortList(t *testing.T) { + str := ServerAddresses("10.0.0.1:23,10.0.0.2:24") + + d := str.ToServiceDiscovery() + + if d.srvRecord != nil { + t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected nil`, *d.srvRecord) + } + + expected := []ServerAddress{ + ServerAddress("10.0.0.1:23"), + ServerAddress("10.0.0.2:24"), + } + + if !reflect.DeepEqual(d.list, expected) { + t.Fatalf(`Expected %q, got %q`, expected, d.list) + } +} diff --git a/weed/pb/server_discovery.go b/weed/pb/server_discovery.go new file mode 100644 index 000000000..25c0360c5 --- /dev/null +++ b/weed/pb/server_discovery.go @@ -0,0 +1,62 @@ +package pb + +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + "reflect" +) + +// ServerDiscovery encodes a way to find at least 1 instance of a service, +// and provides utility functions to refresh the instance list +type ServerDiscovery struct { + list []ServerAddress + srvRecord *ServerSrvAddress +} + +func NewServiceDiscoveryFromMap(m map[string]ServerAddress) (sd *ServerDiscovery) { + sd = &ServerDiscovery{} + for _, s := range m { + sd.list = append(sd.list, s) + } + return sd +} + +// RefreshBySrvIfAvailable performs a DNS SRV lookup and updates list with the results +// of the lookup +func (sd *ServerDiscovery) RefreshBySrvIfAvailable() { + if sd.srvRecord == nil { + return + } + newList, err := sd.srvRecord.LookUp() + if err != nil { + glog.V(0).Infof("failed to lookup SRV for %s: %v", *sd.srvRecord, err) + } + if newList == nil || len(newList) == 0 { + glog.V(0).Infof("looked up SRV for %s, but found no well-formed names", *sd.srvRecord) + return + } + if !reflect.DeepEqual(sd.list, newList) { + sd.list = newList + } +} + +// GetInstances returns a copy of the latest known list of addresses +// call RefreshBySrvIfAvailable prior to this in order to get a more up-to-date view +func (sd *ServerDiscovery) GetInstances() (addresses []ServerAddress) { + for _, a := range sd.list { + addresses = append(addresses, a) + } + return addresses +} +func (sd *ServerDiscovery) GetInstancesAsStrings() (addresses []string) { + for _, i := range sd.list { + addresses = append(addresses, string(i)) + } + return addresses +} +func (sd *ServerDiscovery) GetInstancesAsMap() (addresses map[string]ServerAddress) { + addresses = make(map[string]ServerAddress) + for _, i := range sd.list { + addresses[string(i)] = i + } + return addresses +} |
