Skip to content
This repository has been archived by the owner on Feb 6, 2023. It is now read-only.

Commit

Permalink
feat: Pull builtin checks from IP Fabric
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeff07 committed Dec 8, 2021
1 parent 1df418e commit 0f4d385
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 7 deletions.
37 changes: 37 additions & 0 deletions examples/tools/builtin_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from tabulate import tabulate

from ipfabric import IPFClient
from ipfabric.tools import SinglePointsFailure, NonRedundantLinks

if __name__ == "__main__":
# USE SPARINGLY THIS WILL CREATE A GRAPH OF ALL YOUR SITES.
# NOT TESTED ON AN IP FABRIC INSTANCE WITH MORE THAN 1,000 DEVICES
ipf = IPFClient('https://demo3.ipfabric.io')
nrl = NonRedundantLinks(ipf)
links = nrl.list()

print(tabulate(links, headers="keys"))
"""
source source_int target target_int
-------------------- ------------- ----------------- ------------
L71R3 Et1/1 L71FW10-HA2/root vl201
L67FW31 ge-0/0/3.0 L67P13 ge-0/0/4.0
L71-CP1/0 wrp0 L71EXR2 Et0/3
L71-CP1/BadCompany eth1 L71R4 Et1/1
"""
print()

spf = SinglePointsFailure(ipf)
nodes = spf.list()
print(tabulate(nodes, headers="keys"))
"""
device type
-------------------- ------------------
HWLAB router
HWLAB-sw3_5650TD switch
HWLAB-FW-RBSH1 waas
HWLAB-FW-C5510/admin fw
HWLAB-WLC-A620 wlc
HWLAB-FPR-1010 fw
HWLAB-WLC-C4400 wlc
"""
17 changes: 14 additions & 3 deletions ipfabric/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ def _query(self, payload: dict):
else:
return res.content

def site(self, site_name: Union[str, list], snapshot_id: Optional[str] = None):
def site(self, site_name: Union[str, list], snapshot_id: Optional[str] = None, overlay: dict = None):
"""
Returns a diagram for a site or sites
:param site_name: Union[str, list]: A single site name or a list of site names
:param snapshot_id: str: Optional Snapshot ID
:param overlay: dict: Optional Overlay dictionary
:return:
"""
payload = {
Expand All @@ -52,6 +53,8 @@ def site(self, site_name: Union[str, list], snapshot_id: Optional[str] = None):
},
"snapshot": snapshot_id or self.client.snapshot_id,
}
if overlay:
payload['overlay'] = overlay
return self._query(payload)

def unicast(
Expand All @@ -64,7 +67,8 @@ def unicast(
sec_drop: Optional[bool] = True,
grouping: Optional[str] = "siteName",
flags: Optional[list] = None,
snapshot_id: Optional[str] = None
snapshot_id: Optional[str] = None,
overlay: dict = None
) -> Union[dict, bytes]:
"""
Execute an Unicast Path Lookup diagram query for the given set of parameters.
Expand All @@ -79,6 +83,7 @@ def unicast(
:param flags: list: TCP flags, defaults to None. Must be a list and only allowed values can be
subset of ['ack', 'fin', 'psh', 'rst', 'syn', 'urg']
:param snapshot_id: str: Snapshot ID to override class default
:param overlay: dict: Optional Overlay dictionary
:return: Union[dict, str]: json contains a dictionary with 'graphResult' and 'pathlookup' primary keys.
If not json then return bytes
"""
Expand All @@ -98,6 +103,8 @@ def unicast(
parameters=self.check_proto(parameters, flags),
snapshot=snapshot_id or self.client.snapshot_id
)
if overlay:
payload['overlay'] = overlay

return self._query(payload)

Expand All @@ -112,7 +119,8 @@ def multicast(
sec_drop: Optional[bool] = True,
grouping: Optional[str] = "siteName",
flags: Optional[list] = None,
snapshot_id: Optional[str] = None
snapshot_id: Optional[str] = None,
overlay: dict = None
) -> Union[dict, bytes]:
"""
Execute an Multicast Path Lookup diagram query for the given set of parameters.
Expand All @@ -128,6 +136,7 @@ def multicast(
:param flags: list: TCP flags, defaults to None. Must be a list and only allowed values can be
subset of ['ack', 'fin', 'psh', 'rst', 'syn', 'urg']
:param snapshot_id: str: Snapshot ID to override class default
:param overlay: dict: Optional Overlay dictionary
:return: Union[dict, str]: json contains a dictionary with 'graphResult' and 'pathlookup' primary keys.
If not json then return bytes
"""
Expand All @@ -154,6 +163,8 @@ def multicast(
parameters=self.check_proto(parameters, flags),
snapshot=snapshot_id or self.client.snapshot_id
)
if overlay:
payload['overlay'] = overlay

return self._query(payload)

Expand Down
3 changes: 2 additions & 1 deletion ipfabric/tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .builtin_checks import SinglePointsFailure, NonRedundantLinks
from .configuration import DeviceConfigs
from .sites import UpdateSiteNames
from .vulnerabilities import Vulnerabilities

__all__ = [DeviceConfigs, UpdateSiteNames, Vulnerabilities]
__all__ = [DeviceConfigs, UpdateSiteNames, Vulnerabilities, SinglePointsFailure, NonRedundantLinks]
55 changes: 55 additions & 0 deletions ipfabric/tools/builtin_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from typing import Any

from pydantic.dataclasses import dataclass


def sites_names(ipf):
sites = [site['siteName'] for site in ipf.inventory.sites.all(columns=['siteName'])]
sites.append('$main')
return sites


@dataclass
class NonRedundantLinks:
ipf: Any

@staticmethod
def _get_int_names(labels):
source_int = None
for label in labels['source']:
if label['type'] == "intName":
source_int = label['text']
break
target_int = None
for label in labels['target']:
if label['type'] == "intName":
target_int = label['text']
break
return source_int, target_int

def list(self):
sites = sites_names(self.ipf)
data = self.ipf.graphs.site(sites, overlay={"type": "intent", "intentRuleId": "nonRedundantEdges"})
nodes = {node['id']: node['label'] for node in data['graphResult']['graphData']['nodes'].values()}
links = list()
for link in data['graphResult']['graphData']['edges'].values():
if link['intentCheckResult']:
source_int, target_int = self._get_int_names(link['labels'])
source = nodes[link['source']] if link['source'] in nodes else link['source']
target = nodes[link['target']] if link['target'] in nodes else link['target']
links.append(dict(source=source, source_int=source_int, target=target, target_int=target_int))
return links


@dataclass
class SinglePointsFailure:
ipf: Any

def list(self):
sites = sites_names(self.ipf)
data = self.ipf.graphs.site(sites, overlay={"type": "intent", "intentRuleId": "singlePointsOfFailure"})
nodes = list()
for node in data['graphResult']['graphData']['nodes'].values():
if 'intentCheckResult' in node and node['intentCheckResult']:
nodes.append(dict(device=node['label'], type=node['type']))
return nodes
6 changes: 3 additions & 3 deletions tests/test_graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ def test_query(self):
@patch('ipfabric.graphs.IPFPath._query')
def test_unicast(self, query, proto, subnets):
query.return_value = True
self.assertTrue(self.graph.unicast('ip', 'ip'))
self.assertTrue(self.graph.unicast('ip', 'ip', overlay=dict(test=1)))

@patch('ipfabric.graphs.IPFPath._query')
def test_site(self, query):
query.return_value = True
self.assertTrue(self.graph.site('ip'))
self.assertTrue(self.graph.site('ip', overlay=dict(test=1)))

@patch('ipfabric.graphs.IPFPath.check_subnets')
@patch('ipfabric.graphs.IPFPath.check_proto')
@patch('ipfabric.graphs.IPFPath._query')
def test_multicast(self, query, proto, subnets):
subnets.return_value = False
query.return_value = True
self.assertTrue(self.graph.multicast('ip', 'ip', rec_ip='ip'))
self.assertTrue(self.graph.multicast('ip', 'ip', rec_ip='ip', overlay=dict(test=1)))

@patch('ipfabric.graphs.IPFPath.check_proto')
def test_multicast_failed(self, proto):
Expand Down
78 changes: 78 additions & 0 deletions tests/tools/test_builtin_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import unittest
from unittest.mock import MagicMock

from ipfabric.tools.builtin_checks import NonRedundantLinks, SinglePointsFailure


class Checks(unittest.TestCase):
def setUp(self) -> None:
self.ipf = MagicMock()
self.ipf.inventory.sites.all.return_value = [{'siteName': 'Test'}]
self.ipf.graphs.site.return_value = {
"graphResult": {
"graphData": {
"edges": {
"39380579": {
"source": "69240849",
"target": "9A0MD0N6T4F",
"intentCheckResult": 30,
"labels": {
"source": [{
"type": "intName",
"text": "Et0/0"
}
],
"target": [{
"type": "intName",
"text": "GigabitEthernet0/1"
}
]
}
},
"39380580": {
"source": "SOURCE",
"target": "TARGET",
"intentCheckResult": 0,
"labels": {
"source": [{
"type": "intName",
"text": "Et0/0"
}
],
"target": [{
"type": "intName",
"text": "GigabitEthernet0/1"
}
]
}
}
},
"nodes": {
"69240849": {
"id": "69240849",
"label": "L1R13",
"type": "router",
"intentCheckResult": 30,
},
"9A0MD0N6T4F": {
"id": "9A0MD0N6T4F",
"label": "L1FW1",
"type": "fw"
}
}
}
}
}

def test_single_points(self):
spf = SinglePointsFailure(self.ipf)
devices = spf.list()
self.assertEqual(devices[0], {'device': 'L1R13', 'type': 'router'})

def test_non_redundant_links(self):
nrl = NonRedundantLinks(self.ipf)
links = nrl.list()
self.assertEqual(
links[0],
{'source': 'L1R13', 'source_int': 'Et0/0', 'target': 'L1FW1', 'target_int': 'GigabitEthernet0/1'}
)

0 comments on commit 0f4d385

Please sign in to comment.