Skip to content

Commit

Permalink
✨ decode: add raise_on_limit_exceeded option (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
techouse authored Nov 23, 2024
1 parent fb6c6a4 commit 465bd93
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 22 deletions.
54 changes: 48 additions & 6 deletions src/qs_codec/decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,19 @@ def _interpret_numeric_entities(value: str) -> str:
return re.sub(r"&#(\d+);", lambda match: chr(int(match.group(1))), value)


def _parse_array_value(value: t.Any, options: DecodeOptions) -> t.Any:
def _parse_array_value(value: t.Any, options: DecodeOptions, current_list_length: int) -> t.Any:
if isinstance(value, str) and value and options.comma and "," in value:
return value.split(",")
split_val: t.List[str] = value.split(",")
if options.raise_on_limit_exceeded and len(split_val) > options.list_limit:
raise ValueError(
f"List limit exceeded: Only {options.list_limit} element{'' if options.list_limit == 1 else 's'} allowed in a list."
)
return split_val

if options.raise_on_limit_exceeded and current_list_length >= options.list_limit:
raise ValueError(
f"List limit exceeded: Only {options.list_limit} element{'' if options.list_limit == 1 else 's'} allowed in a list."
)

return value

Expand All @@ -61,11 +71,26 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str
clean_str: str = value.replace("?", "", 1) if options.ignore_query_prefix else value
clean_str = clean_str.replace("%5B", "[").replace("%5b", "[").replace("%5D", "]").replace("%5d", "]")
limit: t.Optional[int] = None if isinf(options.parameter_limit) else options.parameter_limit # type: ignore [assignment]

if limit is not None and limit <= 0:
raise ValueError("Parameter limit must be a positive integer.")

parts: t.List[str]
if isinstance(options.delimiter, re.Pattern):
parts = re.split(options.delimiter, clean_str) if not limit else re.split(options.delimiter, clean_str)[:limit]
parts = (
re.split(options.delimiter, clean_str)
if (limit is None) or not limit
else re.split(options.delimiter, clean_str)[: (limit + 1 if options.raise_on_limit_exceeded else limit)]
)
else:
parts = clean_str.split(options.delimiter) if not limit else clean_str.split(options.delimiter)[:limit]
parts = (
clean_str.split(options.delimiter)
if (limit is None) or not limit
else clean_str.split(options.delimiter)[: (limit + 1 if options.raise_on_limit_exceeded else limit)]
)

if options.raise_on_limit_exceeded and (limit is not None) and len(parts) > limit:
raise ValueError(f"Parameter limit exceeded: Only {limit} parameter{'' if limit == 1 else 's'} allowed.")

skip_index: int = -1 # Keep track of where the utf8 sentinel was found
i: int
Expand Down Expand Up @@ -98,7 +123,11 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str
else:
key = options.decoder(part[:pos], charset)
val = Utils.apply(
_parse_array_value(part[pos + 1 :], options),
_parse_array_value(
part[pos + 1 :],
options,
len(obj[key]) if key in obj and isinstance(obj[key], (list, tuple)) else 0,
),
lambda v: options.decoder(v, charset),
)

Expand All @@ -123,7 +152,20 @@ def _parse_query_string_values(value: str, options: DecodeOptions) -> t.Dict[str
def _parse_object(
chain: t.Union[t.List[str], t.Tuple[str, ...]], val: t.Any, options: DecodeOptions, values_parsed: bool
) -> t.Any:
leaf: t.Any = val if values_parsed else _parse_array_value(val, options)
current_list_length: int = 0

if bool(chain) and chain[-1] == "[]":
parent_key: t.Optional[int]

try:
parent_key = int("".join(chain[0:-1]))
except ValueError:
parent_key = None

if parent_key is not None and isinstance(val, (list, tuple)) and parent_key in dict(enumerate(val)):
current_list_length = len(val[parent_key])

leaf: t.Any = val if values_parsed else _parse_array_value(val, options, current_list_length)

i: int
for i in reversed(range(len(chain))):
Expand Down
7 changes: 5 additions & 2 deletions src/qs_codec/models/decode_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,13 @@ class DecodeOptions:
"""To disable ``list`` parsing entirely, set ``parse_lists`` to ``False``."""

strict_depth: bool = False
"""Set to ``True`` to throw an error when the input exceeds the ``depth`` limit."""
"""Set to ``True`` to raise an error when the input exceeds the ``depth`` limit."""

strict_null_handling: bool = False
"""Set to true to decode values without ``=`` to ``None``."""
"""Set to ``True`` to decode values without ``=`` to ``None``."""

raise_on_limit_exceeded: bool = False
"""Set to ``True`` to raise an error when the input contains more parameters than the ``list_limit``."""

decoder: t.Callable[[t.Optional[str], t.Optional[Charset]], t.Any] = DecodeUtils.decode
"""Set a ``Callable`` to affect the decoding of the input."""
Expand Down
15 changes: 4 additions & 11 deletions src/qs_codec/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ def merge(
else:
target_[len(target_)] = source

target = list(filter(lambda el: not isinstance(el, Undefined), target_.values()))
if any(isinstance(value, Undefined) for value in target_.values()):
target = {str(i): target_[i] for i in target_ if not isinstance(target_[i], Undefined)}
else:
target = list(filter(lambda el: not isinstance(el, Undefined), target_.values()))
else:
if isinstance(source, (list, tuple)):
if all((isinstance(el, t.Mapping) or isinstance(el, Undefined)) for el in target) and all(
Expand Down Expand Up @@ -123,20 +126,10 @@ def compact(value: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
queue.append({"obj": obj, "prop": key})
refs.append(val)

Utils._compact_queue(queue)
Utils._remove_undefined_from_map(value)

return value

@staticmethod
def _compact_queue(queue: t.List[t.Dict]) -> None:
while len(queue) > 1:
item = queue.pop()
obj = item["obj"][item["prop"]]

if isinstance(obj, (list, tuple)):
item["obj"][item["prop"]] = list(filter(lambda el: not isinstance(el, Undefined), obj))

@staticmethod
def _remove_undefined_from_list(value: t.List) -> None:
i: int = len(value) - 1
Expand Down
66 changes: 64 additions & 2 deletions tests/unit/decode_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def test_parses_a_mix_of_simple_and_explicit_lists(self) -> None:
assert decode("a[0]=b&a=c") == {"a": ["b", "c"]}
assert decode("a=b&a[0]=c") == {"a": ["b", "c"]}

assert decode("a[1]=b&a=c", DecodeOptions(list_limit=20)) == {"a": ["b", "c"]}
assert decode("a[1]=b&a=c", DecodeOptions(list_limit=20)) == {"a": {"1": "b", "2": "c"}}
assert decode("a[]=b&a=c", DecodeOptions(list_limit=0)) == {"a": ["b", "c"]}
assert decode("a[]=b&a=c") == {"a": ["b", "c"]}

Expand Down Expand Up @@ -292,7 +292,9 @@ def test_allows_for_empty_strings_in_lists(self) -> None:
assert decode("a[]=&a[]=b&a[]=c") == {"a": ["", "b", "c"]}

def test_compacts_sparse_lists(self) -> None:
assert decode("a[10]=1&a[2]=2", DecodeOptions(list_limit=20)) == {"a": ["2", "1"]}
decoded = decode("a[10]=1&a[2]=2", DecodeOptions(list_limit=20))
assert decoded == {"a": {"10": "1", "2": "2"}}
assert decoded != {"a": ["2", "1"]}
assert decode("a[1][b][2][c]=1", DecodeOptions(list_limit=20)) == {"a": [{"b": [{"c": "1"}]}]}
assert decode("a[1][2][3][c]=1", DecodeOptions(list_limit=20)) == {"a": [[[{"c": "1"}]]]}
assert decode("a[1][2][3][c][1]=1", DecodeOptions(list_limit=20)) == {"a": [[[{"c": ["1"]}]]]}
Expand Down Expand Up @@ -684,3 +686,63 @@ def test_decodes_successfully_when_depth_is_within_the_limit_with_strict_depth_f

def test_does_not_throw_when_depth_is_exactly_at_the_limit_with_strict_depth_true(self) -> None:
assert decode("a[b][c]=d", DecodeOptions(depth=2, strict_depth=True)) == {"a": {"b": {"c": "d"}}}


class TestParameterList:
def test_does_not_raise_error_when_within_parameter_limit(self) -> None:
assert decode("a=1&b=2&c=3", DecodeOptions(parameter_limit=5, raise_on_limit_exceeded=True)) == {
"a": "1",
"b": "2",
"c": "3",
}

def test_raises_error_when_parameter_limit_exceeded(self) -> None:
with pytest.raises(ValueError):
decode("a=1&b=2&c=3&d=4&e=5&f=6", DecodeOptions(parameter_limit=3, raise_on_limit_exceeded=True))

def test_silently_truncates_when_throw_on_limit_exceeded_is_not_given(self) -> None:
assert decode("a=1&b=2&c=3&d=4&e=5", DecodeOptions(parameter_limit=3)) == {"a": "1", "b": "2", "c": "3"}

def test_silently_truncates_when_parameter_limit_exceeded_without_error(self) -> None:
assert decode("a=1&b=2&c=3&d=4&e=5", DecodeOptions(parameter_limit=3, raise_on_limit_exceeded=False)) == {
"a": "1",
"b": "2",
"c": "3",
}

def test_allows_unlimited_parameters_when_parameter_limit_set_to_infinity(self) -> None:
assert decode("a=1&b=2&c=3&d=4&e=5&f=6", DecodeOptions(parameter_limit=float("inf"))) == {
"a": "1",
"b": "2",
"c": "3",
"d": "4",
"e": "5",
"f": "6",
}


class TestListLimit:
def test_does_not_raise_error_when_within_list_limit(self) -> None:
assert decode("a[]=1&a[]=2&a[]=3", DecodeOptions(list_limit=5, raise_on_limit_exceeded=True)) == {
"a": ["1", "2", "3"],
}

def test_raises_error_when_list_limit_exceeded(self) -> None:
with pytest.raises(ValueError):
decode("a[]=1&a[]=2&a[]=3&a[]=4", DecodeOptions(list_limit=3, raise_on_limit_exceeded=True))

def test_converts_list_to_map_if_length_is_greater_than_limit(self) -> None:
assert decode("a[1]=1&a[2]=2&a[3]=3&a[4]=4&a[5]=5&a[6]=6", DecodeOptions(list_limit=5)) == {
"a": {"1": "1", "2": "2", "3": "3", "4": "4", "5": "5", "6": "6"}
}

def test_handles_list_limit_of_zero_correctly(self) -> None:
assert decode("a[]=1&a[]=2", DecodeOptions(list_limit=0)) == {"a": ["1", "2"]}

def test_handles_negative_list_limit_correctly(self) -> None:
with pytest.raises(ValueError):
decode("a[]=1&a[]=2", DecodeOptions(list_limit=-1, raise_on_limit_exceeded=True))

def test_applies_list_limit_to_nested_lists(self) -> None:
with pytest.raises(ValueError):
decode("a[0][]=1&a[0][]=2&a[0][]=3&a[0][]=4", DecodeOptions(list_limit=3, raise_on_limit_exceeded=True))
2 changes: 1 addition & 1 deletion tests/unit/example_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_lists(self):
# Note that the only difference between an index in a `list` and a key in a `dict` is that the value between the
# brackets must be a number to create a `list`. When creating `list`s with specific indices, **qs_codec** will compact
# a sparse `list` to only the existing values preserving their order:
assert qs_codec.decode("a[1]=b&a[15]=c") == {"a": ["b", "c"]}
assert qs_codec.decode("a[1]=b&a[15]=c") == {"a": {"1": "b", "15": "c"}}

# Note that an empty string is also a value, and will be preserved:
assert qs_codec.decode("a[]=&a[]=b") == {"a": ["", "b"]}
Expand Down

0 comments on commit 465bd93

Please sign in to comment.