diff --git a/docs/api/asm/index.rst b/docs/api/asm/index.rst new file mode 100644 index 000000000..d6296c0a9 --- /dev/null +++ b/docs/api/asm/index.rst @@ -0,0 +1 @@ +.. automodule:: tenable.asm.session diff --git a/docs/api/asm/inventory.rst b/docs/api/asm/inventory.rst new file mode 100644 index 000000000..feee4f9f8 --- /dev/null +++ b/docs/api/asm/inventory.rst @@ -0,0 +1 @@ +.. automodule:: tenable.asm.inventory diff --git a/docs/api/asm/smart_folders.rst b/docs/api/asm/smart_folders.rst new file mode 100644 index 000000000..84fd9f76d --- /dev/null +++ b/docs/api/asm/smart_folders.rst @@ -0,0 +1 @@ +.. automodule:: tenable.asm.smart_folders diff --git a/docs/index.rst b/docs/index.rst index 6e7888e0b..1cab9f2f2 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -9,6 +9,7 @@ api/dl/index api/ie/index api/apa/index + api/asm/index api/nessus/index api/reports/index api/base/index diff --git a/tenable/asm/__init__.py b/tenable/asm/__init__.py new file mode 100644 index 000000000..48971c760 --- /dev/null +++ b/tenable/asm/__init__.py @@ -0,0 +1 @@ +from .session import TenableASM diff --git a/tenable/asm/inventory.py b/tenable/asm/inventory.py new file mode 100644 index 000000000..8fdb4bee9 --- /dev/null +++ b/tenable/asm/inventory.py @@ -0,0 +1,112 @@ +""" +Inventory +========= + +Methods described in this section relate to the inventory API and can be accessed at +``TenableASM.inventory``. + +.. rst-class:: hide-signature +.. autoclass:: InventoryAPI + :members: + +.. autoclass:: InventoryIterator + :members: +""" +from typing import Dict, List, Any, Optional, Tuple, TYPE_CHECKING +from copy import copy +from tenable.base.endpoint import APIEndpoint +from restfly.iterator import APIIterator + +if TYPE_CHECKING: + from .session import TenableASM + from box import BoxList + + +class InventoryIterator(APIIterator): + """ + Asset inventory iterator + """ + _after_asset_id: str = '0000000000' + _filters: List[Dict[str, str]] + _query: Dict[str, Any] + _api: 'TenableASM' + page: 'BoxList' + limit: int = 1000 + total: int + stats: Dict[str, Any] + + def _get_page(self): + query = copy(self._query) + if not query.get('after'): + query['after'] = self._after_asset_id + + query['limit'] = self.limit + resp = self._api.post('inventory', params=query, json=self._filters) + self.page = resp.assets + self.total = resp.total + self.stats = resp.stats + + if self.page: + self._after_asset_id = self.page[-1].id + + + +class InventoryAPI(APIEndpoint): + def list(self, + *search: Tuple[str, str, str], + columns: Optional[List[str]] = None, + size: int = 1000, + sort_field: Optional[str] = None, + sort_asc: bool = True, + inventory: bool = False, + ) -> InventoryIterator: + """ + Lists the assets in the inventory + + Args: + *search (tuple[str, str, str], optional): + A 3-part search tuple detailing what to search for from the ASM + dataset. For example: + ``('bd.original_hostname', 'ends with', '.com')`` + columns (list[str], optional): + The list of columns to return in the response. + size (int, optional): + The number of records to return with each page from the API. Must be + an integer between `1` and `10000`. + sort_field (str, optional): + What field should the results be worted by? + sort_asc (bool): + How should the results be sorted? ``True`` specifies ascending sort, + whereas ``False`` refers to descending. + + Example: + >>> for item in asm.inventory.list(): + ... print(item) + """ + if not columns: + columns = [ + 'bd.original_hostname', + 'bd.severity_ranking', + 'bd.hostname', + 'bd.record_type', + 'bd.ip_address', + 'id', + 'bd.addedtoportfolio', + 'bd.smartfolders', + 'bd.app_updates', + 'ports.ports', + 'screenshot.redirect_chain', + 'screenshot.finalurl', + 'ports.cves', + ] + return InventoryIterator( + self._api, + _query={ + 'columns': ','.join(columns), + 'inventory': str(bool(inventory)).lower(), + 'sortorder': str(bool(sort_asc)).lower(), + 'sortby': sort_field, + }, + _filters = [{'column': c, 'type': t, 'value': v} for c, t, v in search], + limit=size + ) diff --git a/tenable/asm/session.py b/tenable/asm/session.py new file mode 100644 index 000000000..dcedd8626 --- /dev/null +++ b/tenable/asm/session.py @@ -0,0 +1,112 @@ +""" +Tenable Attack Surface Management +================================= + +This package covers the Tenable ASM application. + +.. autoclass:: TenableASM + :members: + + +.. toctree:: + :hidden: + :glob: + + inventory + smart_folders + +""" +from tenable.base.platform import APIPlatform +from .inventory import InventoryAPI +from .smart_folders import SmartFoldersAPI + + +class TenableASM(APIPlatform): + """ + The TenableASM class is the primary interaction point for users to interface with + Tenable Attack Surface Management via the pyTenable library. All the API endpoint + classes that wrap the various aspects of ASM will be attached to this base class. + + Args: + api_key (str, optional): + The user's API key to interface into Tenable ASM. If the key isn't + specified, then the library will attempt to read the environment + variable ``TASM_API_KEY`` to get the key. + url (str, optional): + The base URL that the paths will be appended onto. If the url isn't + specified, then the library will attempt to read the environment variable + ``TASM_URL``. + retries (int, optional): + The number of retries to make before failing a request. The + default is ``5``. + backoff (float, optional): + If a 429 response is returned, how much do we want to backoff + if the response didn't send a Retry-After header. The default + backoff is ``1`` second. + vendor (str, optional): + The vendor name for the User-Agent string. + product (str, optional): + The product name for the User-Agent string. + build (str, optional): + The version or build identifier for the User-Agent string. + timeout (int, optional): + The connection timeout parameter informing the library how long to + wait in seconds for a stalled response before terminating the + connection. If unspecified, the default is 120 seconds. + + Examples: + + Basic example: + + >>> from tenable.asm import TenableASM + >>> tasm = TenableASM(url='https://asm.cloud.tenable.com', + ... api_key='abcdef1234567890' + ... ) + + Another example with proper identification: + + >>> tasm = TenableASM(url='https://asm.cloud.tenable.com', + ... api_key='abcdef1234567890', + ... vendor='Company Name', + ... product='My Awesome Widget', + ... build='1.0.0' + ... ) + + Yet another example thats leveraging the ``TASM_API_KEY`` and + ``TASM_URL`` environment variables: + + >>> os.environ['TASM_URL'] = 'https://asm.cloud.tenable.com' + >>> os.environ['TASM_API_KEY'] = 'abcdef1234567890' + >>> tasm = TenableASM(vendor='Company Name', + ... product='My Awesome Widget', + ... build='1.0.0' + ... ) + """ + _base_path = 'api/1.0' + _env_base = 'TASM' + _box = True + _allowed_auth_mech_priority = ['key'] + _allowed_auth_mech_params = {'key': ['api_key']} + + def _key_auth(self, api_key, **kwargs): + """ + API Key authorization mechanism for Tenable ASM. + """ + self._session.headers.update({'Authorization': api_key}) + self._auth_meth = 'key' + + @property + def inventory(self): + """ + The interface object for the + :doc:`Tenable ASM Inventory API ` + """ + return InventoryAPI(self) + + @property + def smart_folders(self): + """ + The interface object for the + :doc:`Tenable ASM Smart Folders API ` + """ + return SmartFoldersAPI(self) diff --git a/tenable/asm/smart_folders.py b/tenable/asm/smart_folders.py new file mode 100644 index 000000000..d99dea547 --- /dev/null +++ b/tenable/asm/smart_folders.py @@ -0,0 +1,26 @@ +""" +Smart Folders +============= + +Methods described in this section relate to the smart folders API and can be accessed at +``TenableASM.smart_folders``. + +.. rst-class:: hide-signature +.. autoclass:: SmartFoldersAPI + :members: +""" +from typing import Dict, List, Any +from tenable.base.endpoint import APIEndpoint + + +class SmartFoldersAPI(APIEndpoint): + _path = 'smartfolders' + + def list(self) -> List[Dict[str, Any]]: + """ + Returns the list of smart folders from ASM. + + Example: + >>> folders = asm.smartfolders.list() + """ + return self._get() diff --git a/tenable/base/platform.py b/tenable/base/platform.py index b2e792d54..69de6af5c 100644 --- a/tenable/base/platform.py +++ b/tenable/base/platform.py @@ -85,15 +85,15 @@ class APIPlatform(Base): def __init__(self, **kwargs): - # if the constructed URL isn't valid, then we will throw a TypeError + # if the constructed URL isn't valid, then we will throw a ConnectionError # to inform the caller that something isn't right here. - self._url = kwargs.get('url', - os.environ.get(f'{self._env_base}_URL', - self._url - ) - ) - if not url_validator(self._url): - raise TypeError(f'{self._url} is not a valid URL') + url = kwargs.get('url') + if not url: + url = os.environ.get(f'{self._env_base}_URL', self._url) + self._url = url + + if not (self._url and url_validator(self._url, validate=['scheme', 'netloc'])): + raise ConnectionError(f'{self._url} is not a valid URL') # CamelCase squashing is an optional parameter thanks to Box. if the # user has requested it, then we should add the appropriate parameter diff --git a/tests/apa/__init__.py b/tests/apa/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/asm/__init__.py b/tests/asm/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/asm/test_inventory.py b/tests/asm/test_inventory.py new file mode 100644 index 000000000..e813b3d29 --- /dev/null +++ b/tests/asm/test_inventory.py @@ -0,0 +1,70 @@ +import pytest +import responses +from responses.registries import OrderedRegistry +from responses.matchers import json_params_matcher, query_param_matcher +from tenable.asm import TenableASM + + +@responses.activate(registry=OrderedRegistry) +def test_asm_inventory_list(): + test_item = {'id': 123456} + responses.post( + 'https://nourl/api/1.0/inventory', + json={'assets': [test_item for _ in range(1000)], 'total': 2005, 'stats': {}}, + match=[ + query_param_matcher({ + 'columns': 'id,name', + 'inventory': 'false', + 'sortorder': 'true', + 'sortby': 'id', + 'after': '0000000000', + 'limit': 1000, + }), + json_params_matcher([ + {'column': 'id', 'type': 'equals', 'value': 'something'} + ]) + ] + ) + responses.post( + 'https://nourl/api/1.0/inventory', + json={'assets': [test_item for _ in range(1000)], 'total': 2005, 'stats': {}}, + match=[ + query_param_matcher({ + 'columns': 'id,name', + 'inventory': 'false', + 'sortorder': 'true', + 'sortby': 'id', + 'after': '123456', + 'limit': 1000, + }), + json_params_matcher([ + {'column': 'id', 'type': 'equals', 'value': 'something'} + ]) + ] + ) + responses.post( + 'https://nourl/api/1.0/inventory', + json={'assets': [test_item for _ in range(5)], 'total': 2005, 'stats': {}}, + match=[ + query_param_matcher({ + 'columns': 'id,name', + 'inventory': 'false', + 'sortorder': 'true', + 'sortby': 'id', + 'after': '123456', + 'limit': 1000, + }), + json_params_matcher([ + {'column': 'id', 'type': 'equals', 'value': 'something'} + ]) + ] + ) + asm = TenableASM(url='https://nourl', api_key='12345') + items = asm.inventory.list( + ('id', 'equals', 'something'), + columns=['id', 'name'], + sort_field='id' + ) + for item in items: + assert dict(item) == test_item + assert items.count == 2005 diff --git a/tests/asm/test_session.py b/tests/asm/test_session.py new file mode 100644 index 000000000..a344f0604 --- /dev/null +++ b/tests/asm/test_session.py @@ -0,0 +1,17 @@ +import os +import pytest +from tenable.asm import TenableASM +from tenable.errors import AuthenticationWarning + + +def test_asm_session_authentication(): + asm = TenableASM(url='http://nourl', api_key='abcdef') + assert asm._session.headers['Authorization'] == 'abcdef' + + os.environ['TASM_API_KEY'] = 'efghi' + asm = TenableASM(url='http://nourl') + assert asm._session.headers['Authorization'] == 'efghi' + + os.environ.pop('TASM_API_KEY') + with pytest.warns(AuthenticationWarning): + asm = TenableASM(url='http://nourl') diff --git a/tests/asm/test_smart_folders.py b/tests/asm/test_smart_folders.py new file mode 100644 index 000000000..cd06f6899 --- /dev/null +++ b/tests/asm/test_smart_folders.py @@ -0,0 +1,49 @@ +import pytest +import responses +from tenable.asm import TenableASM + + +@responses.activate +def test_asm_smartfolder_list(): + folder = { + 'id': 1264, + 'name': 'Recently Added Assets', + 'hash': None, + 'description': 'SOMETHING', + 'filters': [ + {'column': 'bd.addedtoportfolio', 'type': 'less than', 'value': '7'} + ], + 'notifications': [ + { + 'type': 'email', + 'enabled': True, + 'email': 'user@company' + } + ], + 'current_asset_count': 0, + 'updated_at': '2024-09-11T00:47:20.000Z', + 'stats': { + 'total': 0, + 'domaincount': 0, + 'subdomaincount': 0 + }, + 'extra': { + 'columns': [ + 'bd.record_type', + 'bd.hostname', + 'bd.ip_address', + 'bd.addedtoportfolio' + ], + 'view': 'table' + }, + 'template_id': None, + 'shared_at': None, + 'expiration_days': 0 + } + responses.get( + 'https://nourl/api/1.0/smartfolders', + json=[folder for _ in range(100)] + ) + asm = TenableASM(url='https://nourl', api_key='abcdef') + for item in asm.smart_folders.list(): + assert dict(item) == folder diff --git a/tests/base/__init__.py b/tests/base/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/base/test_platform.py b/tests/base/test_platform.py index 491a775a4..676f0b187 100644 --- a/tests/base/test_platform.py +++ b/tests/base/test_platform.py @@ -12,7 +12,7 @@ def test_url_constructor(): Test the URL constructor. ''' # test invalid URL - with pytest.raises(TypeError): + with pytest.raises(ConnectionError): APIPlatform(url='something') diff --git a/tests/dl/__init__.py b/tests/dl/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/ie/__init__.py b/tests/ie/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/ie/test_session.py b/tests/ie/test_session.py index fb8886708..e229cb64a 100644 --- a/tests/ie/test_session.py +++ b/tests/ie/test_session.py @@ -11,4 +11,4 @@ def test_session_authentication_error(): test to raise the exception unauthorized session is created ''' with pytest.warns(AuthenticationWarning): - TenableIE() + TenableIE(url='http://nourl') diff --git a/tests/ot/test_session.py b/tests/ot/test_session.py index a9633c0ad..5773847c1 100644 --- a/tests/ot/test_session.py +++ b/tests/ot/test_session.py @@ -1,8 +1,10 @@ +import os import pytest from tenable.errors import AuthenticationWarning from tenable.ot import TenableOT def test_session_auth(): + os.environ.pop('TOT_API_KEY', None) with pytest.warns(AuthenticationWarning): - t = TenableOT() + t = TenableOT(url='http://nourl')