Skip to content
This repository has been archived by the owner on Feb 6, 2023. It is now read-only.

Commit

Permalink
feat: Added port syntax checking
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeff07 committed Jan 31, 2022
1 parent 652ed29 commit 7453b8e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 31 deletions.
74 changes: 43 additions & 31 deletions ipfabric/pathlookup/diagramsV4_3.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import re
from typing import Optional, Union

from ipfabric.pathlookup.graphs import IPFPath
from ipfabric.pathlookup.icmp import ICMP

PORT_REGEX = re.compile(r'^\d*$|^\d*-\d*$')


class DiagramV43(IPFPath):
CONSTANT_PARAMS = dict(
Expand All @@ -21,19 +24,19 @@ def __init__(self, client):
super().__init__(client)

def unicast(
self,
src_ip: str,
dst_ip: str,
proto: str = "tcp",
src_port: Optional[Union[str, int]] = "1024-65535",
dst_port: Optional[Union[str, int]] = "80,443",
sec_drop: Optional[bool] = True,
flags: Optional[list] = None,
icmp: Optional[ICMP] = None,
ttl: Optional[int] = 128,
fragment_offset: Optional[int] = 0,
snapshot_id: Optional[str] = None,
overlay: dict = None,
self,
src_ip: str,
dst_ip: str,
proto: str = "tcp",
src_port: Optional[Union[str, int]] = "1024-65535",
dst_port: Optional[Union[str, int]] = "80,443",
sec_drop: Optional[bool] = True,
flags: Optional[list] = None,
icmp: Optional[ICMP] = None,
ttl: Optional[int] = 128,
fragment_offset: Optional[int] = 0,
snapshot_id: Optional[str] = None,
overlay: dict = None,
) -> Union[dict, bytes]:
"""
Execute an Unicast Path Lookup diagram query for the given set of parameters.
Expand Down Expand Up @@ -62,8 +65,8 @@ def unicast(
pathLookupType="unicast",
networkMode=self.check_subnets(src_ip, dst_ip),
l4Options=dict(
dstPorts=str(dst_port),
srcPorts=str(src_port)
dstPorts=self.check_ports(dst_port),
srcPorts=self.check_ports(src_port)
),
ttl=ttl,
fragmentOffset=fragment_offset,
Expand All @@ -79,20 +82,20 @@ def unicast(
return self._query(payload)

def multicast(
self,
src_ip: str,
grp_ip: str,
proto: str = "tcp",
rec_ip: Optional[str] = None,
src_port: Optional[Union[str, int]] = "1024-65535",
grp_port: Optional[Union[str, int]] = "80,443",
sec_drop: Optional[bool] = True,
flags: Optional[list] = None,
icmp: Optional[ICMP] = None,
ttl: Optional[int] = 128,
fragment_offset: Optional[int] = 0,
snapshot_id: Optional[str] = None,
overlay: dict = None,
self,
src_ip: str,
grp_ip: str,
proto: str = "tcp",
rec_ip: Optional[str] = None,
src_port: Optional[Union[str, int]] = "1024-65535",
grp_port: Optional[Union[str, int]] = "80,443",
sec_drop: Optional[bool] = True,
flags: Optional[list] = None,
icmp: Optional[ICMP] = None,
ttl: Optional[int] = 128,
fragment_offset: Optional[int] = 0,
snapshot_id: Optional[str] = None,
overlay: dict = None,
) -> Union[dict, bytes]:
"""
Execute an Multicast Path Lookup diagram query for the given set of parameters.
Expand Down Expand Up @@ -124,8 +127,8 @@ def multicast(
securedPath=sec_drop,
pathLookupType="multicast",
l4Options=dict(
dstPorts=str(grp_port),
srcPorts=str(src_port)
dstPorts=self.check_ports(grp_port),
srcPorts=self.check_ports(src_port)
),
ttl=ttl,
fragmentOffset=fragment_offset,
Expand Down Expand Up @@ -169,3 +172,12 @@ def check_proto(parameters, flags, icmp) -> dict:
parameters["l4Options"]["type"] = icmp.type
parameters["l4Options"]["code"] = icmp.code
return parameters

@staticmethod
def check_ports(ports: str):
port = ports.replace(' ', '').split(',')
for p in port:
if not PORT_REGEX.match(p):
raise SyntaxError(f"Ports {ports} is not in the valid syntax, "
f"examples: ['80', '80,443', '0-1024', '80,8000-8100,8443']")
return str(','.join(port))
8 changes: 8 additions & 0 deletions tests/unittests/pathlookup/test_diagramsV4_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,11 @@ def test_check_proto_icmp_failed(self):
params = dict(protocol='icmp')
with self.assertRaises(SyntaxError) as err:
self.graph.check_proto(params, flags=None, icmp=None)

def test_ports_failed(self):
with self.assertRaises(SyntaxError) as err:
self.graph.check_ports('hello')

def test_ports_format(self):
ports = self.graph.check_ports('80, 255-512, 30, 1025 - 2048')
self.assertEqual(ports, '80,255-512,30,1025-2048')

0 comments on commit 7453b8e

Please sign in to comment.