Skip to content
This repository has been archived by the owner on Sep 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #77 from nautobot/release-v1.1.1
Browse files Browse the repository at this point in the history
Release v1.1.1
  • Loading branch information
pke11y authored Feb 10, 2022
2 parents df0e7d7 + 17c9450 commit f0bf1c6
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 52 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## v1.1.1 - 2022-02-10

### Added

- #67 - Improved snapshot handling and added lock representation in snapshot select menu.
- #76 - Added case-insensitive search capability for hostnames.

## v1.1.0 - 2022-01-18

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ PLUGINS_CONFIG = {
"nautobot_chatops_ipfabric": {
"IPFABRIC_API_TOKEN": os.environ.get("IPFABRIC_API_TOKEN"),
"IPFABRIC_HOST": os.environ.get("IPFABRIC_HOST"),
"IPFABRIC_VERIFY": os.environ.get("IPFABRIC_VERIFY", True),
},
}
```
Expand All @@ -74,6 +75,7 @@ The plugin behavior can be controlled with the following list of settings

- `IPFABRIC_API_TOKEN`: Token for accessing IP Fabric API
- `IPFABRIC_HOST`: URL of IP Fabric instance
- `IPFABRIC_VERIFY`: Default: True; False to ignore self-signed certificates

## Development

Expand Down
3 changes: 2 additions & 1 deletion development/creds.example.env
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ NAUTOBOT_SUPERUSER_PASSWORD=admin

# IPFABRIC
IPFABRIC_API_TOKEN=foo
IPFABRIC_HOST=https://ipfabric.myserver.com
IPFABRIC_HOST=https://ipfabric.myserver.com
IPFABRIC_VERIFY=true
1 change: 1 addition & 0 deletions development/nautobot_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ def is_truthy(arg):
"nautobot_chatops_ipfabric": {
"IPFABRIC_API_TOKEN": os.environ.get("IPFABRIC_API_TOKEN"),
"IPFABRIC_HOST": os.environ.get("IPFABRIC_HOST"),
"IPFABRIC_VERIFY": is_truthy(os.environ.get("IPFABRIC_VERIFY", True)),
},
}

Expand Down
126 changes: 93 additions & 33 deletions nautobot_chatops_ipfabric/ipfabric.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,59 @@

import logging
import requests
from .ipfabric_models import Snapshot

# Default IP Fabric API pagination limit
DEFAULT_PAGE_LIMIT = 100
LAST = "$last"
PREV = "$prev"
LAST_LOCKED = "$lastLocked"

logger = logging.getLogger("rq.worker")


def create_regex(string: str) -> str:
"""Takes a string and returns a case insensitive regex."""
regex = "^"
for i in string.upper():
if i.isalpha():
regex += f"[{i}{i.lower()}]"
else:
regex += i
return regex + "$"


# pylint: disable=R0904
class IpFabric:
"""IpFabric will contain all the necessary API methods."""

EMPTY = "(empty)"

def __init__(self, host_url, token):
def __init__(self, host_url, token, verify=True):
"""Auth is contained in the 'X-API-Token' in the header."""
self.headers = {"Accept": "application/json", "Content-Type": "application/json", "X-API-Token": token}
self.host_url = host_url
self.verify = verify

def get_response(self, url, payload, method="POST"):
"""Get request and return response dict."""
return self.get_response_json(method, url, payload).get("data", {})

def get_response_json(self, method, url, payload, params=None):
"""Get request and return response dict."""
response = requests.request(method, self.host_url + url, json=payload, params=params, headers=self.headers)
response = requests.request(
method, self.host_url + url, json=payload, params=params, headers=self.headers, verify=self.verify
)
return response.json()

def get_response_raw(self, method, url, payload, params=None):
"""Get request and return response dict."""
headers = {**self.headers}
headers["Accept"] = "*/*"
return requests.request(method, self.host_url + url, json=payload, params=params, headers=headers)
headers = {**self.headers, "Accept": "*/*"}
return requests.request(
method, self.host_url + url, json=payload, params=params, headers=headers, verify=self.verify
)

def get_devices_info(self, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
def get_devices_info(self, snapshot_id=LAST, limit=DEFAULT_PAGE_LIMIT):
"""Return Device info."""
logger.debug("Received device list request")

Expand All @@ -57,7 +77,7 @@ def get_os_version(self):
logger.debug("Your IP Fabric OS version is: %s", os_version)
return os_version

def get_device_inventory(self, search_key, search_value, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
def get_device_inventory(self, search_key, search_value, snapshot_id=LAST, limit=DEFAULT_PAGE_LIMIT):
"""Return Device info."""
logger.debug("Received device inventory request")

Expand All @@ -81,14 +101,14 @@ def get_device_inventory(self, search_key, search_value, snapshot_id="$last", li
logger.debug("Requesting inventory with payload: %s", payload)
return self.get_response("/api/v1/tables/inventory/devices", payload)

def get_interfaces_load_info(self, device, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
def get_interfaces_load_info(self, device, snapshot_id=LAST, limit=DEFAULT_PAGE_LIMIT):
"""Return Interface load info."""
logger.debug("Received interface counters request")

# columns and snapshot required
payload = {
"columns": ["intName", "inBytes", "outBytes"],
"filters": {"hostname": ["eq", device]},
"filters": {"hostname": ["reg", create_regex(device)]},
"pagination": {"limit": limit, "start": 0},
"snapshot": snapshot_id,
"sort": {"order": "desc", "column": "intName"},
Expand All @@ -101,20 +121,62 @@ def get_snapshots(self):
logger.debug("Received snapshot request")

# no payload required
payload = {}
return self.get_response_json("GET", "/api/v1/snapshots", payload)
response = self.get_response_json("GET", "/api/v1/snapshots", payload={})
snap_dict = {}
for snapshot in response:
if snapshot["state"] != "loaded":
continue
snap = Snapshot(**snapshot)
snap_dict[snap.snapshot_id] = snap
if LAST_LOCKED not in snap_dict and snap.locked:
snap.last_locked = True
snap_dict[LAST_LOCKED] = snap
if LAST not in snap_dict:
snap.last = True
snap_dict[LAST] = snap
continue
if PREV not in snap_dict:
snap.prev = True
snap_dict[PREV] = snap
return snap_dict

@property
def snapshots(self):
"""This gets all Snapshots, places them in Objects, and returns a dict {ID: Snapshot}."""
choices = [(LAST, LAST)]
named_snap_ids = set()
snapshots = self.get_snapshots()

if LAST in snapshots:
named_snap_ids.add(snapshots[LAST].snapshot_id)
choices[0] = (snapshots[LAST].description, snapshots[LAST].snapshot_id)
snapshots.pop(snapshots[LAST].snapshot_id, None)
snapshots.pop(LAST, None)
if PREV in snapshots:
choices.append((snapshots[PREV].description, snapshots[PREV].snapshot_id))
named_snap_ids.add(snapshots[PREV].snapshot_id)
snapshots.pop(snapshots[PREV].snapshot_id, None)
snapshots.pop(PREV, None)
if LAST_LOCKED in snapshots:
if snapshots[LAST_LOCKED].snapshot_id not in named_snap_ids:
choices.append((snapshots[LAST_LOCKED].description, snapshots[LAST_LOCKED].snapshot_id))
snapshots.pop(snapshots[LAST_LOCKED].snapshot_id, None)
snapshots.pop(LAST_LOCKED, None)

for snapshot_id, snapshot in snapshots.items():
choices.append((snapshot.description, snapshot_id))
return choices

def get_path_simulation(
self, src_ip, dst_ip, src_port, dst_port, protocol, snapshot_id
): # pylint: disable=too-many-arguments
"""Return End to End Path Simulation."""
# end-to-end-path don't support $last as snapshot_id, getting the actual ID
if snapshot_id == "$last":
loaded_snapshots = [snap_id["id"] for snap_id in self.get_snapshots() if snap_id["state"] == "loaded"]
if not loaded_snapshots:
return []

snapshot_id = loaded_snapshots[-1]
loaded_snapshots = self.get_snapshots()
if snapshot_id not in loaded_snapshots:
logger.debug("Invalid snapshot_id: %s", snapshot_id)
return {}
snapshot_id = loaded_snapshots[snapshot_id].snapshot_id

params = {
"source": src_ip,
Expand Down Expand Up @@ -168,37 +230,37 @@ def get_pathlookup(
return None
return png_response.content

def get_interfaces_errors_info(self, device, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
def get_interfaces_errors_info(self, device, snapshot_id=LAST, limit=DEFAULT_PAGE_LIMIT):
"""Return bi-directional interface errors info."""
logger.debug("Received interface error counters request")

# columns and snapshot required
payload = {
"columns": ["intName", "errPktsPct", "errRate"],
"filters": {"hostname": ["eq", device]},
"filters": {"hostname": ["reg", create_regex(device)]},
"pagination": {"limit": limit, "start": 0},
"snapshot": snapshot_id,
"sort": {"order": "desc", "column": "intName"},
}

return self.get_response("/api/v1/tables/interfaces/errors/bidirectional", payload)

def get_interfaces_drops_info(self, device, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
def get_interfaces_drops_info(self, device, snapshot_id=LAST, limit=DEFAULT_PAGE_LIMIT):
"""Return interface drops info."""
logger.debug("Received interface drop counters request")

# columns and snapshot required
payload = {
"columns": ["intName", "dropsPktsPct", "dropsRate"],
"filters": {"hostname": ["eq", device]},
"filters": {"hostname": ["reg", create_regex(device)]},
"pagination": {"limit": limit, "start": 0},
"snapshot": snapshot_id,
"sort": {"order": "desc", "column": "intName"},
}

return self.get_response("/api/v1/tables/interfaces/drops/bidirectional", payload)

def get_bgp_neighbors(self, device, state, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
def get_bgp_neighbors(self, device, state, snapshot_id=LAST, limit=DEFAULT_PAGE_LIMIT):
"""Retrieve BGP neighbors in IP Fabric for a specific device."""
logger.debug("Received BGP neighbor request")

Expand All @@ -216,16 +278,16 @@ def get_bgp_neighbors(self, device, state, snapshot_id="$last", limit=DEFAULT_PA
"totalReceivedPrefixes",
],
"snapshot": snapshot_id,
"filters": {"hostname": ["eq", device]},
"filters": {"hostname": ["reg", create_regex(device)]},
"pagination": {"limit": limit, "start": 0},
}

if state != "any":
payload["filters"] = {"and": [{"hostname": ["eq", device]}, {"state": ["eq", state]}]}
payload["filters"] = {"and": [{"hostname": ["reg", create_regex(device)]}, {"state": ["eq", state]}]}
return self.get_response("/api/v1/tables/routing/protocols/bgp/neighbors", payload)

def get_parsed_path_simulation(
self, src_ip, dst_ip, src_port, dst_port, protocol, snapshot_id="$last"
self, src_ip, dst_ip, src_port, dst_port, protocol, snapshot_id=LAST
): # pylint: disable=too-many-arguments, too-many-locals
"""Path Simulation from source to destination IP.
Expand Down Expand Up @@ -298,7 +360,7 @@ def get_parsed_path_simulation(
return path

def get_src_dst_endpoint(
self, src_ip, dst_ip, src_port, dst_port, protocol, snapshot_id="$last"
self, src_ip, dst_ip, src_port, dst_port, protocol, snapshot_id=LAST
): # pylint: disable=too-many-arguments, too-many-locals
"""Get the source/destination interface and source/destination node for the path.
Expand All @@ -315,9 +377,7 @@ def get_src_dst_endpoint(
"""
response = self.get_path_simulation(src_ip, dst_ip, src_port, dst_port, protocol, snapshot_id)
graph = response.get("graph", {})
endpoints = {}
endpoints["src"] = "Unknown"
endpoints["dst"] = "Unknown"
endpoints = {"src": "Unknown", "dst": "Unknown"}

# ipfabric returns the source of the path as the last element in the nodes list
for idx, node in enumerate(graph.get("nodes", [])[::-1]):
Expand All @@ -331,7 +391,7 @@ def get_src_dst_endpoint(
endpoints["dst"] = f"{dst_intf} -- {node.get('hostname')}"
return endpoints

def get_wireless_clients(self, ssid=None, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
def get_wireless_clients(self, ssid=None, snapshot_id=LAST, limit=DEFAULT_PAGE_LIMIT):
"""Get details of wireless clients associated with access points."""
logger.debug("Received wireless client request")

Expand All @@ -357,7 +417,7 @@ def get_wireless_clients(self, ssid=None, snapshot_id="$last", limit=DEFAULT_PAG

return self.get_response("/api/v1/tables/wireless/clients", payload)

def get_wireless_ssids(self, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
def get_wireless_ssids(self, snapshot_id=LAST, limit=DEFAULT_PAGE_LIMIT):
"""Get details of wireless SSIDs."""
logger.debug("Received wireless SSID request")

Expand All @@ -384,7 +444,7 @@ def validate_version(self, operator_func, version):
ipfabric_version = self.get_os_version()
return operator_func(ipfabric_version, version)

def get_host(self, search_key, search_value, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
def get_host(self, search_key, search_value, snapshot_id=LAST, limit=DEFAULT_PAGE_LIMIT):
"""Return inventory host information."""
logger.debug("Received host inventory request - %s %s", search_key, search_value)

Expand All @@ -409,7 +469,7 @@ def get_host(self, search_key, search_value, snapshot_id="$last", limit=DEFAULT_
logger.debug("Requesting host inventory with payload: %s", payload)
return self.get_response("/api/v1/tables/addressing/hosts", payload)

def find_host(self, search_key, search_value, snapshot_id="$last", limit=DEFAULT_PAGE_LIMIT):
def find_host(self, search_key, search_value, snapshot_id=LAST, limit=DEFAULT_PAGE_LIMIT):
"""Get and parse inventory host information."""
logger.debug("Received host inventory request - %s %s", search_key, search_value)

Expand Down
41 changes: 41 additions & 0 deletions nautobot_chatops_ipfabric/ipfabric_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""IP Fabric data models."""

from datetime import datetime


class Snapshot:
"""IP Fabric Snapshot model."""

def __init__(self, **kwargs):
"""Initialize Snapshot class."""
self.name = kwargs.get("name", None)
self.snapshot_id = kwargs.get("id")
self.end = datetime.fromtimestamp(int(kwargs.get("tsEnd", 0) / 1000))
self.locked = kwargs.get("locked", False)
self.last = kwargs.get("last", False)
self.prev = kwargs.get("prev", False)
self.last_locked = kwargs.get("last_locked", False)

def __hash__(self):
"""Snapshot ID is unique so return it's hash."""
return hash(self.snapshot_id)

def __repr__(self):
"""Return Description to represent the class."""
return self.description

@property
def description(self):
"""Create a description for Slack menu."""
desc = "🔒 " if self.locked else ""
if self.last:
desc += "$last: "
elif self.prev:
desc += "$prev: "
elif self.last_locked:
desc += "$lastLocked: "
if self.name:
desc += self.name + " - " + self.end.ctime()
else:
desc += self.end.ctime() + " - " + self.snapshot_id
return desc
22 changes: 22 additions & 0 deletions nautobot_chatops_ipfabric/tests/test_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Test IP Fabric models."""
import unittest

from nautobot_chatops_ipfabric import ipfabric_models


class TestIPFabricModels(unittest.TestCase):
"""Test Models of IP Fabric."""

def test_snapshot(self):
"""Verify the snapshot model works."""
snap_json = {
"name": None,
"state": "loaded",
"locked": False,
"tsEnd": 1642608948957,
"tsStart": 1642607756999,
"id": "1980e282-df63-4b09-b7fb-701a966040f3",
}
snap = ipfabric_models.Snapshot(**snap_json)
self.assertEqual(hash(snap), hash(snap_json["id"]))
self.assertIn(snap_json["id"], str(snap))
Loading

0 comments on commit f0bf1c6

Please sign in to comment.