Skip to content

Commit

Permalink
Merge pull request #7 from pogzyb/release/v0.1.3
Browse files Browse the repository at this point in the history
Release/v0.1.3
  • Loading branch information
pogzyb authored Jan 24, 2022
2 parents 6ec972e + 8c67f20 commit 8097422
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 104 deletions.
26 changes: 13 additions & 13 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@

class TestDNSClient(asynctest.TestCase):

def setUp(self) -> None:
async def setUp(self) -> None:
self.dns_client = DNSClient.new_client()

self.aio_dns_client = await DNSClient.new_aio_client()

def test_build_query_url(self):
expected_base_case = "http://some-url.com/domain/domain-name"
output = self.dns_client._build_query_uri("http://some-url.com/", "domain-name")
output = self.dns_client._build_query_href("http://some-url.com/", "domain-name")
assert output == expected_base_case, f"{output} != {expected_base_case}"
expected_absolute_case = "http://bad-example/domain/forward-slash-domain"
output = self.dns_client._build_query_uri("http://bad-example", "/forward-slash-domain")
output = self.dns_client._build_query_href("http://bad-example", "/forward-slash-domain")
assert output == expected_absolute_case, f"{output} != {expected_absolute_case}"

def test_check_status(self):
Expand All @@ -25,7 +25,7 @@ def test_check_status(self):
self.assertRaises(NotFoundError, self.dns_client._check_status_code, 404)
self.assertRaises(MalformedQueryError, self.dns_client._check_status_code, 400)

@mock.patch("whodap.client.DNSClient._get_request")
@mock.patch("whodap.client.RDAPClient._get_request")
@mock.patch("whodap.client.DomainResponse.from_json")
def test_lookup(self, mock_rdap_resp, mock_request):
self.dns_client.iana_dns_server_map = {'com': 'some-server-for-rdap'}
Expand All @@ -34,22 +34,22 @@ def test_lookup(self, mock_rdap_resp, mock_request):
self.dns_client.lookup('domain', 'com')
assert mock_request.call_count == 2, f'_aio_get_request call_count {mock_request.call_count} != 2'
assert mock_rdap_resp.call_count == 2, f'from_json call_count {mock_rdap_resp.call_count} != 2'
self.dns_client.lookup('domain', 'com', auth_ref='some-auth-ref')
assert mock_request.call_count == 3
assert mock_rdap_resp.call_count == 3
self.dns_client.lookup('domain', 'com', auth_href='some-auth-ref')
assert mock_request.call_count == 4, f'_aio_get_request call_count {mock_request.call_count} != 3'
assert mock_rdap_resp.call_count == 4, f'from_json call_count {mock_rdap_resp.call_count} != 3'

@mock.patch("whodap.client.DNSClient._aio_get_request")
@mock.patch("whodap.client.DomainResponse.from_json")
async def test_aio_lookup(self, mock_rdap_resp, mock_request):
self.dns_client.iana_dns_server_map = {'com': 'some-server-for-rdap'}
self.aio_dns_client.iana_dns_server_map = {'com': 'some-server-for-rdap'}
mock_request.return_value = mock.Mock(status_code=200)
mock_rdap_resp.return_value = mock.Mock(links=[mock.Mock(href='the-authority-server-for-domain')])
await self.dns_client.aio_lookup('domain', 'com')
await self.aio_dns_client.aio_lookup('domain', 'com')
assert mock_request.call_count == 2, f'_aio_get_request call_count {mock_request.call_count} != 2'
assert mock_rdap_resp.call_count == 2, f'from_json call_count {mock_rdap_resp.call_count} != 2'
await self.dns_client.aio_lookup('domain', 'com', auth_ref='some-auth-ref')
assert mock_request.call_count == 3
assert mock_rdap_resp.call_count == 3
await self.aio_dns_client.aio_lookup('domain', 'com', auth_href='some-auth-ref')
assert mock_request.call_count == 4
assert mock_rdap_resp.call_count == 4

def test_iana_server_map(self):
rdap_output = {
Expand Down
22 changes: 13 additions & 9 deletions whodap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@
from .response import DomainResponse

__all__ = ['aio_lookup_domain', 'lookup_domain', 'DNSClient']
__version__ = '0.1.2'
__version__ = '0.1.3'


def lookup_domain(domain: str,
tld: str,
httpx_client: Optional[Client] = None) -> DomainResponse:
def lookup_domain(
domain: str,
tld: str,
httpx_client: Optional[Client] = None
) -> DomainResponse:
"""
Convenience function that instantiates a DNSClient,
submits an RDAP query for the given domain, and returns
the result as a DomainResponse.
:param domain: the domain name to lookup
:param tld: the top level domain (e.g. "com", "net", "buzz")
:param httpx_client: Custom, pre-configured instance `httpx.Client`
:param httpx_client: Preconfigured instance `httpx.Client`
:return: an instance of DomainResponse
"""
dns_client = DNSClient.new_client(httpx_client)
Expand All @@ -29,17 +31,19 @@ def lookup_domain(domain: str,
return response


async def aio_lookup_domain(domain: str,
tld: str,
httpx_client: Optional[AsyncClient] = None) -> DomainResponse:
async def aio_lookup_domain(
domain: str,
tld: str,
httpx_client: Optional[AsyncClient] = None
) -> DomainResponse:
"""
Async-compatible convenience function that instantiates
a DNSClient, submits an RDAP query for the given domain,
and returns the result as a DomainResponse.
:param domain: the domain name to lookup
:param tld: the top level domain (e.g. "com", "net", "buzz")
:param httpx_client: Custom, pre-configured instance `httpx.AsyncClient`
:param httpx_client: Preconfigured instance `httpx.AsyncClient`
:return: an instance of DomainResponse
"""
dns_client = await DNSClient.new_aio_client(httpx_client)
Expand Down
166 changes: 84 additions & 82 deletions whodap/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import contextmanager

# different installs for async contextmanager based on python version
if sys.version_info.major == 3 and sys.version_info.minor < 7:
if sys.version_info < (3, 7):
from async_generator import asynccontextmanager
else:
from contextlib import asynccontextmanager
Expand Down Expand Up @@ -59,7 +59,7 @@ async def aio_lookup(self):

@staticmethod
@abstractmethod
def _build_query_uri() -> str:
def _build_query_href() -> str:
...

def _get_request(self, uri: str) -> httpx.Response:
Expand Down Expand Up @@ -92,6 +92,7 @@ class DNSClient(RDAPClient):
def __init__(self, httpx_client: Union[httpx.Client, httpx.AsyncClient]):
super(DNSClient, self).__init__(httpx_client)
self.iana_dns_server_map: Dict[str, str] = {}
self.rdap_hrefs = set()

@classmethod
@contextmanager
Expand Down Expand Up @@ -172,89 +173,90 @@ async def aio_get_iana_dns_info(self):
return response.json()

@staticmethod
def _build_query_uri(rdap_uri: str, domain: str) -> str:
return posixpath.join(rdap_uri, 'domain', domain.lstrip('/'))

def lookup(self, domain: str, tld: str, auth_ref: str = None) -> DomainResponse:
def _build_query_href(rdap_href: str, domain: str) -> str:
return posixpath.join(rdap_href, 'domain', domain.lstrip('/'))

async def aio_lookup(
self,
domain: str,
tld: str,
auth_href: str = None
) -> DomainResponse:
"""
Performs an RDAP domain lookup.
First, finds the appropriate server for the top level domain,
sends an HTTP request to the server, parses the response for a more authoritative source,
sends an additional HTTP request to the more authoritative source, and finally
encapsulates the HTTP response into a DomainResponse object.
:param domain: the domain name
:param tld: the top level domain
:param auth_ref: optional authoritative url for the given TLD
:return: instance of DomainResponse
Performs an asynchronous RDAP domain lookup.
Finds the authoritative server for the domain and encapsulates
the RDAP response into a DomainResponse object.
:param domain: The domain name
:param tld: The top level domain
:param auth_href: Optional authoritative URL for the given TLD
:return: Instance of DomainResponse
"""
domain_and_tld = domain + '.' + tld
# if an authoritative url is provided; use it
if auth_ref:
query_url = self._build_query_uri(auth_ref, domain_and_tld)
resp = self._get_request(query_url)
self._check_status_code(resp.status_code)
return DomainResponse.from_json(resp.text)
# start with looking up server in the IANA list
server_url = self.iana_dns_server_map.get(tld)
if not server_url:
raise NotImplementedError(f'No RDAP Server for ".{tld.upper()}"')
# hit the server found in the IANA list
query_url = self._build_query_uri(server_url, domain_and_tld)
response = self._get_request(query_url)
self._check_status_code(response.status_code)
domain_response = DomainResponse.from_json(response.text)
# try to extract an authoritative server for this domain
if hasattr(domain_response, 'links'):
authoritative_url = domain_response.links[-1].href
# avoid redundant connections
if authoritative_url.lower() != query_url.lower():
resp = self._get_request(authoritative_url)
self._check_status_code(resp.status_code)
return DomainResponse.from_json(resp.text)
else:
return domain_response
else:
return domain_response

async def aio_lookup(self, domain: str, tld: str, auth_ref: str = None) -> DomainResponse:
# set starting href
base_href = auth_href or self.iana_dns_server_map.get(tld)
if not base_href:
raise NotImplementedError(f'No RDAP server found for .{tld.upper()} domains')
# build query href
domain_name = domain + '.' + tld
href = self._build_query_href(base_href, domain_name)
domain_response = await self._aio_get_authoritative_response(href)
# return response
return domain_response

def lookup(
self,
domain: str,
tld: str,
auth_href: str = None
) -> DomainResponse:
"""
Performs an RDAP domain lookup.
Finds the authoritative server for the domain and encapsulates
the RDAP response into a DomainResponse object.
First, finds the appropriate server for the top level domain,
sends an HTTP request to the server, parses the response for a more authoritative source,
sends an additional HTTP request to the more authoritative source, and finally
encapsulates the HTTP response into a DomainResponse object.
:param domain: the domain name
:param tld: the top level domain
:param auth_ref: optional authoritative url for the given TLD
:return: instance of DomainResponse
:param domain: The domain name
:param tld: The top level domain
:param auth_href: Optional authoritative URL for the given TLD
:return: Instance of DomainResponse
"""
domain_and_tld = domain + '.' + tld
if auth_ref:
query_url = self._build_query_uri(auth_ref, domain_and_tld)
resp = await self._aio_get_request(query_url)
self._check_status_code(resp.status_code)
return DomainResponse.from_json(resp.read())
server_url = self.iana_dns_server_map.get(tld)
if not server_url:
raise NotImplementedError(f'Could not find RDAP server for .{tld.upper()} domains')
query_url = self._build_query_uri(server_url, domain_and_tld)
response = await self._aio_get_request(query_url)
self._check_status_code(response.status_code)
domain_response = DomainResponse.from_json(response.read())
# set starting href
base_href = auth_href or self.iana_dns_server_map.get(tld)
if not base_href:
raise NotImplementedError(f'No RDAP server found for .{tld.upper()} domains')
# build query href
domain_name = domain + '.' + tld
href = self._build_query_href(base_href, domain_name)
domain_response = self._get_authoritative_response(href)
# return response
return domain_response

def _get_authoritative_response(self, href: str) -> DomainResponse:
resp = self._get_request(href)
self._check_status_code(resp.status_code)
domain_response = DomainResponse.from_json(resp.read())
# save href chain
self.rdap_hrefs.add(href)
# check for more authoritative source
if hasattr(domain_response, 'links'):
authoritative_url = domain_response.links[-1].href
if authoritative_url.lower() != query_url.lower():
resp = await self._aio_get_request(authoritative_url)
self._check_status_code(resp.status_code)
return DomainResponse.from_json(resp.read())
else:
return domain_response
else:
return domain_response
next_href = domain_response.links[-1].href.lower()
if next_href and next_href != href:
domain_response = self._get_authoritative_response(next_href)
# return response
return domain_response

async def _aio_get_authoritative_response(self, href: str) -> DomainResponse:
resp = await self._aio_get_request(href)
self._check_status_code(resp.status_code)
domain_response = DomainResponse.from_json(resp.read())
# save href chain
self.rdap_hrefs.add(href)
# check for more authoritative source
if hasattr(domain_response, 'links'):
next_href = domain_response.links[-1].href.lower()
if next_href and next_href != href:
domain_response = await self._aio_get_authoritative_response(next_href)
# return response
return domain_response

def set_iana_dns_info(self, iana_dns_map: Dict[str, Any]) -> None:
"""
Expand Down Expand Up @@ -297,8 +299,8 @@ async def aio_lookup(self):
...

@staticmethod
def _build_query_uri(rdap_uri: str, ip_address: str) -> str:
return posixpath.join(rdap_uri, ip_address)
def _build_query_href(rdap_href: str, ip_address: str) -> str:
return posixpath.join(rdap_href, ip_address)

def _set_ipv4_server_map(self, iana_ipv4_map: Dict[str, Any]):
...
Expand All @@ -323,7 +325,7 @@ async def aio_lookup(self):
...

@staticmethod
def _build_query_uri() -> str:
def _build_query_href() -> str:
...


Expand All @@ -346,5 +348,5 @@ async def _aio_load_from_iana(self):
...

@staticmethod
def _build_query_uri() -> str:
def _build_query_href() -> str:
...

0 comments on commit 8097422

Please sign in to comment.