Skip to content

Commit

Permalink
Add new functions to lookup AS data
Browse files Browse the repository at this point in the history
  • Loading branch information
lpellegr committed Jun 13, 2024
1 parent 0cd403a commit 0092942
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- Require Python 3.8+.
- Add new functions for retrieving Autonomous System data: `batch_lookup_asns`, `lookup_asn`, `origin_lookup_asn`.
- Add new functions for user-agent header value parsing: `batch_parse_user_agents`, `parse_user_agent`.
- API key is passed as header value and no longer as query parameter.
- Client library method are now wrapped in a new _ApiResponse_ object that includes a mean to retrieve metadata
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ credits_consumed = response.credits.consumed
credits_remaining = response.credits.remaining
```

#### Single ASN Lookup

```python
from ipregistry import IpregistryClient

client = IpregistryClient("YOUR_API_KEY")
response = client.lookup_asn(42)
print(response.credits.consumed)
print(response.data.prefixes)
print(response.data.relationships)
```

#### Batch IP Lookup

```python
Expand Down
37 changes: 32 additions & 5 deletions ipregistry/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
limitations under the License.
"""
from .cache import IpregistryCache, NoCache
from .json import IpInfo, RequesterIpInfo
from .json import AutonomousSystem, IpInfo
from .model import LookupError, ApiResponse, ApiResponseCredits, ApiResponseThrottling
from .request import DefaultRequestHandler, IpregistryRequestHandler

Expand All @@ -30,6 +30,9 @@ def __init__(self, key_or_config, **kwargs):
if not isinstance(self._requestHandler, IpregistryRequestHandler):
raise ValueError("Given request handler instance is not of type IpregistryRequestHandler")

def batch_lookup_asns(self, ips, **options):
return self.batch_request(ips, self._requestHandler.batch_lookup_asns, **options)

def batch_lookup_ips(self, ips, **options):
return self.batch_request(ips, self._requestHandler.batch_lookup_ips, **options)

Expand Down Expand Up @@ -77,25 +80,40 @@ def batch_request(self, items, request_handler_func, **options):

return response

def lookup_ip(self, ip='', **options):
def lookup_asn(self, asn, **options):
return self.__lookup_asn(asn, options)

def lookup_ip(self, ip, **options):
if isinstance(ip, str):
return self.__lookup_ip(ip, options)
else:
raise ValueError("Invalid value for 'ip' parameter: " + ip)

def origin_lookup_asn(self, **options):
return self.__lookup_asn('AS', options)

def origin_lookup_ip(self, **options):
return self.__lookup_ip('', options)

def origin_parse_user_agent(self, **options):
return self._requestHandler.origin_parse_user_agent(options)

def __lookup_asn(self, asn, options):
return self.__lookup(
'AS' + str(asn) if IpregistryClient.__is_number(asn) else 'AS',
options, self._requestHandler.lookup_asn,
AutonomousSystem)

def __lookup_ip(self, ip, options):
cache_key = self.__build_cache_key(ip, options)
return self.__lookup(ip, options, self._requestHandler.lookup_ip, IpInfo)

def __lookup(self, key, options, lookup_func, response_type):
cache_key = self.__build_cache_key(key, options)
cache_value = self._cache.get(cache_key)

if cache_value is None:
response = self._requestHandler.lookup_ip(ip, options)
if isinstance(response.data, IpInfo):
response = lookup_func(key, options)
if isinstance(response.data, response_type):
self._cache.put(cache_key, response.data)
return response

Expand Down Expand Up @@ -125,6 +143,15 @@ def __build_cache_key(key, options):
def __is_api_error(data):
return 'code' in data

@staticmethod
def __is_number(value):
try:
# Try converting the value to a float
float(value)
return True
except ValueError:
return False


class IpregistryConfig:
def __init__(self, key, base_url="https://api.ipregistry.co", timeout=15):
Expand Down
57 changes: 55 additions & 2 deletions ipregistry/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@
import requests

from .__init__ import __version__
from .model import (ApiError, ApiResponse, ApiResponseCredits, ApiResponseThrottling, ClientError, IpInfo,
LookupError, RequesterIpInfo, RequesterUserAgent, UserAgent)
from .model import (ApiError, ApiResponse, ApiResponseCredits, ApiResponseThrottling, AutonomousSystem,
ClientError, IpInfo, LookupError, RequesterAutonomousSystem, RequesterIpInfo,
RequesterUserAgent, UserAgent)


class IpregistryRequestHandler(ABC):
def __init__(self, config):
self._config = config

@abstractmethod
def batch_lookup_asns(self, ips, options):
pass

@abstractmethod
def batch_lookup_ips(self, ips, options):
pass
Expand All @@ -39,6 +44,10 @@ def batch_lookup_ips(self, ips, options):
def batch_parse_user_agents(self, user_agents, options):
pass

@abstractmethod
def lookup_asn(self, asn, options):
pass

@abstractmethod
def lookup_ip(self, ip, options):
pass
Expand All @@ -65,6 +74,29 @@ def _build_base_url(self, resource, options):


class DefaultRequestHandler(IpregistryRequestHandler):
def batch_lookup_asns(self, asns, options):
response = None
try:
response = requests.post(
self._build_base_url('', options),
data=json.dumps(list(map(lambda asn: "AS" + str(asn), asns))),
headers=self.__headers(),
timeout=self._config.timeout
)
response.raise_for_status()
results = response.json().get('results', [])

parsed_results = [
LookupError(data) if 'code' in data else AutonomousSystem(**data)
for data in results
]

return self.build_api_response(response, parsed_results)
except requests.HTTPError:
self.__create_api_error(response)
except Exception as e:
raise ClientError(e)

def batch_lookup_ips(self, ips, options):
response = None
try:
Expand Down Expand Up @@ -111,6 +143,27 @@ def batch_parse_user_agents(self, user_agents, options):
except Exception as e:
raise ClientError(e)

def lookup_asn(self, asn, options):
response = None
try:
response = requests.get(
self._build_base_url(asn, options),
headers=self.__headers(),
timeout=self._config.timeout
)
response.raise_for_status()
json_response = response.json()

return self.build_api_response(
response,
RequesterAutonomousSystem(**json_response) if asn == 'AS'
else AutonomousSystem(**json_response)
)
except requests.HTTPError:
self.__create_api_error(response)
except Exception as err:
raise ClientError(err)

def lookup_ip(self, ip, options):
response = None
try:
Expand Down
35 changes: 35 additions & 0 deletions samples/batch-lookup-asns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Copyright 2019 Ipregistry (https://ipregistry.co).
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from ipregistry import ApiError, AutonomousSystem, ClientError, IpregistryClient

try:
api_key = "tryout"
client = IpregistryClient(api_key)
response = client.batch_lookup_asns([42, 4441, 51933])
data_list = response.data

for entry in data_list:
if isinstance(entry, AutonomousSystem):
print("AutonomousSystem", entry)
else:
print("LookupError", entry)
except ApiError as e:
print("API error", e)
except ClientError as e:
print("Client error", e)
except Exception as e:
print("Unexpected error", e)
10 changes: 5 additions & 5 deletions samples/batch-lookup-ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
api_key = "tryout"
client = IpregistryClient(api_key)
response = client.batch_lookup_ips(["73.2.2.2", "8.8.8.8", "2001:67c:2e8:22::c100:68b"])
ip_info_list = response.data
data_list = response.data

for lookup_result in ip_info_list:
if isinstance(lookup_result, IpInfo):
print("IpInfo", lookup_result)
for entry in data_list:
if isinstance(entry, IpInfo):
print("IpInfo", entry)
else:
print("LookupError", lookup_result)
print("LookupError", entry)
except ApiError as e:
print("API error", e)
except ClientError as e:
Expand Down
30 changes: 30 additions & 0 deletions samples/lookup-asn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
Copyright 2019 Ipregistry (https://ipregistry.co).
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from ipregistry import ApiError, ClientError, IpregistryClient

try:
api_key = "tryout"
client = IpregistryClient(api_key)
response = client.lookup_asn(42)
print(response.credits.consumed)
print(response.data.relationships)
except ApiError as e:
print("API error", e)
except ClientError as e:
print("Client error", e)
except Exception as e:
print("Unexpected error", e)
30 changes: 30 additions & 0 deletions samples/origin-lookup-asn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
Copyright 2019 Ipregistry (https://ipregistry.co).
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from ipregistry import ApiError, ClientError, IpregistryClient

try:
api_key = "tryout"
client = IpregistryClient(api_key)
response = client.origin_lookup_asn()
as_data = response.data
print(as_data)
except ApiError as e:
print("API error", e)
except ClientError as e:
print("Client error", e)
except Exception as e:
print("Unexpected error", e)
39 changes: 36 additions & 3 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,27 @@
import os
import unittest

from ipregistry import ApiError, IpInfo, LookupError, ClientError, UserAgent
from ipregistry import ApiError, AutonomousSystem, IpInfo, LookupError, ClientError, UserAgent
from ipregistry.cache import InMemoryCache, NoCache
from ipregistry.core import IpregistryClient, IpregistryConfig


class TestIpregistryClient(unittest.TestCase):

def test_batch_lookup_asns(self):
"""
Test batch asns lookup with valid and invalid inputs
"""
client = IpregistryClient(os.getenv('IPREGISTRY_API_KEY'))
response = client.batch_lookup_asns([33, 'invalid', -1])
print(response)
self.assertEqual(3, len(response.data))
self.assertEqual(True, isinstance(response.data[0], AutonomousSystem))
self.assertEqual(True, isinstance(response.data[1], LookupError))
self.assertEqual('INVALID_ASN', response.data[1].code)
self.assertEqual(True, isinstance(response.data[2], LookupError))
self.assertEqual('INVALID_ASN', response.data[2].code)

def test_batch_lookup_ips(self):
"""
Test batch ips lookup with valid and invalid inputs
Expand Down Expand Up @@ -83,6 +97,16 @@ def test_client_cache_inmemory_batch_ips_lookup(self):
batch_ips_response2 = client.batch_lookup_ips(['1.1.1.1', '1.1.1.3'])
self.assertEqual(0, batch_ips_response2.credits.consumed)

def test_lookup_asn(self):
"""
Test Autonomous System data lookup by ASN
"""
client = IpregistryClient(os.getenv('IPREGISTRY_API_KEY'))
response = client.lookup_asn(400923)
self.assertEqual(400923, response.data.asn)
self.assertEqual(1, response.credits.consumed)
self.assertIsNotNone(response.data.relationships)

def test_lookup_ip(self):
"""
Test that a simple IP lookup returns data
Expand All @@ -100,7 +124,6 @@ def test_lookup_ip_invalid_input(self):
client = IpregistryClient(os.getenv('IPREGISTRY_API_KEY'))
with self.assertRaises(ApiError) as context:
response = client.lookup_ip('invalid')
print("test", context.exception)
self.assertEqual('INVALID_IP_ADDRESS', context.exception.code)

def test_lookup_ip_cache(self):
Expand All @@ -122,12 +145,22 @@ def test_lookup_timeout(self):
with self.assertRaises(ClientError):
client.lookup_ip('1.1.1.1')

def test_origin_asn(self):
"""
Test origin Autonomous System data lookup
"""
client = IpregistryClient(os.getenv('IPREGISTRY_API_KEY'))
response = client.origin_lookup_asn()
self.assertIsNotNone(response.data.asn)
self.assertEqual(1, response.credits.consumed)
self.assertIsNotNone(response.data.relationships)

def test_origin_lookup_ip(self):
"""
Test that a simple origin IP lookup returns data
"""
client = IpregistryClient(os.getenv('IPREGISTRY_API_KEY'))
response = client.lookup_ip()
response = client.origin_lookup_ip()
self.assertIsNotNone(response.data.ip)
self.assertIsNotNone(response.data.user_agent)

Expand Down

0 comments on commit 0092942

Please sign in to comment.