Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uninett/jinja2 filters #167

Merged
merged 5 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/cnaas_nms/confpush/nornir_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from cnaas_nms.confpush.nornir_plugins.cnaas_inventory import CnaasInventory
from cnaas_nms.scheduler.jobresult import JobResult
from cnaas_nms.tools import jinja_filters


@dataclass
Expand All @@ -24,6 +25,10 @@ class NornirJobResult(JobResult):
lstrip_blocks=True,
keep_trailing_newline=True)

cnaas_jinja_env.filters['increment_ip'] = jinja_filters.increment_ip
cnaas_jinja_env.filters['isofy_ipv4'] = jinja_filters.isofy_ipv4
cnaas_jinja_env.filters['ipv4_to_ipv6'] = jinja_filters.ipv4_to_ipv6


def cnaas_init() -> Nornir:
InventoryPluginRegister.register("CnaasInventory", CnaasInventory)
Expand Down
81 changes: 81 additions & 0 deletions src/cnaas_nms/tools/jinja_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import ipaddress
import re


def increment_ip(ip_string, increment=1):
"""Increment an IP address by a given value. Default increment value is 1.
Args:
ip_string: IP address string. Can be plain or with numeric /prefix
increment: Optional increment step, defaults to 1
Returns:
String with the incremented IP address, with optional numeric prefix
"""
if '/' in ip_string:
# IP with prefix
interface = ipaddress.ip_interface(ip_string)
address = interface.ip + increment

# ugly workaround for IPv4: incrementing an interface's address changes the prefix in some
# cases.
# Check to ensure that the incremented address is in the original network.
if not address in interface.network:
raise ValueError(
f"IP address {address} is not in network {interface.network.with_prefixlen}"
)
else:
return f"{address}/{interface.network.prefixlen}"
else:
# plain IP
ip = ipaddress.ip_address(ip_string)
return format(ip + increment)


def isofy_ipv4(ip_string, prefix=''):
"""Transform IPv4 address so it can be used as an ISO/NET address.
All four blocks of the IP address are padded with zeros and split up into double octets.
Example: 10.255.255.1 -> 0102.5525.5001.00
With prefix: 10.255.255.1, 47.0023.0000.0001.0000 -> 47.0023.0000.0001.0000.0102.5525.5001.00
Args:
ip_string: a valid IPv4 address
prefix: first part of the ISO address (optional)
Returns:
ISO address
"""
ipaddress.IPv4Address(ip_string) # fails for invalid IP

if prefix != '':
prefix_valid = bool(re.match('^.{2}(\..{4})*?$', prefix))
if not prefix_valid:
raise ValueError(f"{prefix} cannot be used as ISO prefix, please check formatting")
prefix += '.'
# IP: split and fill with 0s
ip_parts = ip_string.split('.')
padded = [p.zfill(3) for p in ip_parts]
joined = ''.join(padded)
# IP: split to chunks à 4 chars
chunksize = 4
ip_chunks = [joined[i : i + chunksize] for i in range(0, len(joined), chunksize)]
# combine
iso_address = prefix + '.'.join(ip_chunks) + '.00'
return iso_address


def ipv4_to_ipv6(v6network_string, v4address):
"""Transform IPv4 address to IPv6. This will combine four hextets of an IPv6 network
with four pseudo-hextets of an IPv4 address into a valid IPv6 address.
Args:
v6network_string: IPv6 network in prefix notation
v4address: IPv4 address
Returns:
IPv6 address on the given network, in compressed notation
"""
v6network = ipaddress.IPv6Network(v6network_string)
part1 = v6network.network_address.compressed
part3 = v6network.prefixlen
part2 = v4address.replace('.', ':')
v6address_string = f"{part1}{part2}/{part3}"

v6address = ipaddress.IPv6Interface(v6address_string)
assert v6address in v6network # verify that address is on the given network

return v6address.compressed
37 changes: 30 additions & 7 deletions src/cnaas_nms/tools/template_dry_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import sys
import os
import argparse
try:
import requests
import jinja2
Expand All @@ -16,7 +17,7 @@

api_url = os.environ['CNAASURL']
headers = {"Authorization": "Bearer "+os.environ['JWT_AUTH_TOKEN']}

verify_tls = True

def get_entrypoint(platform, device_type):
mapfile = os.path.join(platform, 'mapping.yml')
Expand All @@ -31,13 +32,15 @@ def get_entrypoint(platform, device_type):
def get_device_details(hostname):
r = requests.get(
f"{api_url}/api/v1.0/device/{hostname}",
verify=verify_tls,
headers=headers)
if r.status_code != 200:
raise Exception("Could not query device API")
device_data = r.json()['data']['devices'][0]

r = requests.get(
f"{api_url}/api/v1.0/device/{hostname}/generate_config",
verify=verify_tls,
headers=headers)
if r.status_code != 200:
raise Exception("Could not query generate_config API")
Expand All @@ -47,6 +50,17 @@ def get_device_details(hostname):
config_data['available_variables'], config_data['generated_config']


def load_jinja_filters():
try:
import jinja_filters
jf = jinja_filters.__dict__
functions = {f: jf[f] for f in jf if callable(jf[f])}
return functions
except ModuleNotFoundError:
print('No jinja_filters.py file in PYTHONPATH, proceeding without filters')
return {}


def render_template(platform, device_type, variables):
# Jinja env should match nornir_helper.cnaas_ninja_env
jinjaenv = jinja2.Environment(
Expand All @@ -56,6 +70,10 @@ def render_template(platform, device_type, variables):
lstrip_blocks=True,
keep_trailing_newline=True
)
jfilters = load_jinja_filters()
for f in jfilters:
jinjaenv.filters[f] = jfilters[f]
print("Jinja filters added: {}".format([*jfilters]))
template_secrets = {}
for env in os.environ:
if env.startswith('TEMPLATE_SECRET_'):
Expand All @@ -73,6 +91,7 @@ def schedule_apply_dryrun(hostname, config):
r = requests.post(
f"{api_url}/api/v1.0/device/{hostname}/apply_config",
headers=headers,
verify=verify_tls,
json=data
)
if r.status_code != 200:
Expand All @@ -81,11 +100,15 @@ def schedule_apply_dryrun(hostname, config):


def main():
if len(sys.argv) != 2:
print("Usage: template_dry_run.py <hostname>")
sys.exit(1)

hostname = sys.argv[1]
parser = argparse.ArgumentParser()
parser.add_argument("hostname")
parser.add_argument("-k", "--skip-verify", help="skip TLS cert verification", action="store_true")
args = parser.parse_args()

hostname = args.hostname
if args.skip_verify:
global verify_tls
verify_tls = False
try:
device_type, platform, variables, old_config = get_device_details(hostname)
except Exception as e:
Expand All @@ -99,7 +122,7 @@ def main():
print(new_config)

try:
input("Start apply_config dry run? Ctrl-c to abort or enter to continue...")
input("Start apply_config dry run? Ctrl-C to abort or enter to continue...")
except KeyboardInterrupt:
print("Exiting...")
else:
Expand Down
14 changes: 0 additions & 14 deletions src/cnaas_nms/tools/testdb.py

This file was deleted.

83 changes: 83 additions & 0 deletions src/cnaas_nms/tools/tests/test_jinja_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import unittest

from cnaas_nms.tools.jinja_filters import increment_ip, isofy_ipv4, ipv4_to_ipv6


class JinjaFilterTests(unittest.TestCase):
def test_increment_ipv4_plain(self):
self.assertEqual(increment_ip('10.0.0.1'), '10.0.0.2')
self.assertEqual(increment_ip(increment_ip('10.0.0.1')), '10.0.0.3')
self.assertEqual(increment_ip('10.0.0.3', 4), '10.0.0.7')
self.assertEqual(increment_ip('10.0.0.1', 255), '10.0.1.0')
self.assertEqual(increment_ip('10.0.0.2', -1), '10.0.0.1')

def test_increment_ipv4_prefix(self):
self.assertEqual(increment_ip('10.0.0.1/24'), '10.0.0.2/24')
self.assertNotEqual(increment_ip('10.0.0.1/24', 1), '10.0.0.2/32')
self.assertEqual(increment_ip(increment_ip('10.0.0.1/24')), '10.0.0.3/24')
self.assertEqual(increment_ip('10.0.0.3/24', 4), '10.0.0.7/24')
with self.assertRaises(ValueError):
increment_ip('10.0.0.1/24', 255)
self.assertEqual(increment_ip('10.0.0.2/24', -1), '10.0.0.1/24')
self.assertEqual(increment_ip('10.0.0.1/16', 255), '10.0.1.0/16')

def test_increment_ipv6_plain(self):
self.assertEqual(increment_ip('2001:700:3901:0020::1'), '2001:700:3901:20::2')
self.assertEqual(increment_ip('2001:700:3901:0020::9'), '2001:700:3901:20::a')
self.assertEqual(
increment_ip('2001:700:3901:0020::1', -2), '2001:700:3901:1f:ffff:ffff:ffff:ffff'
)

def test_increment_ipv6_prefix(self):
self.assertEqual(increment_ip('2001:700:3901:0020::1/64'), '2001:700:3901:20::2/64')
self.assertEqual(increment_ip('2001:700:3901:0020::9/64'), '2001:700:3901:20::a/64')
with self.assertRaises(ValueError):
increment_ip('2001:700:3901:0020::1/64', -2)

def test_isofy_ipv4(self):
self.assertEqual(isofy_ipv4('10.255.255.1'), '0102.5525.5001.00')
self.assertEqual(isofy_ipv4('130.242.1.28'), '1302.4200.1028.00')
self.assertEqual(isofy_ipv4('10.0.0.1'), '0100.0000.0001.00')
with self.assertRaises(ValueError):
isofy_ipv4('10.256.255.1')

def test_isofy_ipv4_prefix(self):
self.assertEqual(
isofy_ipv4('130.242.1.28', prefix='47.0023.0000.0001.0000'),
'47.0023.0000.0001.0000.1302.4200.1028.00',
)
self.assertEqual(
isofy_ipv4('130.242.1.28', prefix='47.0023.0000.0001'),
'47.0023.0000.0001.1302.4200.1028.00',
)
self.assertEqual(isofy_ipv4('130.242.1.28', '47'), '47.1302.4200.1028.00')
invalid_prefixes = [
'47.0023.0000.0001.00',
'47.0023.0000.0001.000',
'47.0023.0000.0001.0000.',
'0047.0023.0000.0001.0000',
]
for prefix in invalid_prefixes:
with self.assertRaises(ValueError):
isofy_ipv4('10.0.0.1', prefix=prefix)

def test_ipv4_to_ipv6(self):
self.assertEqual(ipv4_to_ipv6('2001:700::/64', '10.0.0.1'), '2001:700::10:0:0:1/64')
self.assertEqual(ipv4_to_ipv6('2001:700:0::/64', '10.0.0.1'), '2001:700::10:0:0:1/64')
with self.assertRaises(ValueError):
invalid_network = '2001:700:0:::/64'
ipv4_to_ipv6(invalid_network, '10.0.0.1')

def test_ipv4to6_prefix(self):
self.assertNotEqual(ipv4_to_ipv6('2001:700::/64', '10.0.0.1'), '2001:700::10:0:0:1')

def test_ipv4to6_compressed_notation(self):
self.assertNotEqual(ipv4_to_ipv6('2001:700:0::/64', '10.0.0.1'), '2001:700:0::10:0:0:1/64')
self.assertNotEqual(
ipv4_to_ipv6('2001:0700:0000::/64', '10.0.0.1'), '2001:0700:0000::10:0:0:1/64'
)
self.assertNotEqual(ipv4_to_ipv6('2001:700::/64', '10.00.0.1'), '2001:700::10:00:0:1/64')


if __name__ == '__main__':
unittest.main()