Skip to content

Commit

Permalink
Initial ASM support & Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveMcGrath committed Sep 17, 2024
1 parent d2cc88f commit 8f310af
Show file tree
Hide file tree
Showing 20 changed files with 404 additions and 11 deletions.
1 change: 1 addition & 0 deletions docs/api/asm/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: tenable.asm.session
1 change: 1 addition & 0 deletions docs/api/asm/inventory.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: tenable.asm.inventory
1 change: 1 addition & 0 deletions docs/api/asm/smart_folders.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: tenable.asm.smart_folders
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
api/dl/index
api/ie/index
api/apa/index
api/asm/index
api/nessus/index
api/reports/index
api/base/index
Expand Down
1 change: 1 addition & 0 deletions tenable/asm/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .session import TenableASM
112 changes: 112 additions & 0 deletions tenable/asm/inventory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
Inventory
=========
Methods described in this section relate to the inventory API and can be accessed at
``TenableASM.inventory``.
.. rst-class:: hide-signature
.. autoclass:: InventoryAPI
:members:
.. autoclass:: InventoryIterator
:members:
"""
from typing import Dict, List, Any, Optional, Tuple, TYPE_CHECKING
from copy import copy
from tenable.base.endpoint import APIEndpoint
from restfly.iterator import APIIterator

if TYPE_CHECKING:
from .session import TenableASM
from box import BoxList


class InventoryIterator(APIIterator):
"""
Asset inventory iterator
"""
_after_asset_id: str = '0000000000'
_filters: List[Dict[str, str]]
_query: Dict[str, Any]
_api: 'TenableASM'
page: 'BoxList'
limit: int = 1000
total: int
stats: Dict[str, Any]

def _get_page(self):
query = copy(self._query)
if not query.get('after'):
query['after'] = self._after_asset_id

query['limit'] = self.limit
resp = self._api.post('inventory', params=query, json=self._filters)
self.page = resp.assets
self.total = resp.total
self.stats = resp.stats

if self.page:
self._after_asset_id = self.page[-1].id



class InventoryAPI(APIEndpoint):
def list(self,
*search: Tuple[str, str, str],
columns: Optional[List[str]] = None,
size: int = 1000,
sort_field: Optional[str] = None,
sort_asc: bool = True,
inventory: bool = False,
) -> InventoryIterator:
"""
Lists the assets in the inventory
Args:
*search (tuple[str, str, str], optional):
A 3-part search tuple detailing what to search for from the ASM
dataset. For example:
``('bd.original_hostname', 'ends with', '.com')``
columns (list[str], optional):
The list of columns to return in the response.
size (int, optional):
The number of records to return with each page from the API. Must be
an integer between `1` and `10000`.
sort_field (str, optional):
What field should the results be worted by?
sort_asc (bool):
How should the results be sorted? ``True`` specifies ascending sort,
whereas ``False`` refers to descending.
Example:
>>> for item in asm.inventory.list():
... print(item)
"""
if not columns:
columns = [
'bd.original_hostname',
'bd.severity_ranking',
'bd.hostname',
'bd.record_type',
'bd.ip_address',
'id',
'bd.addedtoportfolio',
'bd.smartfolders',
'bd.app_updates',
'ports.ports',
'screenshot.redirect_chain',
'screenshot.finalurl',
'ports.cves',
]
return InventoryIterator(
self._api,
_query={
'columns': ','.join(columns),
'inventory': str(bool(inventory)).lower(),
'sortorder': str(bool(sort_asc)).lower(),
'sortby': sort_field,
},
_filters = [{'column': c, 'type': t, 'value': v} for c, t, v in search],
limit=size
)
112 changes: 112 additions & 0 deletions tenable/asm/session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
"""
Tenable Attack Surface Management
=================================
This package covers the Tenable ASM application.
.. autoclass:: TenableASM
:members:
.. toctree::
:hidden:
:glob:
inventory
smart_folders
"""
from tenable.base.platform import APIPlatform
from .inventory import InventoryAPI
from .smart_folders import SmartFoldersAPI


class TenableASM(APIPlatform):
"""
The TenableASM class is the primary interaction point for users to interface with
Tenable Attack Surface Management via the pyTenable library. All the API endpoint
classes that wrap the various aspects of ASM will be attached to this base class.
Args:
api_key (str, optional):
The user's API key to interface into Tenable ASM. If the key isn't
specified, then the library will attempt to read the environment
variable ``TASM_API_KEY`` to get the key.
url (str, optional):
The base URL that the paths will be appended onto. If the url isn't
specified, then the library will attempt to read the environment variable
``TASM_URL``.
retries (int, optional):
The number of retries to make before failing a request. The
default is ``5``.
backoff (float, optional):
If a 429 response is returned, how much do we want to backoff
if the response didn't send a Retry-After header. The default
backoff is ``1`` second.
vendor (str, optional):
The vendor name for the User-Agent string.
product (str, optional):
The product name for the User-Agent string.
build (str, optional):
The version or build identifier for the User-Agent string.
timeout (int, optional):
The connection timeout parameter informing the library how long to
wait in seconds for a stalled response before terminating the
connection. If unspecified, the default is 120 seconds.
Examples:
Basic example:
>>> from tenable.asm import TenableASM
>>> tasm = TenableASM(url='https://asm.cloud.tenable.com',
... api_key='abcdef1234567890'
... )
Another example with proper identification:
>>> tasm = TenableASM(url='https://asm.cloud.tenable.com',
... api_key='abcdef1234567890',
... vendor='Company Name',
... product='My Awesome Widget',
... build='1.0.0'
... )
Yet another example thats leveraging the ``TASM_API_KEY`` and
``TASM_URL`` environment variables:
>>> os.environ['TASM_URL'] = 'https://asm.cloud.tenable.com'
>>> os.environ['TASM_API_KEY'] = 'abcdef1234567890'
>>> tasm = TenableASM(vendor='Company Name',
... product='My Awesome Widget',
... build='1.0.0'
... )
"""
_base_path = 'api/1.0'
_env_base = 'TASM'
_box = True
_allowed_auth_mech_priority = ['key']
_allowed_auth_mech_params = {'key': ['api_key']}

def _key_auth(self, api_key, **kwargs):
"""
API Key authorization mechanism for Tenable ASM.
"""
self._session.headers.update({'Authorization': api_key})
self._auth_meth = 'key'

@property
def inventory(self):
"""
The interface object for the
:doc:`Tenable ASM Inventory API <inventory>`
"""
return InventoryAPI(self)

@property
def smart_folders(self):
"""
The interface object for the
:doc:`Tenable ASM Smart Folders API <smart_folders>`
"""
return SmartFoldersAPI(self)
26 changes: 26 additions & 0 deletions tenable/asm/smart_folders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""
Smart Folders
=============
Methods described in this section relate to the smart folders API and can be accessed at
``TenableASM.smart_folders``.
.. rst-class:: hide-signature
.. autoclass:: SmartFoldersAPI
:members:
"""
from typing import Dict, List, Any
from tenable.base.endpoint import APIEndpoint


class SmartFoldersAPI(APIEndpoint):
_path = 'smartfolders'

def list(self) -> List[Dict[str, Any]]:
"""
Returns the list of smart folders from ASM.
Example:
>>> folders = asm.smartfolders.list()
"""
return self._get()
16 changes: 8 additions & 8 deletions tenable/base/platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ class APIPlatform(Base):

def __init__(self, **kwargs):

# if the constructed URL isn't valid, then we will throw a TypeError
# if the constructed URL isn't valid, then we will throw a ConnectionError
# to inform the caller that something isn't right here.
self._url = kwargs.get('url',
os.environ.get(f'{self._env_base}_URL',
self._url
)
)
if not url_validator(self._url):
raise TypeError(f'{self._url} is not a valid URL')
url = kwargs.get('url')
if not url:
url = os.environ.get(f'{self._env_base}_URL', self._url)
self._url = url

if not (self._url and url_validator(self._url, validate=['scheme', 'netloc'])):
raise ConnectionError(f'{self._url} is not a valid URL')

# CamelCase squashing is an optional parameter thanks to Box. if the
# user has requested it, then we should add the appropriate parameter
Expand Down
Empty file added tests/apa/__init__.py
Empty file.
Empty file added tests/asm/__init__.py
Empty file.
70 changes: 70 additions & 0 deletions tests/asm/test_inventory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pytest
import responses
from responses.registries import OrderedRegistry
from responses.matchers import json_params_matcher, query_param_matcher
from tenable.asm import TenableASM


@responses.activate(registry=OrderedRegistry)
def test_asm_inventory_list():
test_item = {'id': 123456}
responses.post(
'https://nourl/api/1.0/inventory',
json={'assets': [test_item for _ in range(1000)], 'total': 2005, 'stats': {}},
match=[
query_param_matcher({
'columns': 'id,name',
'inventory': 'false',
'sortorder': 'true',
'sortby': 'id',
'after': '0000000000',
'limit': 1000,
}),
json_params_matcher([
{'column': 'id', 'type': 'equals', 'value': 'something'}
])
]
)
responses.post(
'https://nourl/api/1.0/inventory',
json={'assets': [test_item for _ in range(1000)], 'total': 2005, 'stats': {}},
match=[
query_param_matcher({
'columns': 'id,name',
'inventory': 'false',
'sortorder': 'true',
'sortby': 'id',
'after': '123456',
'limit': 1000,
}),
json_params_matcher([
{'column': 'id', 'type': 'equals', 'value': 'something'}
])
]
)
responses.post(
'https://nourl/api/1.0/inventory',
json={'assets': [test_item for _ in range(5)], 'total': 2005, 'stats': {}},
match=[
query_param_matcher({
'columns': 'id,name',
'inventory': 'false',
'sortorder': 'true',
'sortby': 'id',
'after': '123456',
'limit': 1000,
}),
json_params_matcher([
{'column': 'id', 'type': 'equals', 'value': 'something'}
])
]
)
asm = TenableASM(url='https://nourl', api_key='12345')
items = asm.inventory.list(
('id', 'equals', 'something'),
columns=['id', 'name'],
sort_field='id'
)
for item in items:
assert dict(item) == test_item
assert items.count == 2005
17 changes: 17 additions & 0 deletions tests/asm/test_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import os
import pytest
from tenable.asm import TenableASM
from tenable.errors import AuthenticationWarning


def test_asm_session_authentication():
asm = TenableASM(url='http://nourl', api_key='abcdef')
assert asm._session.headers['Authorization'] == 'abcdef'

os.environ['TASM_API_KEY'] = 'efghi'
asm = TenableASM(url='http://nourl')
assert asm._session.headers['Authorization'] == 'efghi'

os.environ.pop('TASM_API_KEY')
with pytest.warns(AuthenticationWarning):
asm = TenableASM(url='http://nourl')
Loading

0 comments on commit 8f310af

Please sign in to comment.