-
Notifications
You must be signed in to change notification settings - Fork 219
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feat(plugins): Add arista.avd.range_expand filter (#1586)
- Loading branch information
1 parent
ea243c8
commit e70e6e1
Showing
3 changed files
with
225 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
135 changes: 135 additions & 0 deletions
135
ansible_collections/arista/avd/plugins/filter/range_expand.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
# | ||
# range_expand filter | ||
# | ||
from __future__ import (absolute_import, division, print_function) | ||
__metaclass__ = type | ||
|
||
from itertools import count, groupby | ||
import re | ||
from ansible.errors import AnsibleFilterError | ||
|
||
|
||
class FilterModule(object): | ||
|
||
def range_expand(self, range_to_expand): | ||
if not (isinstance(range_to_expand, list) or isinstance(range_to_expand, str)): | ||
raise AnsibleFilterError(f"value must be of type list or str, got {type(range_to_expand)}") | ||
|
||
result = [] | ||
|
||
# If we got a list, unpack it and run this function recursively | ||
if isinstance(range_to_expand, list): | ||
for r in range_to_expand: | ||
result.extend(self.range_expand(r)) | ||
|
||
# Must be a str now | ||
else: | ||
prefix = "" | ||
|
||
# Unpack list in string | ||
for one_range in range_to_expand.split(','): | ||
|
||
# Find prefix (if any) | ||
regex = r"^(.*?)(((\d+)-)?(\d+)\/)?(((\d+)-)?(\d+)\/)?(((\d+)-)?(\d+))(\.((\d+)-)?(\d+))?" | ||
# Groups one-by-one: | ||
# Group 1 (.*?) matches prefix ex. Ethernet, Eth, Po, Port-Channel | ||
# Group 2 (((\d+)-)?(\d+)\/)? matches module(s) and slash ex. 12/, 1-3/ | ||
# Group 3 ((\d+)-)? matches first module and dash ex. 1- | ||
# Group 4 (\d+) matches first module ex. 1 | ||
# Group 5 (\d+) matches last module ex. 12, 3 | ||
# Group 6 (((\d+)-)?(\d+)\/)? matches parent interface(s) and slash ex. 47/, 1-48/ | ||
# Group 7 ((\d+)-)? matches parent interface(s) and dash ex. 47- | ||
# Group 8 (\d+) matches first parent interface ex. 1 | ||
# Group 9 (\d+) matches last parent interface ex. 47, 48 | ||
# Group 10 (((\d+)-)?(\d+)) matches (breakout) interface(s) ex. 1, 1-4, 1-48 | ||
# Group 11 ((\d+)-)? matches first interfaces and dash ex. 1-, 1- | ||
# Group 12 (\d+) matches first interface | ||
# Group 13 (\d+) matches last interface ex. 1, 4, 48 | ||
# Group 14 (\.((\d+)-)?(\d+))? matches dot and sub-interface(s) ex. .141, .12-15 | ||
# Group 15 ((\d+)-)? matches first sub-interface and dash ex. 12- | ||
# Group 16 (\d+) matches first sub-interface ex. 12 | ||
# Group 17 (\d+) matches last sub-interface ex. 141, 15 | ||
# Remember that the groups() object is 0-based and the group numbers above are 1-based | ||
search_result = re.search(regex, one_range) | ||
if search_result and len(search_result.groups()) == 17: | ||
groups = search_result.groups() | ||
first_module = last_module = None | ||
first_parent_interface = last_parent_interface = None | ||
first_interface = last_interface = None | ||
first_subinterface = last_subinterface = None | ||
# Set prefix if found (otherwise use last set prefix) | ||
if groups[0]: | ||
prefix = groups[0] | ||
if groups[4]: | ||
last_module = int(groups[4]) | ||
if groups[3]: | ||
first_module = int(groups[3]) | ||
else: | ||
first_module = last_module | ||
if groups[8]: | ||
last_parent_interface = int(groups[8]) | ||
if groups[7]: | ||
first_parent_interface = int(groups[7]) | ||
else: | ||
first_parent_interface = last_parent_interface | ||
if groups[12]: | ||
last_interface = int(groups[12]) | ||
if groups[11]: | ||
first_interface = int(groups[11]) | ||
else: | ||
first_interface = last_interface | ||
if groups[16]: | ||
last_subinterface = int(groups[16]) | ||
if groups[15]: | ||
first_subinterface = int(groups[15]) | ||
else: | ||
first_subinterface = last_subinterface | ||
|
||
def expand_subinterfaces(interface_string): | ||
result = [] | ||
if last_subinterface: | ||
for subinterface in range(first_subinterface, last_subinterface + 1): | ||
result.append(f"{interface_string}.{subinterface}") | ||
else: | ||
result.append(interface_string) | ||
return result | ||
|
||
def expand_interfaces(interface_string): | ||
result = [] | ||
for interface in range(first_interface, last_interface + 1): | ||
for res in expand_subinterfaces(f"{interface_string}{interface}"): | ||
result.append(res) | ||
return result | ||
|
||
def expand_parent_interfaces(interface_string): | ||
result = [] | ||
if last_parent_interface: | ||
for parent_interface in range(first_parent_interface, last_parent_interface + 1): | ||
for res in expand_interfaces(f"{interface_string}{parent_interface}/"): | ||
result.append(res) | ||
else: | ||
for res in expand_interfaces(f"{interface_string}"): | ||
result.append(res) | ||
return result | ||
|
||
def expand_module(interface_string): | ||
result = [] | ||
if last_module: | ||
for module in range(first_module, last_module + 1): | ||
for res in expand_parent_interfaces(f"{interface_string}{module}/"): | ||
result.append(res) | ||
else: | ||
for res in expand_parent_interfaces(f"{interface_string}"): | ||
result.append(res) | ||
return result | ||
|
||
result.extend(expand_module(prefix)) | ||
|
||
else: | ||
raise AnsibleFilterError(f"Invalid range, got {one_range} and found {search_result.groups()}") | ||
return result | ||
|
||
def filters(self): | ||
return { | ||
'range_expand': self.range_expand, | ||
} |
59 changes: 59 additions & 0 deletions
59
ansible_collections/arista/avd/tests/unit/filters/test_range_expand.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
from __future__ import (absolute_import, division, print_function) | ||
__metaclass__ = type | ||
|
||
from ansible_collections.arista.avd.plugins.filter.range_expand import FilterModule, AnsibleFilterError | ||
import pytest | ||
|
||
RANGE_TO_EXPAND_INVALID_VALUES = ["1-3", {"key": "value"}, 33] | ||
RANGE_TO_EXPAND_VALID_VALUES = [ | ||
"Ethernet1", | ||
"Ethernet1-2", | ||
"Eth 3-5,7-8", | ||
"et2-6,po1-2", | ||
["Ethernet1"], | ||
["Ethernet 1-2", "Eth3-5", "7-8"], | ||
["Ethernet2-6", "Port-channel1-2"], | ||
["Ethernet1/1-2", "Eth1-2/3-5,5/1-2"], | ||
["Eth1.1,9-10.1", "Eth2.2-3", "Eth3/1-2.3-4"], | ||
"1-3", | ||
["1", "2", "3"], | ||
"vlan1-3", | ||
"Et1-2/3-4/5-6", | ||
] | ||
|
||
EXPECTED_RESULT_VALID_VALUES = [ | ||
["Ethernet1"], | ||
["Ethernet1", "Ethernet2"], | ||
["Eth 3", "Eth 4", "Eth 5", "Eth 7", "Eth 8"], | ||
["et2", "et3", "et4", "et5", "et6", "po1", "po2"], | ||
["Ethernet1"], | ||
["Ethernet 1", "Ethernet 2", "Eth3", "Eth4", "Eth5", "7", "8"], | ||
["Ethernet2", "Ethernet3", "Ethernet4", "Ethernet5", "Ethernet6", "Port-channel1", "Port-channel2"], | ||
["Ethernet1/1", "Ethernet1/2", "Eth1/3", "Eth1/4", "Eth1/5", "Eth2/3", "Eth2/4", "Eth2/5", "Eth5/1", "Eth5/2"], | ||
["Eth1.1", "Eth9.1", "Eth10.1", "Eth2.2", "Eth2.3", "Eth3/1.3", "Eth3/1.4", "Eth3/2.3", "Eth3/2.4"], | ||
["1", "2", "3"], | ||
["1", "2", "3"], | ||
["vlan1", "vlan2", "vlan3"], | ||
["Et1/3/5", "Et1/3/6", "Et1/4/5", "Et1/4/6", "Et2/3/5", "Et2/3/6", "Et2/4/5", "Et2/4/6"], | ||
] | ||
|
||
f = FilterModule() | ||
|
||
|
||
class TestListCompressFilter(): | ||
@pytest.mark.parametrize("RANGE_TO_EXPAND_INVALID", RANGE_TO_EXPAND_INVALID_VALUES) | ||
def test_range_expand(self, RANGE_TO_EXPAND_INVALID): | ||
with pytest.raises(AnsibleFilterError) as exc_info: | ||
f.range_expand(RANGE_TO_EXPAND_INVALID) | ||
assert str( | ||
exc_info.value) == f"value must be of type list or str, got {type(RANGE_TO_EXPAND_INVALID)}" | ||
|
||
@pytest.mark.parametrize("RANGE_TO_EXPAND_VALID", RANGE_TO_EXPAND_VALID_VALUES) | ||
def test_range_expand(self, RANGE_TO_EXPAND_VALID): | ||
resp = f.range_expand(RANGE_TO_EXPAND_VALID) | ||
assert resp in EXPECTED_RESULT_VALID_VALUES | ||
|
||
def test_range_expand_filter(self): | ||
resp = f.filters() | ||
assert isinstance(resp, dict) | ||
assert 'range_expand' in resp.keys() |