From c6068028f972b00c57e24a63099f586f640eb0b2 Mon Sep 17 00:00:00 2001 From: Justin Jeffery Date: Fri, 3 Dec 2021 16:33:31 -0500 Subject: [PATCH] feat: Intent Checks --- examples/intent.py | 74 +++++++++++++++++++++++++++++++++++++++ ipfabric/client.py | 2 ++ ipfabric/intent.py | 62 ++++++++++++++++++++++++++++++++ ipfabric/intent_models.py | 47 +++++++++++++++++++++++++ 4 files changed, 185 insertions(+) create mode 100644 examples/intent.py create mode 100644 ipfabric/intent.py create mode 100644 ipfabric/intent_models.py diff --git a/examples/intent.py b/examples/intent.py new file mode 100644 index 0000000..9c80a8a --- /dev/null +++ b/examples/intent.py @@ -0,0 +1,74 @@ +""" +intent.py +""" +from pprint import pprint + +from ipfabric import IPFClient + +if __name__ == '__main__': + ipf = IPFClient('https://demo3.ipfabric.io/') + + # ipf.intent.get_intent_checks('$prev') Load a different snapshot then the client. + + pprint(ipf.intent.intent_checks[0].__dict__) + """ + {'api_endpoint': '/v1/tables/neighbors/unidirectional', + 'checks': Checks(green=None, blue=None, amber=None, red=None), + 'column': 'localHost', + 'custom': False, + 'default_color': 10, + 'descriptions': Description(general='Detects unidirectional Cisco Discovery Protocol (CDP) or + Link-Layer Discovery Protocol (LLDP) sessions.\n', + checks=Checks(green='', blue='Unidirectional CDP or LLDP sessions.', amber='', red='')), + 'groups': [Group(custom=True, name='Neighborship compliance', group_id='320668119', children=[])], + 'intent_id': '320633253', + 'name': 'CDP/LLDP unidirectional', + 'result': Result(count=21, checks=Checks(green=None, blue='21', amber=None, red=None)), + 'status': 1, + 'web_endpoint': '/technology/cdp-lldp/unidirectional-neighbors'} + """ + print() + + print(f"Number of custom checks {len(ipf.intent.custom)}") + """Number of custom checks 25""" + print() + + print(f"Number of builtin checks {len(ipf.intent.builtin)}") + """Number of builtin checks 132""" + print() + + print(f"{ipf.intent.intent_by_name['Console, VTY and AUX Security'].name} belongs to the following groups.") + for g in ipf.intent.intent_by_name['Console, VTY and AUX Security'].groups: + print(g.name) + """" + Console, VTY and AUX Security belongs to the following groups. + HIPAA SCORE + CIS Detail + """ + print() + + print(f"{ipf.intent.builtin[1].name} has a total of {ipf.intent.builtin[1].result.count} matches.") + """" + BGP Session Age has a total of 344 matches. + """ + print() + + results = ipf.intent.get_results(ipf.intent.builtin[2], 'amber') + # Takes in an intent rule and the color you want to view + # Colors are green (0), blue (10), amber (20), red (30) + print(f"{ipf.intent.builtin[2].name} has {len(results)} that are amber.") + print(f"Amber means {ipf.intent.builtin[2].descriptions.checks.amber}.") + print(results[0]) + """ + IPSec Tunnel Encryption has 3 that are amber. + Amber means IPSec tunnels with insecure/weak encryption algorithm. + + {'id': '1153299800', 'authentication': {'data': 'md5', 'severity': 20}, 'autoUp': None, 'dhGroup': None, + 'encapsulation': 'tunnel', 'encryption': {'data': '3des', 'severity': 20}, + 'hostname': 'L71FW5_hasAVeryLongHostname', 'ikeGateway': 'ipsec_L64_mtik', 'intDscr': None, 'intName': 'port4', + 'keepAlive': None, 'keyLifeBytes': None, 'keyLifeSeconds': 600, 'localGwAddress': '10.71.109.105', + 'neighbors': [], 'profileName': 'ipsec_L64_mtik_loop', 'protocol': 'esp', 'remoteGwAddress': '10.64.104.103', + 'routeBased': True, 'selectorLocalAddress': '10.71.200.1/32', 'selectorLocalPort': None, 'selectorProtocol': None, + 'selectorRemoteAddress': '10.64.200.1/32', 'selectorRemotePort': None, 'siteKey': '885963247', 'siteName': 'L71', + 'sn': 'FOSVM1QWZRUM4EB7', 'status': {'data': 'up', 'severity': 0}, 'tunnelIntName': 'ipsec_L64_mtik'} + """ \ No newline at end of file diff --git a/ipfabric/client.py b/ipfabric/client.py index 555ac79..e6c2c08 100644 --- a/ipfabric/client.py +++ b/ipfabric/client.py @@ -9,6 +9,7 @@ from ipfabric import models from ipfabric.graphs import IPFPath +from ipfabric.intent import Intent from ipfabric.security import Security DEFAULT_ID = '$last' @@ -64,6 +65,7 @@ def __init__( self.inventory = models.Inventory(client=self) self.graphs = IPFPath(self) self.security = Security(client=self) + self.intent = Intent(client=self) @property def snapshot_id(self): diff --git a/ipfabric/intent.py b/ipfabric/intent.py new file mode 100644 index 0000000..6f63b48 --- /dev/null +++ b/ipfabric/intent.py @@ -0,0 +1,62 @@ +from typing import Any +import logging +from typing import Any +from typing import Union + +from ipfabric.intent_models import Group +from .intent_models import IntentCheck + +logger = logging.getLogger() + + +class Intent: + def __init__(self, client): + self.client: Any = client + self.intent_checks: list = self.get_intent_checks() + self.groups: list = self.get_groups() + + def get_intent_checks(self): + """ + Gets all intent checks and returns a list of them. You can also: + ipf.intent() # Loads the intents to intent_checks + print(len(ipf.intent.intent_checks)) + :return: list: List of intent checks + """ + res = self.client.get('reports', params=dict(snapshot=self.client.snapshot_id)) + res.raise_for_status() + return [IntentCheck(**check) for check in res.json()] + + def get_groups(self): + res = self.client.get('reports/groups') + res.raise_for_status() + return [Group(**group) for group in res.json()] + + @property + def custom(self): + return [c for c in self.intent_checks if c.custom] + + @property + def builtin(self): + return [c for c in self.intent_checks if not c.custom] + + @property + def intent_by_id(self): + return {c.intent_id: c for c in self.intent_checks} + + @property + def intent_by_name(self): + return {c.name: c for c in self.intent_checks} + + @property + def group_by_id(self): + return {g.group_id: g for g in self.groups} + + @property + def group_by_name(self): + return {g.name: g for g in self.groups} + + def get_results(self, intent: IntentCheck, color: Union[str, int], snapshot_id: str = None): + if isinstance(color, str): + color = dict(green=0, blue=10, amber=20, red=30)[color] + return self.client.fetch_all(intent.api_endpoint, snapshot_id=snapshot_id, reports=intent.web_endpoint, + filters={intent.column: ['color', 'eq', color]}) diff --git a/ipfabric/intent_models.py b/ipfabric/intent_models.py new file mode 100644 index 0000000..9bb9abc --- /dev/null +++ b/ipfabric/intent_models.py @@ -0,0 +1,47 @@ +from typing import Union + +from pydantic import BaseModel, Field + + +class Checks(BaseModel): + green: Union[str, dict, int] = Field(alias='0', default=None) + blue: Union[str, dict, int] = Field(alias='10', default=None) + amber: Union[str, dict, int] = Field(alias='20', default=None) + red: Union[str, dict, int] = Field(alias='30', default=None) + + +class Description(BaseModel): + general: Union[None, str] + checks: Union[None, Checks] + + +class Result(BaseModel): + count: Union[int, None] + checks: Union[Checks, None] + + +class Child(BaseModel): + weight: int + intent_id: str = Field(alias='id') + + +class Group(BaseModel): + custom: bool + name: str + group_id: str = Field(alias='id') + children: list[Child] = Field(default_factory=list) + + +class IntentCheck(BaseModel): + groups: list[Group] + checks: Checks + column: str + custom: bool + descriptions: Description + name: str + status: int + result: Result + api_endpoint: str = Field(alias='apiEndpoint') + default_color: Union[None, int] = Field(alias='defaultColor') + web_endpoint: str = Field(alias='webEndpoint') + intent_id: str = Field(alias='id')