From efdac0a389c9a668f5d746bd24c0f21f5c7a7803 Mon Sep 17 00:00:00 2001 From: Dmitrii Chudinov Date: Mon, 22 Jan 2024 20:27:16 +0300 Subject: [PATCH] Add new SelectelProvider class to support DNS v2 API. --- README.md | 204 +++++- octodns_selectel/__init__.py | 355 +--------- octodns_selectel/v1/provider.py | 356 ++++++++++ octodns_selectel/v2/dns_client.py | 94 +++ octodns_selectel/v2/exceptions.py | 9 + octodns_selectel/v2/mappings.py | 118 ++++ octodns_selectel/v2/provider.py | 175 +++++ octodns_selectel/version.py | 1 + script/release | 2 +- setup.py | 4 +- tests/__init__.py | 0 .../test_provider_octodns_selectel_v1.py} | 19 +- tests/v2/test_provider_octodns_selectel_v2.py | 667 ++++++++++++++++++ tests/v2/test_selectel_dns_client.py | 278 ++++++++ tests/v2/test_selectel_mappings.py | 567 +++++++++++++++ 15 files changed, 2459 insertions(+), 390 deletions(-) create mode 100644 octodns_selectel/v1/provider.py create mode 100644 octodns_selectel/v2/dns_client.py create mode 100644 octodns_selectel/v2/exceptions.py create mode 100644 octodns_selectel/v2/mappings.py create mode 100644 octodns_selectel/v2/provider.py create mode 100644 octodns_selectel/version.py create mode 100644 tests/__init__.py rename tests/{test_provider_octodns_selectel.py => v1/test_provider_octodns_selectel_v1.py} (96%) create mode 100644 tests/v2/test_provider_octodns_selectel_v2.py create mode 100644 tests/v2/test_selectel_dns_client.py create mode 100644 tests/v2/test_selectel_mappings.py diff --git a/README.md b/README.md index 45e2014..8edaea6 100644 --- a/README.md +++ b/README.md @@ -1,54 +1,198 @@ -## Selectel DNS provider for octoDNS +# Selectel DNS provider for octoDNS -An [octoDNS](https://github.com/octodns/octodns/) provider that targets [Selectel DNS](https://selectel.ru/en/services/additional/dns/). +An [octoDNS](https://github.com/octodns/octodns/) provider that targets [Selectel DNS](https://docs.selectel.com/cloud-services/dns-hosting/dns_hosting/). -### Installation +## Contents -#### Command line +* [Installation](#installation) +* [Capabilities](#capabilities) +* [Configuration](#configuration) +* [Quickstart](#quickstart) +* [Current provider vs. Legacy provider](#current-provider-vs-legacy-provider) +* [Migration from legacy DNS API](#migration-from-legacy-dns-api) +* [Development](#development) -``` -pip install octodns-selectel -``` +## Installation +Install Selectel plugin in your environment and [octodns](https://github.com/octodns/octodns) itself if it is not present. -#### requirements.txt/setup.py +```bash +pip install octodns octodns-selectel +``` -Pinning specific versions or SHAs is recommended to avoid unplanned upgrades. +## Capabilities -##### Versions +| What | Value | +|-------------------|---------------------------------------------------| +| Supported records | A, AAAA, ALIAS, CNAME, MX, NS, SRV, SSHFP, TXT | +| Dynamic records | ❌ | +## Configuration +Add selectel provider to `config.yaml`. +```yaml +providers: + selectel: + class: octodns_selectel.SelectelProvider + token: env/KEYSTONE_PROJECT_TOKEN ``` -# Start with the latest versions and don't just copy what's here -octodns==0.9.17 -octodns-selectel==0.0.3 -``` - -##### SHAs +Set **KEYSTONE_PROJECT_TOKEN** environmental variable or write value directly in config without `env/` prefix. +How to obtain required token you can read [here](https://developers.selectel.com/docs/control-panel/authorization/#project-token) +## Quickstart +To get more details on configuration and capabilities check [octodns repository](https://github.com/octodns/octodns) +#### 1. Organize your configs. +```bash +Project +└── .octodns + ├── config.yaml + └── zones + ├── octodns-test-alias.com.yaml + └── octodns-test.com.yaml ``` -# Start with the latest/specific versions and don't just copy what's here --e git+https://git@github.com/octodns/octodns.git@9da19749e28f68407a1c246dfdf65663cdc1c422#egg=octodns --e git+https://git@github.com/octodns/octodns-selectel.git@ec9661f8b335241ae4746eea467a8509205e6a30#egg=octodns_selectel -``` - -### Configuration - +#### 2. Fill octodns configuration file ```yaml +# .octodns/config.yaml providers: + config: + class: octodns.provider.yaml.YamlProvider + directory: ./octodns/zones + default_ttl: 3600 + enforce_order: True selectel: class: octodns_selectel.SelectelProvider - token: env/SELECTEL_TOKEN + token: env/KEYSTONE_PROJECT_TOKEN + +zones: + octodns-test.com.: + sources: + - config + targets: + - selectel + octodns-test-alias.com.: + sources: + - config + targets: + - selectel +``` +#### 3. Prepare config for each of your zones +```yaml +# .octodns/zones/octodns-test.com.yaml +'': + - ttl: 3600 + type: A + values: + - 1.2.3.4 + - 1.2.3.5 + - ttl: 3600 + type: AAAA + values: + - 6dc1:b9af:74ca:84e9:6c7c:5c0f:c292:9188 + - 5051:e345:9038:052c:00db:eb98:d871:8ae6 + - ttl: 3600 + type: MX + value: + exchange: mail1.octodns-test.com. + preference: 10 + - ttl: 3600 + type: TXT + values: + - "bar" + - "foo" + +_sip._tcp: + - ttl: 3600 + type: SRV + values: + - port: 5060 + priority: 10 + target: phone1.example.com. + weight: 60 + - port: 5030 + priority: 20 + target: phone2.example.com. + weight: 0 + +foo: + - ttl: 3600 + type: CNAME + value: bar.octodns-test.com. + +sshfp: + - ttl: 3600 + type: SSHFP + values: + - algorithm: 1 + fingerprint: "4158f281921260b0205508121c6f5cee879e15f22bdbc319ef2ae9fd308db3be" + fingerprint_type: 2 + - algorithm: 4 + fingerprint: "123456789abcdef67890123456789abcdef67890123456789abcdef123456789" + fingerprint_type: 2 + +txt: + - ttl: 3600 + type: TXT + values: + - "bar_txt" + - "foo_txt" +``` +```yaml +# .octodns/zones/octodns-test-alias.com.yaml +'': + - ttl: 3600 + type: ALIAS + value: octodns-test.com. +``` +#### 4. Check and apply! +```bash +# Run config and check suggested changes +octodns-sync --config-file=.octodns/config.yaml +# Apply changes if everything is ok by adding +octodns-sync --config-file=.octodns/config.yaml --doit ``` -### Support Information +### Current provider vs. Legacy provider +Current provider is `octodns_selectel.SelectelProvider` +Legacy provider is `octodns_selectel.SelectelProviderLegacy` -#### Records +They are not compatible. They utilize different API and created zones live on different authoritative servers. +Zone created in v2 API with current provider is entirely new zone, and not available via v1 api and vice versa. -SelectelProvider supports A, AAAA, ALIAS, CNAME, MX, NS, SRV, SSHFP and TXT +If you are going to create new zone, we strongly recommend to use `SelectelProvider`. +If you have zones in v1, you still can manage them with `SelectelLegacyProvider`. -#### Dynamic +If you updated plugin from unstable (0.x.x) version you should rename provider class in octodns config from `SelectelProvider` to `SelectelLegacyProvider` +to work with legacy api. -SelectelProvider does not support dynamic records. +### Migration from legacy DNS API +If v1 API is still available for you and your zones are hosted there, then you probably would like to move your zones to v2. Legacy API will be eventually shutdown. +With octodns you can sync ALL your v1 zone with v2 by using both providers as in example below. +❗️IMPORTANT❗️ +`SELECTEL_TOKEN` and `KEYSTONE_PROJECT_TOKEN` are **different** tokens! +Above we mentioned how to get keystone token, how to obtain selectel token read [here](https://developers.selectel.com/docs/control-panel/authorization/#selectel-token-api-key) +```yaml +processors: + # Selectel doesn't allow manage Root NS records + # for skipping root ns use IgnoreRootNsFilter class + no-root-ns: + class: octodns.processor.filter.IgnoreRootNsFilter -### Development +providers: + selectel_legacy: + class: octodns_selectel.SelectelProviderLegacy + token: env/SELECTEL_TOKEN + selectel: + class: octodns_selectel.SelectelProvider + token: env/KEYSTONE_PROJECT_TOKEN + +zones: + # Using '*' to sync all zones available on account. + "*": + sources: + - selectel_legacy + processors: + - no-root-ns + targets: + - selectel +``` +## Development See the [/script/](/script/) directory for some tools to help with the development process. They generally follow the [Script to rule them all](https://github.com/github/scripts-to-rule-them-all) pattern. Most useful is `./script/bootstrap` which will create a venv and install both the runtime and development related requirements. It will also hook up a pre-commit hook that covers most of what's run by CI. diff --git a/octodns_selectel/__init__.py b/octodns_selectel/__init__.py index 7a4dcb3..7265c2b 100644 --- a/octodns_selectel/__init__.py +++ b/octodns_selectel/__init__.py @@ -1,353 +1,4 @@ -# -# -# +from .v1.provider import SelectelProvider as SelectelProviderLegacy +from .v2.provider import SelectelProvider as SelectelProvider -from collections import defaultdict -from logging import getLogger - -from requests import Session -from requests.exceptions import HTTPError - -from octodns import __VERSION__ as octodns_version -from octodns.provider import ProviderException -from octodns.provider.base import BaseProvider -from octodns.record import Record, Update - -# TODO: remove __VERSION__ with the next major version release -__version__ = __VERSION__ = '0.0.4' - - -def require_root_domain(fqdn): - if fqdn.endswith('.'): - return fqdn - - return f'{fqdn}.' - - -class SelectelAuthenticationRequired(ProviderException): - def __init__(self, msg): - message = 'Authorization failed. Invalid or empty token.' - super().__init__(message) - - -class SelectelProvider(BaseProvider): - SUPPORTS_GEO = False - - SUPPORTS = set( - ('A', 'AAAA', 'ALIAS', 'CNAME', 'MX', 'NS', 'TXT', 'SRV', 'SSHFP') - ) - - MIN_TTL = 60 - - PAGINATION_LIMIT = 50 - - API_URL = 'https://api.selectel.ru/domains/v1' - - def __init__(self, id, token, *args, **kwargs): - self.log = getLogger(f'SelectelProvider[{id}]') - self.log.debug('__init__: id=%s', id) - super().__init__(id, *args, **kwargs) - - self._sess = Session() - self._sess.headers.update( - { - 'X-Token': token, - 'Content-Type': 'application/json', - 'User-Agent': f'octodns/{octodns_version} octodns-selectel/{__VERSION__}', - } - ) - self._zone_records = {} - self._domain_list = self.domain_list() - self._zones = None - - def _request(self, method, path, params=None, data=None): - self.log.debug('_request: method=%s, path=%s', method, path) - - url = f'{self.API_URL}{path}' - resp = self._sess.request(method, url, params=params, json=data) - - self.log.debug('_request: status=%s', resp.status_code) - if resp.status_code == 401: - raise SelectelAuthenticationRequired(resp.text) - elif resp.status_code == 404: - return {} - resp.raise_for_status() - if method == 'DELETE': - return {} - return resp.json() - - def _get_total_count(self, path): - url = f'{self.API_URL}{path}' - resp = self._sess.request('HEAD', url) - return int(resp.headers['X-Total-Count']) - - def _request_with_pagination(self, path, total_count): - result = [] - for offset in range(0, total_count, self.PAGINATION_LIMIT): - result += self._request( - 'GET', - path, - params={'limit': self.PAGINATION_LIMIT, 'offset': offset}, - ) - return result - - def _include_change(self, change): - if isinstance(change, Update): - existing = change.existing.data - new = change.new.data - new['ttl'] = max(self.MIN_TTL, new['ttl']) - if new == existing: - self.log.debug( - '_include_changes: new=%s, found existing=%s', new, existing - ) - return False - return True - - def _apply(self, plan): - desired = plan.desired - changes = plan.changes - self.log.debug( - '_apply: zone=%s, len(changes)=%d', desired.name, len(changes) - ) - - zone_name = desired.name[:-1] - for change in changes: - class_name = change.__class__.__name__ - getattr(self, f'_apply_{class_name}'.lower())(zone_name, change) - - def _apply_create(self, zone_name, change): - new = change.new - params_for = getattr(self, f'_params_for_{new._type}') - for params in params_for(new): - self.create_record(zone_name, params) - - def _apply_update(self, zone_name, change): - self._apply_delete(zone_name, change) - self._apply_create(zone_name, change) - - def _apply_delete(self, zone_name, change): - existing = change.existing - self.delete_record(zone_name, existing._type, existing.name) - - def _params_for_multiple(self, record): - for value in record.values: - yield { - 'content': value, - 'name': record.fqdn, - 'ttl': max(self.MIN_TTL, record.ttl), - 'type': record._type, - } - - def _params_for_single(self, record): - yield { - 'content': record.value, - 'name': record.fqdn, - 'ttl': max(self.MIN_TTL, record.ttl), - 'type': record._type, - } - - def _params_for_MX(self, record): - for value in record.values: - yield { - 'content': value.exchange, - 'name': record.fqdn, - 'ttl': max(self.MIN_TTL, record.ttl), - 'type': record._type, - 'priority': value.preference, - } - - def _params_for_SRV(self, record): - for value in record.values: - yield { - 'name': record.fqdn, - 'target': value.target, - 'ttl': max(self.MIN_TTL, record.ttl), - 'type': record._type, - 'port': value.port, - 'weight': value.weight, - 'priority': value.priority, - } - - def _params_for_SSHFP(self, record): - for value in record.values: - yield { - 'name': record.fqdn, - 'ttl': max(self.MIN_TTL, record.ttl), - 'type': record._type, - 'algorithm': value.algorithm, - 'fingerprint_type': value.fingerprint_type, - 'fingerprint': value.fingerprint, - } - - _params_for_A = _params_for_multiple - _params_for_AAAA = _params_for_multiple - _params_for_NS = _params_for_multiple - _params_for_TXT = _params_for_multiple - - _params_for_CNAME = _params_for_single - _params_for_ALIAS = _params_for_single - - def _data_for_A(self, _type, records): - return { - 'ttl': records[0]['ttl'], - 'type': _type, - 'values': [r['content'] for r in records], - } - - _data_for_AAAA = _data_for_A - - def _data_for_NS(self, _type, records): - return { - 'ttl': records[0]['ttl'], - 'type': _type, - 'values': [require_root_domain(r["content"]) for r in records], - } - - def _data_for_MX(self, _type, records): - values = [] - for record in records: - values.append( - { - 'preference': record['priority'], - 'exchange': require_root_domain(record["content"]), - } - ) - return {'ttl': records[0]['ttl'], 'type': _type, 'values': values} - - def _data_for_CNAME(self, _type, records): - only = records[0] - return { - 'ttl': only['ttl'], - 'type': _type, - 'value': require_root_domain(only["content"]), - } - - _data_for_ALIAS = _data_for_CNAME - - def _data_for_TXT(self, _type, records): - return { - 'ttl': records[0]['ttl'], - 'type': _type, - 'values': [r['content'] for r in records], - } - - def _data_for_SRV(self, _type, records): - values = [] - for record in records: - values.append( - { - 'priority': record['priority'], - 'weight': record['weight'], - 'port': record['port'], - 'target': require_root_domain(record["target"]), - } - ) - - return {'type': _type, 'ttl': records[0]['ttl'], 'values': values} - - def _data_for_SSHFP(self, _type, records): - values = [] - for record in records: - values.append( - { - 'algorithm': record['algorithm'], - 'fingerprint_type': record['fingerprint_type'], - 'fingerprint': f'{record["fingerprint"]}.', - } - ) - - return {'type': _type, 'ttl': records[0]['ttl'], 'values': values} - - def populate(self, zone, target=False, lenient=False): - self.log.debug( - 'populate: name=%s, target=%s, lenient=%s', - zone.name, - target, - lenient, - ) - before = len(zone.records) - records = self.zone_records(zone) - if records: - values = defaultdict(lambda: defaultdict(list)) - for record in records: - name = zone.hostname_from_fqdn(record['name']) - _type = record['type'] - if _type in self.SUPPORTS: - values[name][record['type']].append(record) - for name, types in values.items(): - for _type, records in types.items(): - data_for = getattr(self, f'_data_for_{_type}') - data = data_for(_type, records) - record = Record.new( - zone, name, data, source=self, lenient=lenient - ) - zone.add_record(record) - self.log.info( - 'populate: found %s records', len(zone.records) - before - ) - - def domain_list(self): - path = '/' - domains = {} - domains_list = [] - - total_count = self._get_total_count(path) - domains_list = self._request_with_pagination(path, total_count) - - for domain in domains_list: - domains[domain['name']] = domain - return domains - - def zone_records(self, zone): - path = f'/{zone.name[:-1]}/records/' - zone_records = [] - - total_count = self._get_total_count(path) - zone_records = self._request_with_pagination(path, total_count) - - self._zone_records[zone.name] = zone_records - return self._zone_records[zone.name] - - def create_domain(self, name, zone=""): - path = '/' - - data = {'name': name, 'bind_zone': zone} - - resp = self._request('POST', path, data=data) - self._domain_list[name] = resp - return resp - - def create_record(self, zone_name, data): - self.log.debug('Create record. Zone: %s, data %s', zone_name, data) - if zone_name in self._domain_list.keys(): - domain_id = self._domain_list[zone_name]['id'] - else: - domain_id = self.create_domain(zone_name)['id'] - - path = f'/{domain_id}/records/' - return self._request('POST', path, data=data) - - def delete_record(self, domain, _type, zone): - self.log.debug('Delete records. Domain: %s, Type: %s', domain, _type) - domain_id = self._domain_list[domain]['id'] - records = self._zone_records.get(f'{domain}.', False) - if not records: - path = f'/{domain_id}/records/' - records = self._request('GET', path) - - full_domain = f'{zone}.{domain}' if zone else domain - delete_count, skip_count = 0, 0 - for record in records: - if record['type'] == _type and record['name'] == full_domain: - record_id = record["id"] - path = f'/{domain_id}/records/{record_id}' - try: - self._request('DELETE', path) - delete_count += 1 - except HTTPError: - skip_count += 1 - self.log.warning(f'Failed to delete record {record_id}') - - self.log.debug( - f'Deleted {delete_count} records. Skipped {skip_count} records' - ) +__all__ = [SelectelProviderLegacy, SelectelProvider] diff --git a/octodns_selectel/v1/provider.py b/octodns_selectel/v1/provider.py new file mode 100644 index 0000000..24fb0f1 --- /dev/null +++ b/octodns_selectel/v1/provider.py @@ -0,0 +1,356 @@ +from collections import defaultdict +from logging import getLogger + +from requests import Session +from requests.exceptions import HTTPError + +from octodns import __VERSION__ as octodns_version +from octodns.provider import ProviderException +from octodns.provider.base import BaseProvider +from octodns.record import Record, Update + +from octodns_selectel.version import version as provider_version + + +def require_root_domain(fqdn): + if fqdn.endswith('.'): + return fqdn + + return f'{fqdn}.' + + +class SelectelAuthenticationRequired(ProviderException): + def __init__(self, msg): + message = 'Authorization failed. Invalid or empty token.' + super().__init__(message) + + +class SelectelProvider(BaseProvider): + SUPPORTS_GEO = False + + SUPPORTS = set( + ('A', 'AAAA', 'ALIAS', 'CNAME', 'MX', 'NS', 'TXT', 'SRV', 'SSHFP') + ) + + MIN_TTL = 60 + + PAGINATION_LIMIT = 50 + + API_URL = 'https://api.selectel.ru/domains/v1' + + def __init__(self, id, token, *args, **kwargs): + self.log = getLogger(f'SelectelProvider[{id}]') + self.log.debug('__init__: id=%s', id) + super().__init__(id, *args, **kwargs) + + self._sess = Session() + self._sess.headers.update( + { + 'X-Token': token, + 'Content-Type': 'application/json', + 'User-Agent': f'octodns/{octodns_version} octodns-selectel/{provider_version}', + } + ) + self._zone_records = {} + self._domain_list = self.domain_list() + self._zones = None + + def _request(self, method, path, params=None, data=None): + self.log.debug('_request: method=%s, path=%s', method, path) + + url = f'{self.API_URL}{path}' + resp = self._sess.request(method, url, params=params, json=data) + + self.log.debug('_request: status=%s', resp.status_code) + if resp.status_code == 401: + raise SelectelAuthenticationRequired(resp.text) + elif resp.status_code == 404: + return {} + resp.raise_for_status() + if method == 'DELETE': + return {} + return resp.json() + + def _get_total_count(self, path): + url = f'{self.API_URL}{path}' + resp = self._sess.request('HEAD', url) + return int(resp.headers['X-Total-Count']) + + def _request_with_pagination(self, path, total_count): + result = [] + for offset in range(0, total_count, self.PAGINATION_LIMIT): + result += self._request( + 'GET', + path, + params={'limit': self.PAGINATION_LIMIT, 'offset': offset}, + ) + return result + + def _include_change(self, change): + if isinstance(change, Update): + existing = change.existing.data + new = change.new.data + new['ttl'] = max(self.MIN_TTL, new['ttl']) + if new == existing: + self.log.debug( + '_include_changes: new=%s, found existing=%s', new, existing + ) + return False + return True + + def _apply(self, plan): + desired = plan.desired + changes = plan.changes + self.log.debug( + '_apply: zone=%s, len(changes)=%d', desired.name, len(changes) + ) + + zone_name = desired.name[:-1] + for change in changes: + class_name = change.__class__.__name__ + getattr(self, f'_apply_{class_name}'.lower())(zone_name, change) + + def _apply_create(self, zone_name, change): + new = change.new + params_for = getattr(self, f'_params_for_{new._type}') + for params in params_for(new): + self.create_record(zone_name, params) + + def _apply_update(self, zone_name, change): + self._apply_delete(zone_name, change) + self._apply_create(zone_name, change) + + def _apply_delete(self, zone_name, change): + existing = change.existing + self.delete_record(zone_name, existing._type, existing.name) + + def list_zones(self): + # This method is called dynamically in octodns.Manager._preprocess_zones() + # and required for use of "*" if provider is source. + zones_without_dot = self.domain_list() + return [ + require_root_domain(zone_name) for zone_name in zones_without_dot + ] + + def _params_for_multiple(self, record): + for value in record.values: + yield { + 'content': value, + 'name': record.fqdn, + 'ttl': max(self.MIN_TTL, record.ttl), + 'type': record._type, + } + + def _params_for_single(self, record): + yield { + 'content': record.value, + 'name': record.fqdn, + 'ttl': max(self.MIN_TTL, record.ttl), + 'type': record._type, + } + + def _params_for_MX(self, record): + for value in record.values: + yield { + 'content': value.exchange, + 'name': record.fqdn, + 'ttl': max(self.MIN_TTL, record.ttl), + 'type': record._type, + 'priority': value.preference, + } + + def _params_for_SRV(self, record): + for value in record.values: + yield { + 'name': record.fqdn, + 'target': value.target, + 'ttl': max(self.MIN_TTL, record.ttl), + 'type': record._type, + 'port': value.port, + 'weight': value.weight, + 'priority': value.priority, + } + + def _params_for_SSHFP(self, record): + for value in record.values: + yield { + 'name': record.fqdn, + 'ttl': max(self.MIN_TTL, record.ttl), + 'type': record._type, + 'algorithm': value.algorithm, + 'fingerprint_type': value.fingerprint_type, + 'fingerprint': value.fingerprint, + } + + _params_for_A = _params_for_multiple + _params_for_AAAA = _params_for_multiple + _params_for_NS = _params_for_multiple + _params_for_TXT = _params_for_multiple + + _params_for_CNAME = _params_for_single + _params_for_ALIAS = _params_for_single + + def _data_for_A(self, _type, records): + return { + 'ttl': records[0]['ttl'], + 'type': _type, + 'values': [r['content'] for r in records], + } + + _data_for_AAAA = _data_for_A + + def _data_for_NS(self, _type, records): + return { + 'ttl': records[0]['ttl'], + 'type': _type, + 'values': [require_root_domain(r["content"]) for r in records], + } + + def _data_for_MX(self, _type, records): + values = [] + for record in records: + values.append( + { + 'preference': record['priority'], + 'exchange': require_root_domain(record["content"]), + } + ) + return {'ttl': records[0]['ttl'], 'type': _type, 'values': values} + + def _data_for_CNAME(self, _type, records): + only = records[0] + return { + 'ttl': only['ttl'], + 'type': _type, + 'value': require_root_domain(only["content"]), + } + + _data_for_ALIAS = _data_for_CNAME + + def _data_for_TXT(self, _type, records): + return { + 'ttl': records[0]['ttl'], + 'type': _type, + 'values': [r['content'] for r in records], + } + + def _data_for_SRV(self, _type, records): + values = [] + for record in records: + values.append( + { + 'priority': record['priority'], + 'weight': record['weight'], + 'port': record['port'], + 'target': require_root_domain(record["target"]), + } + ) + + return {'type': _type, 'ttl': records[0]['ttl'], 'values': values} + + def _data_for_SSHFP(self, _type, records): + values = [] + for record in records: + values.append( + { + 'algorithm': record['algorithm'], + 'fingerprint_type': record['fingerprint_type'], + 'fingerprint': f'{record["fingerprint"]}', + } + ) + + return {'type': _type, 'ttl': records[0]['ttl'], 'values': values} + + def populate(self, zone, target=False, lenient=False): + self.log.debug( + 'populate: name=%s, target=%s, lenient=%s', + zone.name, + target, + lenient, + ) + before = len(zone.records) + records = self.zone_records(zone) + if records: + values = defaultdict(lambda: defaultdict(list)) + for record in records: + name = zone.hostname_from_fqdn(record['name']) + _type = record['type'] + if _type in self.SUPPORTS: + values[name][record['type']].append(record) + for name, types in values.items(): + for _type, records in types.items(): + data_for = getattr(self, f'_data_for_{_type}') + data = data_for(_type, records) + record = Record.new( + zone, name, data, source=self, lenient=lenient + ) + zone.add_record(record) + self.log.info( + 'populate: found %s records', len(zone.records) - before + ) + + def domain_list(self): + path = '/' + domains = {} + domains_list = [] + + total_count = self._get_total_count(path) + domains_list = self._request_with_pagination(path, total_count) + + for domain in domains_list: + domains[domain['name']] = domain + return domains + + def zone_records(self, zone): + path = f'/{zone.name[:-1]}/records/' + zone_records = [] + + total_count = self._get_total_count(path) + zone_records = self._request_with_pagination(path, total_count) + + self._zone_records[zone.name] = zone_records + return self._zone_records[zone.name] + + def create_domain(self, name, zone=""): + path = '/' + + data = {'name': name, 'bind_zone': zone} + + resp = self._request('POST', path, data=data) + self._domain_list[name] = resp + return resp + + def create_record(self, zone_name, data): + self.log.debug('Create record. Zone: %s, data %s', zone_name, data) + if zone_name in self._domain_list.keys(): + domain_id = self._domain_list[zone_name]['id'] + else: + domain_id = self.create_domain(zone_name)['id'] + + path = f'/{domain_id}/records/' + return self._request('POST', path, data=data) + + def delete_record(self, domain, _type, zone): + self.log.debug('Delete records. Domain: %s, Type: %s', domain, _type) + domain_id = self._domain_list[domain]['id'] + records = self._zone_records.get(f'{domain}.', False) + if not records: + path = f'/{domain_id}/records/' + records = self._request('GET', path) + + full_domain = f'{zone}.{domain}' if zone else domain + delete_count, skip_count = 0, 0 + for record in records: + if record['type'] == _type and record['name'] == full_domain: + record_id = record["id"] + path = f'/{domain_id}/records/{record_id}' + try: + self._request('DELETE', path) + delete_count += 1 + except HTTPError: + skip_count += 1 + self.log.warning(f'Failed to delete record {record_id}') + + self.log.debug( + f'Deleted {delete_count} records. Skipped {skip_count} records' + ) diff --git a/octodns_selectel/v2/dns_client.py b/octodns_selectel/v2/dns_client.py new file mode 100644 index 0000000..ef79b22 --- /dev/null +++ b/octodns_selectel/v2/dns_client.py @@ -0,0 +1,94 @@ +from requests import Session + +from octodns import __VERSION__ as octodns_version + +from .exceptions import ApiException + + +class DNSClient: + API_URL = 'https://api.selectel.ru/domains/v2' + _PAGINATION_LIMIT = 50 + + _zone_path = "/zones" + __rrsets_path = "/zones/{}/rrset" + __rrsets_path_specific = "/zones/{}/rrset/{}" + + def __init__(self, library_version: str, openstack_token: str): + self._sess = Session() + self._sess.headers.update( + { + 'X-Auth-Token': openstack_token, + 'Content-Type': 'application/json', + 'User-Agent': f'octodns/{octodns_version} octodns-selectel/{library_version}', + } + ) + + @classmethod + def _rrset_path(cls, zone_uuid): + return cls.__rrsets_path.format(zone_uuid) + + @classmethod + def _rrset_path_specific(cls, zone_uuid, rrset_uuid): + return cls.__rrsets_path_specific.format(zone_uuid, rrset_uuid) + + def _request(self, method, path, params=None, data=None): + url = f'{self.API_URL}{path}' + resp = self._sess.request(method, url, params, json=data) + try: + resp_json = resp.json() + except ValueError: + resp_json = {} + match resp.status_code: + case 200 | 201 | 204: + return resp_json + case 400 | 422: + raise ApiException( + f'Bad request. Description: {resp_json.get("description", "Invalid payload")}.' + ) + case 401: + raise ApiException( + 'Authorization failed. Invalid or empty token.' + ) + case 404: + raise ApiException( + 'Resource not found: ' + f'{resp_json.get("error", "invalid path")}.' + ) + case 409: + raise ApiException( + f'Conflict: {resp_json.get("error", "resource maybe already created")}.' + ) + case _: + raise ApiException('Internal server error.') + + def _request_all_entities(self, path, offset=0) -> list[int]: + items = [] + resp = self._request( + "GET", path, dict(limit=self._PAGINATION_LIMIT, offset=offset) + ) + items.extend(resp["result"]) + if next_offset := resp["next_offset"]: + items.extend(self._request_all_entities(path, offset=next_offset)) + return items + + def list_zones(self): + return self._request_all_entities(self._zone_path) + + def create_zone(self, name): + return self._request('POST', self._zone_path, data=dict(name=name)) + + def list_rrsets(self, zone_uuid): + path = self._rrset_path(zone_uuid) + return self._request_all_entities(path) + + def create_rrset(self, zone_uuid, data): + path = self._rrset_path(zone_uuid) + return self._request('POST', path, data=data) + + def update_rrset(self, zone_uuid, rrset_uuid, data): + path = self._rrset_path_specific(zone_uuid, rrset_uuid) + return self._request('PATCH', path, data=data) + + def delete_rrset(self, zone_uuid, rrset_uuid): + path = self._rrset_path_specific(zone_uuid, rrset_uuid) + return self._request('DELETE', path) diff --git a/octodns_selectel/v2/exceptions.py b/octodns_selectel/v2/exceptions.py new file mode 100644 index 0000000..9bc9e2d --- /dev/null +++ b/octodns_selectel/v2/exceptions.py @@ -0,0 +1,9 @@ +from octodns.provider import ProviderException + + +class SelectelException(ProviderException): + pass + + +class ApiException(SelectelException): + pass diff --git a/octodns_selectel/v2/mappings.py b/octodns_selectel/v2/mappings.py new file mode 100644 index 0000000..b307e98 --- /dev/null +++ b/octodns_selectel/v2/mappings.py @@ -0,0 +1,118 @@ +from string import Template + +from .exceptions import SelectelException + + +def to_selectel_rrset(record): + rrset = dict(name=record.fqdn, ttl=record.ttl, type=record._type) + rrset_records = [] + content_mx_tmpl = Template("$preference $exchange") + content_srv_tmpl = Template("$priority $weight $port $target") + content_sshfp_tmpl = Template("$algorithm $fingerprint_type $fingerprint") + match record._type: + case "A" | "AAAA" | "NS": + rrset_records = list( + map(lambda value: {'content': value}, record.values) + ) + case "CNAME" | "ALIAS": + rrset_records = [{'content': record.value}] + case "TXT": + rrset_records = [ + dict(content=f'\"{value}\"') for value in record.values + ] + case "MX": + rrset_records = list( + map( + lambda value: { + 'content': content_mx_tmpl.substitute( + preference=value.preference, exchange=value.exchange + ) + }, + record.values, + ) + ) + case "SRV": + rrset_records = list( + map( + lambda value: { + 'content': content_srv_tmpl.substitute( + priority=value.priority, + weight=value.weight, + port=value.port, + target=value.target, + ) + }, + record.values, + ) + ) + case "SSHFP": + rrset_records = list( + map( + lambda value: { + 'content': content_sshfp_tmpl.substitute( + algorithm=value.algorithm, + fingerprint_type=value.fingerprint_type, + fingerprint=value.fingerprint, + ) + }, + record.values, + ) + ) + case _: + raise SelectelException( + f'DNS Record with type: {record._type} not supported' + ) + rrset["records"] = rrset_records + return rrset + + +def to_octodns_record_data(rrset): + rrset_type = rrset["type"] + octodns_record = dict(type=rrset_type, ttl=rrset["ttl"]) + record_values = [] + key_for_record_values = "values" + match rrset_type: + case "A" | "AAAA" | "NS": + record_values = [r['content'] for r in rrset["records"]] + case "CNAME" | "ALIAS": + key_for_record_values = "value" + record_values = rrset["records"][0]["content"] + case "TXT": + record_values = [ + r['content'].strip('"\'') for r in rrset["records"] + ] + case "MX": + for record in rrset["records"]: + preference, exchange = record["content"].split(" ") + record_values.append( + {'preference': preference, 'exchange': exchange} + ) + case "SRV": + for record in rrset["records"]: + priority, weight, port, target = record["content"].split(" ") + record_values.append( + { + 'priority': priority, + 'weight': weight, + 'port': port, + 'target': target, + } + ) + case "SSHFP": + for record in rrset["records"]: + algorithm, fingerprint_type, fingerprint = record[ + "content" + ].split(" ") + record_values.append( + { + 'algorithm': algorithm, + 'fingerprint_type': fingerprint_type, + 'fingerprint': fingerprint, + } + ) + case _: + raise SelectelException( + f'DNS Record with type: {rrset_type} not supported' + ) + octodns_record[key_for_record_values] = record_values + return octodns_record diff --git a/octodns_selectel/v2/provider.py b/octodns_selectel/v2/provider.py new file mode 100644 index 0000000..31588cf --- /dev/null +++ b/octodns_selectel/v2/provider.py @@ -0,0 +1,175 @@ +# +# +# + +from logging import getLogger + +from octodns.provider.base import BaseProvider +from octodns.record import Record, SshfpRecord, Update + +from octodns_selectel.version import version as provider_version + +from .dns_client import DNSClient +from .exceptions import ApiException +from .mappings import to_octodns_record_data, to_selectel_rrset + + +class SelectelProvider(BaseProvider): + SUPPORTS_GEO = False + SUPPORTS = set( + ('A', 'AAAA', 'ALIAS', 'CNAME', 'MX', 'NS', 'TXT', 'SRV', 'SSHFP') + ) + MIN_TTL = 60 + + def __init__(self, id, token, *args, **kwargs): + self.log = getLogger(f'SelectelProvider[{id}]') + self.log.debug('__init__: id=%s', id) + super().__init__(id, *args, **kwargs) + self._client = DNSClient(provider_version, token) + self._zones = self.group_existing_zones_by_name() + self._zone_rrsets = {} + + def _include_change(self, change): + if isinstance(change, Update): + existing = change.existing.data + new = change.new.data + new['ttl'] = max(self.MIN_TTL, new['ttl']) + if isinstance(change.new, SshfpRecord): + for i in range(0, len(change.new.rr_values)): + change.new.rr_values[i].fingerprint = change.new.rr_values[ + i + ].fingerprint.lower() + if new == existing: + self.log.debug( + '_include_changes: new=%s, found existing=%s', new, existing + ) + return False + return True + + def _apply(self, plan): + desired = plan.desired + changes = plan.changes + self.log.debug( + '_apply: zone=%s, len(changes)=%d', desired.name, len(changes) + ) + zone_name = desired.name + if not self._is_zone_already_created(zone_name): + self.create_zone(zone_name) + zone_id = self._get_zone_id_by_name(zone_name) + for change in changes: + action = change.__class__.__name__.lower() + if action == 'create': + self._apply_create(zone_id, change) + if action == 'update': + self._apply_update(zone_id, change) + if action == 'delete': + self._apply_delete(zone_id, change) + + def _is_zone_already_created(self, zone_name): + return zone_name in self._zones.keys() + + def _get_rrset_id(self, zone_name, rrset_type, rrset_name): + return next( + filter( + lambda rrset: rrset["type"] == rrset_type + and rrset["name"] == rrset_name, + self._zone_rrsets[zone_name], + ) + )["uuid"] + + def _apply_create(self, zone_id, change): + new_record = change.new + rrset = to_selectel_rrset(new_record) + self.create_rrset(zone_id, rrset) + + def _apply_update(self, zone_id, change): + existing = change.existing + rrset_id = self._get_rrset_id( + existing.zone.name, existing._type, existing.fqdn + ) + data_for_update = to_selectel_rrset(change.new) + self.update_rrset(zone_id, rrset_id, data_for_update) + + def _apply_delete(self, zone_id, change): + existing = change.existing + rrset_id = self._get_rrset_id( + existing.zone.name, existing._type, existing.fqdn + ) + self.delete_rrset(zone_id, rrset_id) + + def populate(self, zone, target=False, lenient=False): + self.log.debug( + 'populate: name=%s, target=%s, lenient=%s', + zone.name, + target, + lenient, + ) + before = len(zone.records) + rrsets = [] + if self._is_zone_already_created(zone.name): + rrsets = self.list_rrsets(zone) + for rrset in rrsets: + rrset_type = rrset['type'] + if rrset_type in self.SUPPORTS: + record_data = to_octodns_record_data(rrset) + rrset_hostname = zone.hostname_from_fqdn(rrset['name']) + record = Record.new( + zone, + rrset_hostname, + record_data, + source=self, + lenient=lenient, + ) + zone.add_record(record) + self.log.info('populate: found %s records', len(zone.records) - before) + + def _get_zone_id_by_name(self, zone_name): + return self._zones.get(zone_name, False)["uuid"] + + def create_zone(self, name): + self.log.debug('Create zone: %s', name) + zone = self._client.create_zone(name) + self._zones[zone["name"]] = zone + return zone + + def list_zones(self): + # This method is called dynamically in octodns.Manager._preprocess_zones() + # and required for use of "*" if provider is source. + return [zone_name for zone_name in self._zones] + + def group_existing_zones_by_name(self): + self.log.debug('View zones') + return {zone['name']: zone for zone in self._client.list_zones()} + + def list_rrsets(self, zone): + self.log.debug('View rrsets. Zone: %s', zone.name) + zone_id = self._get_zone_id_by_name(zone.name) + zone_rrsets = self._client.list_rrsets(zone_id) + self._zone_rrsets[zone.name] = zone_rrsets + return zone_rrsets + + def create_rrset(self, zone_id, data): + self.log.debug('Create rrset. Zone id: %s, data %s', zone_id, data) + return self._client.create_rrset(zone_id, data) + + def update_rrset(self, zone_id, rrset_id, data): + self.log.debug( + f'Update rrsets. Zone id: {zone_id}, rrset id: {rrset_id}' + ) + try: + self._client.update_rrset(zone_id, rrset_id, data) + except ApiException as api_exception: + self.log.warning( + f'Failed to update rrset {rrset_id}. {api_exception}' + ) + + def delete_rrset(self, zone_id, rrset_id): + self.log.debug( + f'Delete rrsets. Zone id: {zone_id}, rrset id: {rrset_id}' + ) + try: + self._client.delete_rrset(zone_id, rrset_id) + except ApiException as api_exception: + self.log.warning( + f'Failed to delete rrset {rrset_id}. {api_exception}' + ) diff --git a/octodns_selectel/version.py b/octodns_selectel/version.py new file mode 100644 index 0000000..2d430d1 --- /dev/null +++ b/octodns_selectel/version.py @@ -0,0 +1 @@ +version = '0.0.4' diff --git a/script/release b/script/release index a7926a4..6202a66 100755 --- a/script/release +++ b/script/release @@ -33,7 +33,7 @@ fi # Set so that setup.py will create a public release style version number export OCTODNS_RELEASE=1 -VERSION="$(grep "^__version__" "$ROOT/octodns_selectel/__init__.py" | sed -e "s/.* = '//" -e "s/'$//")" +VERSION="$(grep "^version" "$ROOT/octodns_selectel/version.py" | sed -e "s/.* = '//" -e "s/'$//")" git tag -s "v$VERSION" -m "Release $VERSION" git push origin "v$VERSION" diff --git a/setup.py b/setup.py index 507ad1a..635a9a4 100755 --- a/setup.py +++ b/setup.py @@ -11,9 +11,9 @@ def descriptions(): def version(): - with open('octodns_selectel/__init__.py') as fh: + with open('octodns_selectel/version.py') as fh: for line in fh: - if line.startswith('__version__'): + if line.startswith('version'): return line.split("'")[1] return 'unknown' diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_provider_octodns_selectel.py b/tests/v1/test_provider_octodns_selectel_v1.py similarity index 96% rename from tests/test_provider_octodns_selectel.py rename to tests/v1/test_provider_octodns_selectel_v1.py index 27ae526..5109802 100644 --- a/tests/test_provider_octodns_selectel.py +++ b/tests/v1/test_provider_octodns_selectel_v1.py @@ -1,7 +1,3 @@ -# -# -# - from unittest import TestCase import requests_mock @@ -10,7 +6,7 @@ from octodns.record import Record, Update from octodns.zone import Zone -from octodns_selectel import SelectelProvider +from octodns_selectel.v1.provider import SelectelProvider class TestSelectelProvider(TestCase): @@ -398,6 +394,19 @@ def test_domain_list(self, fake_http): result = provider.domain_list() self.assertEqual(result, expected) + @requests_mock.Mocker() + def test_list_zones(self, fake_http): + fake_http.get(f'{self.API_URL}/', json=self.domain) + fake_http.head( + f'{self.API_URL}/', headers={'X-Total-Count': str(len(self.domain))} + ) + + expected = ['unit.tests.'] + provider = SelectelProvider(123, 'test_token') + + result = provider.list_zones() + self.assertEqual(result, expected) + @requests_mock.Mocker() def test_authentication_fail(self, fake_http): fake_http.get(f'{self.API_URL}/', status_code=401) diff --git a/tests/v2/test_provider_octodns_selectel_v2.py b/tests/v2/test_provider_octodns_selectel_v2.py new file mode 100644 index 0000000..b1f6ca6 --- /dev/null +++ b/tests/v2/test_provider_octodns_selectel_v2.py @@ -0,0 +1,667 @@ +import uuid +from unittest import TestCase + +import requests_mock + +from octodns.record import Record, Update +from octodns.zone import Zone + +from octodns_selectel.v2.dns_client import DNSClient +from octodns_selectel.v2.mappings import to_octodns_record_data +from octodns_selectel.v2.provider import SelectelProvider + + +class TestSelectelProvider(TestCase): + _zone_uuid = str(uuid.uuid4()) + _zone_name = 'unit.tests.' + _ttl = 3600 + rrsets = [] + octodns_zone = Zone(_zone_name, []) + expected_records = set() + selectel_zones = [dict(uuid=_zone_uuid, name=_zone_name)] + _version = '0.0.1' + _openstack_token = 'some-openstack-token' + + def _a_rrset(self, uuid, hostname): + return dict( + uuid=uuid, + name=f'{hostname}.{self._zone_name}' + if hostname + else self._zone_name, + type='A', + ttl=self._ttl, + records=[dict(content='1.2.3.4'), dict(content='5.6.7.8')], + ) + + def _aaaa_rrset(self, uuid, hostname): + return dict( + uuid=uuid, + name=f'{hostname}.{self._zone_name}' + if hostname + else self._zone_name, + type='AAAA', + ttl=self._ttl, + records=[ + dict(content="4ad4:a6c4:f856:18be:5a5f:7f16:cc3a:fab9"), + dict(content="da78:f69b:8e5a:6221:d0c9:64b8:c6c0:2eab"), + ], + ) + + def _cname_rrset(self, uuid, hostname): + return dict( + uuid=uuid, + name=f'{hostname}.{self._zone_name}' + if hostname + else self._zone_name, + type='CNAME', + ttl=self._ttl, + records=[dict(content=self._zone_name)], + ) + + def _mx_rrset(self, uuid, hostname): + return dict( + uuid=uuid, + name=f'{hostname}.{self._zone_name}' + if hostname + else self._zone_name, + type='MX', + ttl=self._ttl, + records=[dict(content=f'10 mx.{self._zone_name}')], + ) + + def _ns_rrset(self, uuid, hostname): + return dict( + uuid=uuid, + name=f'{hostname}.{self._zone_name}' + if hostname + else self._zone_name, + type='NS', + ttl=self._ttl, + records=[ + dict(content=f'ns1.{self._zone_name}'), + dict(content=f'ns2.{self._zone_name}'), + dict(content=f'ns3.{self._zone_name}'), + ], + ) + + def _srv_rrset(self, uuid, hostname): + return dict( + uuid=uuid, + name=f'{hostname}.{self._zone_name}' + if hostname + else self._zone_name, + type='SRV', + ttl=self._ttl, + records=[ + dict(content=f'40 50 5050 foo-1.{self._zone_name}'), + dict(content=f'50 60 6060 foo-2.{self._zone_name}'), + ], + ) + + def _txt_rrset(self, uuid, hostname): + return dict( + uuid=uuid, + name=f'{hostname}.{self._zone_name}' + if hostname + else self._zone_name, + type='TXT', + ttl=self._ttl, + records=[dict(content='"Foo1"'), dict(content='"Foo2"')], + ) + + def _sshfp_rrset(self, uuid, hostname): + return dict( + uuid=uuid, + name=f'{hostname}.{self._zone_name}' + if hostname + else self._zone_name, + type='SSHFP', + ttl=self._ttl, + records=[dict(content='1 1 123456789abcdef')], + ) + + def setUp(self): + # A, subdomain='' + a_uuid = str(uuid.uuid4()) + self.rrsets.append(self._a_rrset(a_uuid, '')) + self.expected_records.add( + Record.new( + self.octodns_zone, + '', + data=to_octodns_record_data(self._a_rrset(a_uuid, '')), + ) + ) + # A, subdomain='sub' + a_sub_uuid = str(uuid.uuid4()) + self.rrsets.append(self._a_rrset(a_sub_uuid, 'sub')) + self.expected_records.add( + Record.new( + self.octodns_zone, + 'sub', + data=to_octodns_record_data(self._a_rrset(a_sub_uuid, 'sub')), + ) + ) + + # CNAME, subdomain='www2' + cname_uuid = str(uuid.uuid4()) + self.rrsets.append(self._cname_rrset(cname_uuid, 'www2')) + self.expected_records.add( + Record.new( + self.octodns_zone, + 'www2', + data=to_octodns_record_data( + self._cname_rrset(cname_uuid, 'www2') + ), + ) + ) + # CNAME, subdomain='wwwdot' + cname_sub_uuid = str(uuid.uuid4()) + self.rrsets.append(self._cname_rrset(cname_sub_uuid, 'wwwdot')) + self.expected_records.add( + Record.new( + self.octodns_zone, + 'wwwdot', + data=to_octodns_record_data( + self._cname_rrset(cname_sub_uuid, 'wwwdot') + ), + ) + ) + # MX, subdomain='' + mx_uuid = str(uuid.uuid4()) + self.rrsets.append(self._mx_rrset(mx_uuid, '')) + self.expected_records.add( + Record.new( + self.octodns_zone, + '', + data=to_octodns_record_data(self._mx_rrset(mx_uuid, '')), + ) + ) + # NS, subdomain='www3' + ns_sub_uuid = str(uuid.uuid4()) + self.rrsets.append(self._ns_rrset(ns_sub_uuid, 'www3')) + self.expected_records.add( + Record.new( + self.octodns_zone, + 'www3', + data=to_octodns_record_data( + self._ns_rrset(ns_sub_uuid, 'www3') + ), + ) + ) + # AAAA, subdomain='' + aaaa_uuid = str(uuid.uuid4()) + self.rrsets.append(self._aaaa_rrset(aaaa_uuid, '')) + self.expected_records.add( + Record.new( + self.octodns_zone, + '', + data=to_octodns_record_data(self._aaaa_rrset(aaaa_uuid, '')), + ) + ) + # SRV, subdomain='_srv._tcp' + srv_uuid = str(uuid.uuid4()) + self.rrsets.append(self._srv_rrset(srv_uuid, '_srv._tcp')) + self.expected_records.add( + Record.new( + self.octodns_zone, + '_srv._tcp', + data=to_octodns_record_data( + self._srv_rrset(srv_uuid, '_srv._tcp') + ), + ) + ) + # TXT, subdomain='txt' + txt_uuid = str(uuid.uuid4()) + self.rrsets.append(self._txt_rrset(txt_uuid, 'txt')) + self.expected_records.add( + Record.new( + self.octodns_zone, + 'txt', + data=to_octodns_record_data(self._txt_rrset(srv_uuid, 'txt')), + ) + ) + # SSHFP, subdomain='sshfp' + sshfp_uuid = str(uuid.uuid4()) + self.rrsets.append(self._sshfp_rrset(sshfp_uuid, 'sshfp')) + self.expected_records.add( + Record.new( + self.octodns_zone, + 'sshfp', + data=to_octodns_record_data( + self._sshfp_rrset(sshfp_uuid, 'sshfp') + ), + ) + ) + + def tearDown(self): + self.rrsets.clear() + self.expected_records.clear() + self.octodns_zone = Zone(self._zone_name, []) + + @requests_mock.Mocker() + def test_populate(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/' + f'rrset?limit={DNSClient._PAGINATION_LIMIT}&offset=0', + json=dict( + result=self.rrsets, limit=len(self.rrsets), next_offset=0 + ), + ) + zone = Zone(self._zone_name, []) + + provider = SelectelProvider(self._version, self._openstack_token) + provider.populate(zone) + + self.assertEqual(len(self.rrsets), len(zone.records)) + self.assertEqual(self.expected_records, zone.records) + + @requests_mock.Mocker() + def test_apply(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/' + f'rrset?limit={DNSClient._PAGINATION_LIMIT}&offset=0', + json=dict(result=list(), limit=0, next_offset=0), + ) + fake_http.post( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/rrset', json=dict() + ) + + provider = SelectelProvider( + self._version, self._openstack_token, strict_supports=False + ) + + zone = Zone(self._zone_name, []) + for record in self.expected_records: + zone.add_record(record) + + plan = provider.plan(zone) + self.assertEqual(len(self.expected_records), len(plan.changes)) + self.assertEqual(len(self.expected_records), provider.apply(plan)) + + @requests_mock.Mocker() + def test_apply_with_create_zone(self, fake_http): + zone_name_for_created = 'octodns-zone.test.' + zone_uuid = "bdd902e7-7270-44c8-8d18-120fa5e1e5d4" + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict(result=list(), limit=0, next_offset=0), + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{zone_uuid}/' + f'rrset?limit={DNSClient._PAGINATION_LIMIT}&offset=0', + json=dict(result=list(), limit=0, next_offset=0), + ) + fake_http.post( + f'{DNSClient.API_URL}/zones', + json=dict(uuid=zone_uuid, name=zone_name_for_created), + ) + fake_http.post(f'{DNSClient.API_URL}/zones/{zone_uuid}/rrset') + zone = Zone(zone_name_for_created, []) + provider = SelectelProvider( + self._version, self._openstack_token, strict_supports=False + ) + provider.populate(zone) + + zone.add_record( + Record.new( + zone, '', data=dict(ttl=self._ttl, type="A", values=["1.2.3.4"]) + ) + ) + + plan = provider.plan(zone) + apply_len = provider.apply(plan) + self.assertEqual(1, apply_len) + + @requests_mock.Mocker() + def test_populate_with_not_supporting_type(self, fake_http): + rrsets_with_not_supporting_type = self.rrsets + rrsets_with_not_supporting_type.append( + dict( + name=self._zone_name, + ttl=self._ttl, + type="SOA", + records=[ + dict( + content="a.ns.selectel.ru. support.selectel.ru. " + "2023122202 10800 3600 604800 60" + ) + ], + ) + ) + + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/' + f'rrset?limit={DNSClient._PAGINATION_LIMIT}&offset=0', + json=dict( + result=rrsets_with_not_supporting_type, + limit=len(self.rrsets), + next_offset=0, + ), + ) + + zone = Zone(self._zone_name, []) + provider = SelectelProvider(self._version, self._openstack_token) + provider.populate(zone) + + self.assertNotEqual( + len(rrsets_with_not_supporting_type), len(zone.records) + ) + self.assertNotEqual(rrsets_with_not_supporting_type, zone.records) + + @requests_mock.Mocker() + def test_apply_update_ttl(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + + updated_rrset = self.rrsets[0] + updated_record = Record.new( + zone=self.octodns_zone, + name=self.octodns_zone.hostname_from_fqdn(updated_rrset["name"]), + data=to_octodns_record_data(updated_rrset), + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/' + f'rrset?limit={DNSClient._PAGINATION_LIMIT}&offset=0', + json=dict( + result=[self._a_rrset(updated_rrset["uuid"], '')], + limit=len(self.rrsets), + next_offset=0, + ), + ) + + updated_rrset["ttl"] *= 2 + fake_http.patch( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/rrset/{updated_rrset["uuid"]}', + status_code=204, + ) + + zone = Zone(self._zone_name, []) + provider = SelectelProvider(self._version, self._openstack_token) + provider.populate(zone) + + zone.remove_record(updated_record) + updated_record.ttl *= 2 + zone.add_record(updated_record) + + plan = provider.plan(zone) + apply_len = provider.apply(plan) + + self.assertEqual(1, apply_len) + + @requests_mock.Mocker() + def test_apply_update_ttl_internal_error(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + + updated_rrset = self.rrsets[0] + updated_record = Record.new( + zone=self.octodns_zone, + name=self.octodns_zone.hostname_from_fqdn(updated_rrset["name"]), + data=to_octodns_record_data(updated_rrset), + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/' + f'rrset?limit={DNSClient._PAGINATION_LIMIT}&offset=0', + json=dict( + result=[self._a_rrset(updated_rrset["uuid"], '')], + limit=len(self.rrsets), + next_offset=0, + ), + ) + + updated_rrset["ttl"] *= 2 + fake_http.patch( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/rrset/{updated_rrset["uuid"]}', + status_code=500, + ) + + zone = Zone(self._zone_name, []) + provider = SelectelProvider(self._version, self._openstack_token) + provider.populate(zone) + + zone.remove_record(updated_record) + updated_record.ttl *= 2 + zone.add_record(updated_record) + + plan = provider.plan(zone) + + with self.assertLogs(provider.log, "WARNING"): + apply_len = provider.apply(plan) + self.assertEqual(1, apply_len) + + @requests_mock.Mocker() + def test_apply_delete(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + deleted_rrset = self.rrsets[0] + deleted_record = Record.new( + zone=self.octodns_zone, + name=self.octodns_zone.hostname_from_fqdn(deleted_rrset["name"]), + data=to_octodns_record_data(deleted_rrset), + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/' + f'rrset?limit={DNSClient._PAGINATION_LIMIT}&offset=0', + json=dict( + result=[self._a_rrset(deleted_rrset["uuid"], '')], + limit=len(self.rrsets), + next_offset=0, + ), + ) + + fake_http.delete( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/rrset/{deleted_rrset["uuid"]}' + ) + + zone = Zone(self._zone_name, []) + provider = SelectelProvider(self._version, self._openstack_token) + provider.populate(zone) + + zone.remove_record(deleted_record) + + plan = provider.plan(zone) + apply_len = provider.apply(plan) + + self.assertEqual(1, apply_len) + + @requests_mock.Mocker() + def test_apply_delete_with_error(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/' + f'rrset?limit={DNSClient._PAGINATION_LIMIT}&offset=0', + json=dict( + result=self.rrsets, limit=len(self.rrsets), next_offset=0 + ), + ) + deleted_rrset = self.rrsets[0] + deleted_record = Record.new( + zone=self.octodns_zone, + name=self.octodns_zone.hostname_from_fqdn(deleted_rrset["name"]), + data=to_octodns_record_data(deleted_rrset), + ) + + fake_http.delete( + f'{DNSClient.API_URL}/zones/{self._zone_uuid}/rrset/{deleted_rrset["uuid"]}', + status_code=500, + ) + + zone = Zone(self._zone_name, []) + provider = SelectelProvider(self._version, self._openstack_token) + provider.populate(zone) + zone.remove_record(deleted_record) + + plan = provider.plan(zone) + + with self.assertLogs(provider.log, "WARNING"): + apply_len = provider.apply(plan) + self.assertEqual(1, apply_len) + + @requests_mock.Mocker() + def test_include_change_returns_false(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + + provider = SelectelProvider(self._version, self._openstack_token) + zone = Zone(self._zone_name, []) + + exist_record = Record.new( + zone, '', dict(ttl=60, type="A", values=["1.2.3.4"]) + ) + change = Update(exist_record, exist_record) + include_change = provider._include_change(change) + + self.assertFalse(include_change) + + @requests_mock.Mocker() + def test_include_change_sshfp_returns_false(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + + provider = SelectelProvider(self._version, self._openstack_token) + zone = Zone(self._zone_name, []) + fingerprint1 = '123456789abcdef' + fingerprint2 = 'abcdef123456789' + exist_record = Record.new( + zone, + '', + dict( + ttl=60, + type="SSHFP", + values=[ + dict( + algorithm=1, + fingerprint_type=1, + fingerprint=fingerprint1, + ), + dict( + algorithm=1, + fingerprint_type=1, + fingerprint=fingerprint2, + ), + ], + ), + ) + new_record = Record.new( + zone, + '', + dict( + ttl=60, + type="SSHFP", + values=[ + dict( + algorithm=1, + fingerprint_type=1, + fingerprint=fingerprint1.upper(), + ), + dict( + algorithm=1, + fingerprint_type=1, + fingerprint=fingerprint2.upper(), + ), + ], + ), + ) + change = Update(exist_record, new_record) + include_change = provider._include_change(change) + + self.assertFalse(include_change) + + @requests_mock.Mocker() + def test_include_change_returns_true(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + + provider = SelectelProvider(self._version, self._openstack_token) + zone = Zone(self._zone_name, []) + + exist_record = Record.new( + zone, '', dict(ttl=60, type="A", values=["1.2.3.4"]) + ) + new = Record.new(zone, '', dict(ttl=70, type="A", values=["1.2.3.4"])) + change = Update(exist_record, new) + include_change = provider._include_change(change) + + self.assertTrue(include_change) + + @requests_mock.Mocker() + def test_list_zones(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + json=dict( + result=self.selectel_zones, + limit=len(self.selectel_zones), + next_offset=0, + ), + ) + provider = SelectelProvider(self._version, self._openstack_token) + zones = provider.list_zones() + + self.assertListEqual(zones, self._zone_name.split()) diff --git a/tests/v2/test_selectel_dns_client.py b/tests/v2/test_selectel_dns_client.py new file mode 100644 index 0000000..a8435d1 --- /dev/null +++ b/tests/v2/test_selectel_dns_client.py @@ -0,0 +1,278 @@ +from unittest import TestCase + +import requests_mock + +from octodns_selectel.v2.dns_client import DNSClient +from octodns_selectel.v2.exceptions import ApiException + + +class TestSelectelDNSClient(TestCase): + zone_name = "test-octodns.ru." + zone_uuid = "01073035-cc25-4956-b0c9-b3a270091c37" + rrset_uuid = "03073035-dd25-4956-b0c9-k91270091d95" + project_id = "763219cb96c141978e8d45da637ae75c" + library_version = "0.0.1" + openstack_token = "some-openstack-token" + dns_client = DNSClient(library_version, openstack_token) + _PAGINATION_LIMIT = 50 + _PAGINATION_OFFSET = 0 + _rrsets = [ + dict( + uuid="0eb2f04e-74fd-4264-a4b8-396e5fc95f00", + name=zone_name, + ttl=3600, + type="SOA", + records=[ + dict( + content="a.ns.selectel.ru. support.selectel.ru. 2023122202 10800 " + "3600 604800 60", + disabled=False, + ) + ], + zone=zone_uuid, + ), + dict( + uuid="0eb2f04e-74fd-4264-a4b8-396e5fc95f00", + name=zone_name, + ttl=3600, + type="NS", + records=[ + dict(content="a.ns.selectel.ru.", disabled=False), + dict(content="b.ns.selectel.ru.", disabled=False), + dict(content="c.ns.selectel.ru.", disabled=False), + dict(content="d.ns.selectel.ru.", disabled=False), + ], + zone=zone_uuid, + ), + ] + _response_list_rrset_without_offset = dict( + count=2, next_offset=0, result=_rrsets + ) + _response_list_rrset_with_offset = dict( + count=2, next_offset=2, result=_rrsets + ) + + @requests_mock.Mocker() + def test_request_unauthorized_with_html_body(self, fake_http): + response_unauthorized_html = """ + + 401 Authorization Required + +

401 Authorization Required

+
nginx
+ + + """ + fake_http.get( + f'{DNSClient.API_URL}/zones', + status_code=401, + headers={"X-Auth-Token": self.openstack_token}, + text=response_unauthorized_html, + ) + with self.assertRaises(ApiException) as api_exception: + self.dns_client.list_zones() + self.assertEqual( + 'Authorization failed. Invalid or empty token.', + str(api_exception.exception), + ) + + @requests_mock.Mocker() + def test_request_bad_request_with_description(self, fake_http): + bad_response = dict(error="bad_request", description=("field required")) + fake_http.post( + f'{DNSClient.API_URL}/zones', + headers={"X-Auth-Token": self.openstack_token}, + status_code=422, + json=bad_response, + ) + with self.assertRaises(ApiException) as api_exception: + self.dns_client.create_zone(self.zone_name) + self.assertEqual( + f'Bad request. Description: {bad_response.get("description")}.', + str(api_exception.exception), + ) + + @requests_mock.Mocker() + def test_request_resource_not_found(self, fake_http): + bad_response_with_resource_not_found = dict( + error="zone_not_found", description="invalid value" + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{self.zone_uuid}/rrset', + headers={"X-Auth-Token": self.openstack_token}, + status_code=404, + json=bad_response_with_resource_not_found, + ) + with self.assertRaises(ApiException) as api_exception: + self.dns_client.list_rrsets(self.zone_uuid) + self.assertEqual( + f'Resource not found: {bad_response_with_resource_not_found["error"]}.', + str(api_exception.exception), + ) + + @requests_mock.Mocker() + def test_request_resource_conflict(self, fake_http): + bad_response_with_resource_not_found = dict( + error="this_rrset_is_already_exists", description="invalid value" + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{self.zone_uuid}/rrset', + headers={"X-Auth-Token": self.openstack_token}, + status_code=409, + json=bad_response_with_resource_not_found, + ) + with self.assertRaises(ApiException) as api_exception: + self.dns_client.list_rrsets(self.zone_uuid) + self.assertEqual( + f'Conflict: {bad_response_with_resource_not_found["error"]}.', + str(api_exception.exception), + ) + + @requests_mock.Mocker() + def test_request_internal_error(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones', + headers={"X-Auth-Token": self.openstack_token}, + status_code=500, + json={}, + ) + with self.assertRaises(ApiException) as api_exception: + self.dns_client.list_zones() + self.assertEqual('Internal server error.', str(api_exception.exception)) + + @requests_mock.Mocker() + def test_request_all_entities_without_offset(self, fake_http): + response_without_offset = self._response_list_rrset_without_offset + fake_http.get( + f'{DNSClient.API_URL}/zones/{self.zone_uuid}/rrset', + headers={"X-Auth-Token": self.openstack_token}, + status_code=200, + json=response_without_offset, + ) + all_entities = self.dns_client._request_all_entities( + DNSClient._rrset_path(self.zone_uuid) + ) + self.assertEqual(response_without_offset["result"], all_entities) + + @requests_mock.Mocker() + def test_request_all_entities_with_offset(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones/{self.zone_uuid}/rrset?limit={self._PAGINATION_LIMIT}', + headers={"X-Auth-Token": self.openstack_token}, + status_code=200, + json=self._response_list_rrset_with_offset, + ) + fake_http.get( + f'{DNSClient.API_URL}/zones/{self.zone_uuid}/' + f'rrset?limit={self._PAGINATION_LIMIT}&offset=2', + headers={"X-Auth-Token": self.openstack_token}, + status_code=200, + json=self._response_list_rrset_without_offset, + ) + all_entities = self.dns_client._request_all_entities( + DNSClient._rrset_path(self.zone_uuid) + ) + result_list = [] + result_list.extend(self._rrsets) + result_list.extend(self._rrsets) + self.assertEqual(result_list, all_entities) + + @requests_mock.Mocker() + def test_list_zone_success(self, fake_http): + response_without_offset = dict( + count=1, + next_offset=0, + result=[ + dict( + uuid="0eb2f07g-74fd-4271-a4b8-396e5fc95f60", + name=self.zone_name, + project_id=self.project_id, + created_at="2023-12-22T12:44:36Z", + updated_at="2023-12-22T13:34:14Z", + comment=None, + disabled=False, + delegation_checked_at="2023-12-22T13:34:14Z", + last_delegated_at=None, + last_check_status=False, + ) + ], + ) + fake_http.get( + f'{DNSClient.API_URL}/zones', + headers={"X-Auth-Token": self.openstack_token}, + status_code=200, + json=response_without_offset, + ) + zones = self.dns_client.list_zones() + self.assertEqual(response_without_offset["result"], zones) + + @requests_mock.Mocker() + def test_create_zone_success(self, fake_http): + response_created_zone = dict( + uuid="bdd902e7-7270-44c8-8d18-120fa5e1e5d4", + name=self.zone_name, + project_id=self.project_id, + created_at="2023-12-22T15:07:31Z", + updated_at="2023-12-22T15:07:31Z", + comment=None, + disabled=False, + delegation_checked_at=None, + last_delegated_at=None, + last_check_status=False, + ) + fake_http.post( + f'{DNSClient.API_URL}/zones', + headers={"X-Auth-Token": self.openstack_token}, + status_code=200, + json=response_created_zone, + ) + zone = self.dns_client.create_zone(self.zone_name) + self.assertEqual(response_created_zone, zone) + + @requests_mock.Mocker() + def test_list_rrsets_success(self, fake_http): + fake_http.get( + f'{DNSClient.API_URL}/zones/{self.zone_uuid}/rrset', + headers={"X-Auth-Token": self.openstack_token}, + status_code=200, + json=self._response_list_rrset_without_offset, + ) + rrsets = self.dns_client.list_rrsets(self.zone_uuid) + self.assertEqual( + self._response_list_rrset_without_offset["result"], rrsets + ) + + @requests_mock.Mocker() + def test_create_rrset_success(self, fake_http): + response_created_rrset = dict( + uuid=self.rrset_uuid, + name=self.zone_name, + project_id=self.project_id, + created_at="2023-12-22T15:07:31Z", + updated_at="2023-12-22T15:07:31Z", + comment=None, + disabled=False, + delegation_checked_at=None, + last_delegated_at=None, + last_check_status=False, + ) + fake_http.post( + f'{DNSClient.API_URL}/zones/{self.zone_uuid}/rrset', + headers={"X-Auth-Token": self.openstack_token}, + status_code=200, + json=response_created_rrset, + ) + rrsets = self.dns_client.create_rrset(self.zone_uuid, dict()) + self.assertEqual(response_created_rrset, rrsets) + + @requests_mock.Mocker() + def test_delete_rrset_success(self, fake_http): + fake_http.delete( + f'{DNSClient.API_URL}/zones/{self.zone_uuid}/rrset/{self.rrset_uuid}', + headers={"X-Auth-Token": self.openstack_token}, + status_code=200, + ) + response_from_delete = self.dns_client.delete_rrset( + self.zone_uuid, self.rrset_uuid + ) + self.assertEqual(dict(), response_from_delete) diff --git a/tests/v2/test_selectel_mappings.py b/tests/v2/test_selectel_mappings.py new file mode 100644 index 0000000..2ef6eea --- /dev/null +++ b/tests/v2/test_selectel_mappings.py @@ -0,0 +1,567 @@ +import collections +from unittest import TestCase + +from octodns.record import ( + AaaaRecord, + AliasRecord, + ARecord, + CnameRecord, + MxRecord, + SrvRecord, + SshfpRecord, + TxtRecord, +) +from octodns.zone import Zone + +from octodns_selectel.v2.exceptions import SelectelException +from octodns_selectel.v2.mappings import ( + to_octodns_record_data, + to_selectel_rrset, +) + +PairTest = collections.namedtuple("PairTest", ["record", "rrset"]) + + +class TestSelectelMappings(TestCase): + def setUp(self): + self.zone = Zone("test-octodns.ru.", []) + self.ttl = 3600 + + def _assert_mapping_common(self, test_pairs): + for tc in test_pairs: + with self.subTest(): + rrset_from_record = to_selectel_rrset(tc.record) + self.assertEqual( + rrset_from_record.get("name"), + tc.rrset["name"], + "Names must equals", + ) + self.assertEqual( + rrset_from_record.get("type"), + tc.rrset["type"], + "Types must equals", + ) + self.assertEqual( + rrset_from_record.get("ttl"), + tc.rrset["ttl"], + "TTLs must equals", + ) + + record_data_from_rrset = to_octodns_record_data(tc.rrset) + self.assertEqual( + record_data_from_rrset.get("type"), + tc.record._type, + "Types must equals", + ) + self.assertEqual( + record_data_from_rrset.get("ttl"), + tc.record.ttl, + "TTLs must equals", + ) + + def _assert_mapping_values(self, test_pairs): + for tc in test_pairs: + with self.subTest(): + rrset_from_record = to_selectel_rrset(tc.record) + self.assertListEqual( + rrset_from_record.get("records"), tc.rrset["records"] + ) + + record_data_from_rrset = to_octodns_record_data(tc.rrset) + self.assertListEqual( + record_data_from_rrset.get("values"), tc.record.values + ) + + def _assert_mapping_mx(self, test_pairs): + for tc in test_pairs: + with self.subTest(): + rrset_from_record = to_selectel_rrset(tc.record) + self.assertListEqual( + list( + map( + lambda value: value.get("content"), + rrset_from_record.get("records"), + ) + ), + list(map(lambda value: value.rdata_text, tc.record.values)), + ) + + record_data_from_rrset = to_octodns_record_data(tc.rrset) + self.assertListEqual( + list( + map( + lambda value: f"{value['preference']} {value['exchange']}", + record_data_from_rrset.get("values"), + ) + ), + list(map(lambda value: value.rdata_text, tc.record.values)), + ) + + def _srv_to_string(self, srv): + return ( + f"{srv['priority']} {srv['weight']} {srv['port']} {srv['target']}" + ) + + def _assert_mapping_srv(self, test_pairs): + for tc in test_pairs: + with self.subTest(): + rrset_from_record = to_selectel_rrset(tc.record) + self.assertListEqual( + list( + map( + lambda value: value.get("content"), + rrset_from_record.get("records"), + ) + ), + list(map(lambda value: value.rdata_text, tc.record.values)), + ) + + record_data_from_rrset = to_octodns_record_data(tc.rrset) + self.assertListEqual( + list( + map( + lambda srv_value: self._srv_to_string(srv_value), + record_data_from_rrset.get("values"), + ) + ), + list(map(lambda value: value.rdata_text, tc.record.values)), + ) + + def _sshfp_to_string(self, sshfp): + return f'{sshfp["algorithm"]} {sshfp["fingerprint_type"]} {sshfp["fingerprint"]}' + + def _assert_mapping_sshfp(self, test_pairs): + for tc in test_pairs: + with self.subTest(): + rrset_from_record = to_selectel_rrset(tc.record) + self.assertListEqual( + list( + map( + lambda value: value.get("content"), + rrset_from_record.get("records"), + ) + ), + list(map(lambda value: value.rdata_text, tc.record.values)), + ) + + record_data_from_rrset = to_octodns_record_data(tc.rrset) + self.assertListEqual( + sorted( + list( + map( + lambda sshfp_value: self._sshfp_to_string( + sshfp_value + ), + record_data_from_rrset.get("values"), + ) + ) + ), + list(map(lambda value: value.rdata_text, tc.record.values)), + ) + + def _assert_mapping_value(self, test_pairs): + for tc in test_pairs: + with self.subTest(): + rrset_from_record = to_selectel_rrset(tc.record) + self.assertListEqual( + rrset_from_record.get("records"), tc.rrset["records"] + ) + + record_data_from_rrset = to_octodns_record_data(tc.rrset) + self.assertEqual( + record_data_from_rrset.get("value"), tc.record.value + ) + + def test_mapping_record_a(self): + ipv4_list = ["10.20.30.40", "50.60.70.80"] + test_pairs = ( + PairTest( + ARecord(self.zone, "a", dict(ttl=self.ttl, value=ipv4_list[0])), + dict( + name=f"a.{self.zone.name}", + ttl=self.ttl, + type="A", + records=[dict(content=ipv4_list[0])], + ), + ), + PairTest( + ARecord(self.zone, "a", dict(ttl=self.ttl, values=ipv4_list)), + dict( + name=f"a.{self.zone.name}", + ttl=self.ttl, + type="A", + records=[ + dict(content=ipv4_list[0]), + dict(content=ipv4_list[1]), + ], + ), + ), + PairTest( + ARecord(self.zone, "", dict(ttl=self.ttl, values=ipv4_list)), + dict( + name=self.zone.name, + ttl=self.ttl, + type="A", + records=[ + dict(content=ipv4_list[0]), + dict(content=ipv4_list[1]), + ], + ), + ), + ) + self._assert_mapping_common(test_pairs) + self._assert_mapping_values(test_pairs) + + def test_mapping_record_aaaa(self): + ipv6_list = [ + "4ad4:a6c4:f856:18be:5a5f:7f16:cc3a:fab9", + "da78:f69b:8e5a:6221:d0c9:64b8:c6c0:2eab", + ] + test_pairs = ( + PairTest( + AaaaRecord( + self.zone, + "aaaa", + dict(type="AAAA", ttl=self.ttl, value=ipv6_list[0]), + ), + dict( + name=f"aaaa.{self.zone.name}", + ttl=self.ttl, + type="AAAA", + records=[dict(content=ipv6_list[0])], + ), + ), + PairTest( + AaaaRecord( + self.zone, + "aaaa", + dict(type="AAAA", ttl=self.ttl, values=ipv6_list), + ), + dict( + name=f"aaaa.{self.zone.name}", + ttl=self.ttl, + type="AAAA", + records=[ + dict(content=ipv6_list[0]), + dict(content=ipv6_list[1]), + ], + ), + ), + PairTest( + AaaaRecord( + self.zone, + "", + dict(type="AAAA", ttl=self.ttl, values=ipv6_list), + ), + dict( + name=self.zone.name, + ttl=self.ttl, + type="AAAA", + records=[ + dict(content=ipv6_list[0]), + dict(content=ipv6_list[1]), + ], + ), + ), + ) + self._assert_mapping_common(test_pairs) + self._assert_mapping_values(test_pairs) + + def test_mapping_record_txt(self): + txt_list = ["\"Buzz\"", "\"Fizz\""] + test_pairs = ( + PairTest( + TxtRecord( + self.zone, "txt", dict(ttl=self.ttl, value=txt_list[0]) + ), + dict( + name=f"txt.{self.zone.name}", + ttl=self.ttl, + type="TXT", + records=[dict(content=txt_list[0])], + ), + ), + PairTest( + TxtRecord( + self.zone, "txt", dict(ttl=self.ttl, values=txt_list) + ), + dict( + name=f"txt.{self.zone.name}", + ttl=self.ttl, + type="TXT", + records=[ + dict(content=txt_list[0]), + dict(content=txt_list[1]), + ], + ), + ), + PairTest( + TxtRecord(self.zone, "", dict(ttl=self.ttl, values=txt_list)), + dict( + name=self.zone.name, + ttl=self.ttl, + type="TXT", + records=[ + dict(content=txt_list[0]), + dict(content=txt_list[1]), + ], + ), + ), + ) + self._assert_mapping_common(test_pairs) + self._assert_mapping_values(test_pairs) + + def test_mapping_record_mx(self): + mx_list_dict = [ + dict(preference=10, exchange="mail1.octodns-test.ru."), + dict(preference=20, exchange="mail2.octodns-test.ru."), + ] + mx_list_str = [ + f'{mx_record["preference"]} {mx_record["exchange"]}' + for mx_record in mx_list_dict + ] + test_pairs = ( + PairTest( + MxRecord( + self.zone, + "mx", + dict(type="MX", ttl=self.ttl, value=mx_list_dict[0]), + ), + dict( + name=f"mx.{self.zone.name}", + ttl=self.ttl, + type="MX", + records=[dict(content=mx_list_str[0])], + ), + ), + PairTest( + MxRecord( + self.zone, + "mx", + dict(type="MX", ttl=self.ttl, values=mx_list_dict), + ), + dict( + name=f"mx.{self.zone.name}", + ttl=self.ttl, + type="MX", + records=[ + dict(content=mx_list_str[0]), + dict(content=mx_list_str[1]), + ], + ), + ), + PairTest( + MxRecord( + self.zone, + "", + dict(type="MX", ttl=self.ttl, values=mx_list_dict), + ), + dict( + name=self.zone.name, + ttl=self.ttl, + type="MX", + records=[ + dict(content=mx_list_str[0]), + dict(content=mx_list_str[1]), + ], + ), + ), + ) + self._assert_mapping_common(test_pairs) + self._assert_mapping_mx(test_pairs) + + def test_mapping_record_srv(self): + srv_list_dict = [ + dict( + priority=10, weight=60, port=5060, target="bigbox.example.com." + ), + dict( + priority=20, + weight=0, + port=5030, + target="backupbox.example.com.", + ), + ] + srv_list_str = [ + self._srv_to_string(srv_value) for srv_value in srv_list_dict + ] + test_pairs = ( + PairTest( + SrvRecord( + self.zone, + "_sip._tcp", + dict(type="SRV", ttl=self.ttl, value=srv_list_dict[0]), + ), + dict( + name=f"_sip._tcp.{self.zone.name}", + ttl=self.ttl, + type="SRV", + records=[dict(content=srv_list_str[0])], + ), + ), + PairTest( + SrvRecord( + self.zone, + "_sip._tcp", + dict(type="SRV", ttl=self.ttl, values=srv_list_dict), + ), + dict( + name=f"_sip._tcp.{self.zone.name}", + ttl=self.ttl, + type="SRV", + records=[ + dict(content=srv_list_str[0]), + dict(content=srv_list_str[1]), + ], + ), + ), + ) + self._assert_mapping_common(test_pairs) + self._assert_mapping_srv(test_pairs) + + def test_mapping_record_sshfp(self): + sshfp_list_dict = [ + dict( + algorithm=4, + fingerprint_type=2, + fingerprint="123456789abcdef67890123456789abcdef67890123456789abcdef123456789", + ), + dict( + algorithm=1, + fingerprint_type=2, + fingerprint="4158F281921260B0205508121C6F5CEE879E15F22BDBC319EF2AE9FD308DB3BE", + ), + ] + sshfp_list_str = [ + self._sshfp_to_string(sshfp_value) + for sshfp_value in sshfp_list_dict + ] + test_pairs = ( + PairTest( + SshfpRecord( + self.zone, + "sshfp", + dict(type="SSHFP", ttl=self.ttl, value=sshfp_list_dict[0]), + ), + dict( + name=f"sshfp.{self.zone.name}", + ttl=self.ttl, + type="SSHFP", + records=[dict(content=sshfp_list_str[0])], + ), + ), + PairTest( + SshfpRecord( + self.zone, + "sshfp", + dict(type="SSHFP", ttl=self.ttl, values=sshfp_list_dict), + ), + dict( + name=f"sshfp.{self.zone.name}", + ttl=self.ttl, + type="SSHFP", + records=[ + dict(content=sshfp_list_str[0]), + dict(content=sshfp_list_str[1]), + ], + ), + ), + ) + self._assert_mapping_common(test_pairs) + self._assert_mapping_sshfp(test_pairs) + + def test_mapping_record_cname(self): + cname_value = "proxydomain.ru." + test_pairs = ( + PairTest( + CnameRecord( + self.zone, + "cname", + dict(type="CNAME", ttl=self.ttl, value=cname_value), + ), + dict( + name=f"cname.{self.zone.name}", + ttl=self.ttl, + type="CNAME", + records=[dict(content=cname_value)], + ), + ), + PairTest( + CnameRecord( + self.zone, + "", + dict(type="CNAME", ttl=self.ttl, value=cname_value), + ), + dict( + name=self.zone.name, + ttl=self.ttl, + type="CNAME", + records=[dict(content=cname_value)], + ), + ), + ) + self._assert_mapping_common(test_pairs) + self._assert_mapping_value(test_pairs) + + def test_mapping_record_alias(self): + cname_value = "proxydomain.ru." + test_pairs = ( + PairTest( + AliasRecord( + self.zone, + "alias", + dict(type="ALIAS", ttl=self.ttl, value=cname_value), + ), + dict( + name=f"alias.{self.zone.name}", + ttl=self.ttl, + type="ALIAS", + records=[dict(content=cname_value)], + ), + ), + PairTest( + AliasRecord( + self.zone, + "", + dict(type="ALIAS", ttl=self.ttl, value=cname_value), + ), + dict( + name=self.zone.name, + ttl=self.ttl, + type="ALIAS", + records=[dict(content=cname_value)], + ), + ), + ) + self._assert_mapping_common(test_pairs) + self._assert_mapping_value(test_pairs) + + def test_mapping_record_raise_exception_invalid_type(self): + invalid_type_record = ARecord( + self.zone, + "bad", + dict( + type="INCORRECT", + ttl=self.ttl, + values=["10.20.30.40", "50.60.70.80"], + ), + ) + invalid_type_record._type = "INCORRECT" + invalid_type_rrset = dict( + name=f"bad.{self.zone.name}", ttl=self.ttl, type="INCORRECT" + ) + + with self.assertRaises(SelectelException) as selectel_exception: + _ = to_octodns_record_data(invalid_type_rrset) + self.assertEquals( + selectel_exception.exception, + 'DNS Record with type: INCORRECT not supported', + ) + + with self.assertRaises(SelectelException) as selectel_exception: + _ = to_selectel_rrset(invalid_type_record) + print(invalid_type_record._type) + self.assertEquals( + selectel_exception.exception, + 'DNS Record with type: INCORRECT not supported', + )