From 4653c6d5fd2705da68b482e22faba6337dd4c9e4 Mon Sep 17 00:00:00 2001 From: securisec Date: Sat, 29 Jun 2024 15:12:45 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=97=93=20Jun=2029,=202024=203:11:57?= =?UTF-8?q?=E2=80=AFPM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ๐Ÿ™ get_by_key to support py syntax also ๐Ÿค– types added/updated ๐Ÿงช tests added/updated โœจ dump_json to dump state as string --- chepy/core.py | 86 ++++++++++++++++++++++++++++++++++-- chepy/core.pyi | 4 +- tests/test_core.py | 31 +++++++++++++ tests_plugins/test_binary.py | 10 ++++- tests_plugins/test_pcap.py | 2 +- 5 files changed, 126 insertions(+), 7 deletions(-) diff --git a/chepy/core.py b/chepy/core.py index a56df96..784c30c 100644 --- a/chepy/core.py +++ b/chepy/core.py @@ -741,12 +741,48 @@ def get_by_index(self, *indexes: int): self.state = [self.state[int(index)] for index in indexes] return self + def _get_nested_value(self, data, key, split_by="."): + """Get a dict value based on a string key with dot notation. Supports array indexing. + If split_by is None or "", returns only the first key + Args: + data (dict): Data + key (str): Dict key in a dot notation and array + split_by (str, optional): Chars to split key by. Defaults to ".". + """ + if not split_by: + return data[key] + try: + keys = key.split(split_by) + for key in keys: + if "[" in key: + # Extract the key and index + key, index_str = key.split("[") + index_str = index_str.rstrip("]").strip() + if index_str == "*": + data = [data[key][i] for i in range(len(data[key]))] + else: + index = int(index_str) + data = data[key][index] + else: + if isinstance(data, list): + data = [ + data[i][key] for i in range(len(data)) if key in data[i] + ] + else: + data = data[key] if key in data else data + return data + except Exception as e: # pragma: no cover + self._error_logger(e) + return data + @ChepyDecorators.call_stack - def get_by_key(self, query: str): - """This method support json query support. + def get_by_key(self, *keys: str, py_style: bool = False, split_key: str = "."): + """This method support json keys support. Args: keys (Tuple[Union[Hashable, None]]): Keys to extract. + split_key (str, optional): Split nested keys. Defaults to "." + nested (bool, optional): If the specified keys are nested. Supports array indexing. Defaults to True Returns: Chepy: The Chepy object. @@ -763,7 +799,21 @@ def get_by_key(self, query: str): ), ), "State does not contain valid data" - self.state = jmespath.search(query, self.state) + if py_style: + if len(keys) == 1: + self.state = self._get_nested_value( + self.state, keys[0], split_by=split_key + ) + else: + self.state = [ + self._get_nested_value(self.state, key, split_by=split_key) + for key in keys + ] + else: + o = jmespath.search(keys[0], self.state) + if o is None: # pragma: no cover + raise ValueError("Query did not match any data") + self.state = o return self @ChepyDecorators.call_stack @@ -1586,3 +1636,33 @@ def set_register(self, key: str, val: Union[str, bytes]): """ self._registers[key] = val return self + + @ChepyDecorators.call_stack + def dump_json(self): + """Json serialize the state + + Returns: + Chepy: The Chepy object. + """ + + # Function to recursively convert bytes to UTF-8 strings or Base64-encoded strings + def encode_bytes(obj): + if isinstance(obj, bytes): + try: + # Try to decode as UTF-8 + return obj.decode("utf-8") + except UnicodeDecodeError: # pragma: no cover + # If decoding fails, encode as Base64 + return base64.b64encode(obj).decode("utf-8") + elif isinstance(obj, dict): + return { + encode_bytes(k) if isinstance(k, bytes) else k: encode_bytes(v) + for k, v in obj.items() + } + elif isinstance(obj, list): + return [encode_bytes(item) for item in obj] + else: + return obj + + self.state = json.dumps(encode_bytes(self.state)) + return self diff --git a/chepy/core.pyi b/chepy/core.pyi index 3aef5a0..dd5f859 100644 --- a/chepy/core.pyi +++ b/chepy/core.pyi @@ -29,6 +29,7 @@ class ChepyCore: def _convert_to_str(self) -> str: ... def _convert_to_int(self) -> int: ... def _str_to_bytes(self, s: str) -> bytes: ... + def _get_nested_value(self: ChepyCoreT, data: dict, key:str, split_by: str=".") -> Any: ... def _bytes_to_str(self, s: bytes) -> str: ... @property def state(self): ... @@ -53,7 +54,7 @@ class ChepyCore: def out(self: ChepyCoreT) -> ChepyCoreT: ... def out_as_str(self: ChepyCoreT) -> str: ... def get_by_index(self: ChepyCoreT, *indexes: int) -> ChepyCoreT: ... - def get_by_key(self: ChepyCoreT, query: str) -> ChepyCoreT: ... + def get_by_key(self: ChepyCoreT, *keys: Union[str, bytes], py_style: bool=False, split_key: Union[str, None] = '.') -> ChepyCoreT: ... def copy_to_clipboard(self: ChepyCoreT) -> None: ... def copy(self: ChepyCoreT) -> None: ... def web(self: ChepyCoreT, magic: bool=..., cyberchef_url: str=...) -> None: ... @@ -86,3 +87,4 @@ class ChepyCore: def suffix(self: ChepyCoreT, data: Union[str, bytes]) -> ChepyCoreT: ... def get_register(self: ChepyCoreT, key: str) -> Union[str, bytes]: ... def set_register(self: ChepyCoreT, key: str, val: Union[str, bytes]) -> ChepyCoreT: ... + def dump_json(self: ChepyCoreT) -> ChepyCoreT: ... diff --git a/tests/test_core.py b/tests/test_core.py index 9b60d7f..4ea5c81 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -93,6 +93,25 @@ def test_get_by_key(): assert Chepy(data2).get_by_key("menu.popup.menuitem[0].value").o == b"New" assert Chepy(data2).get_by_key("menu").o.get("id") == "file" assert Chepy([{"a": "b"}, {"a": "d"}]).get_by_key("[].a").o == ["b", "d"] + # pyton style keys + assert Chepy(data2).get_by_key( + "menu.popup.menuitem[1].value", "menu.popup.menuitem[2].value", py_style=True + ).o == ["Open", "Close"] + assert ( + Chepy(data2) + .get_by_key("menu..popup..menuitem[0]..value", split_key="..", py_style=True) + .o + == b"New" + ) + assert ( + Chepy(data2).get_by_key("menu", split_key=None, py_style=True).o.get("id") + == "file" + ) + assert Chepy(data2).get_by_key("menu.popup.menuitem[*].value", py_style=True).o == [ + "New", + "Open", + "Close", + ] def test_delete_state(): @@ -321,3 +340,15 @@ def test_register(): def test_ixs(): assert Chepy("b").prefix("a").o == b"ab" assert Chepy("b").suffix("a").o == b"ba" + + +def test_dump_json(): + data = { + "key1": b"some byte data", + "key2": [b"more byte data", b"\x00\x01\x02", "\x03"], + "key3": {"nestedKey": b"nested byte data"}, + b"byte_key": b"\x00\x01", + } + + assert Chepy(data).dump_json().json_to_dict().get_by_key('byte_key').o == b'\x00\x01' + assert Chepy(True).dump_json().o == b'true' \ No newline at end of file diff --git a/tests_plugins/test_binary.py b/tests_plugins/test_binary.py index 2accf4a..d0f8fc4 100644 --- a/tests_plugins/test_binary.py +++ b/tests_plugins/test_binary.py @@ -19,7 +19,9 @@ def test_pe_imports(): Chepy("tests/files/ff.exe") .read_file() .pe_imports() - .get_by_key(b"api-ms-win-crt-filesystem-l1-1-0.dll", split_key=None) + .get_by_key( + b"api-ms-win-crt-filesystem-l1-1-0.dll", py_style=True, split_key=None + ) .o ) == 2 @@ -33,7 +35,11 @@ def test_pe_exports(): def test_elf_imports(): assert ( len( - Chepy("tests/files/elf").load_file().elf_imports().get_by_key(".rela.dyn",split_key=None).o + Chepy("tests/files/elf") + .load_file() + .elf_imports() + .get_by_key(".rela.dyn", py_style=True, split_key=None) + .o ) == 9 ) diff --git a/tests_plugins/test_pcap.py b/tests_plugins/test_pcap.py index faa7794..b06a6de 100644 --- a/tests_plugins/test_pcap.py +++ b/tests_plugins/test_pcap.py @@ -48,7 +48,7 @@ def test_pcap_convo(): Chepy("tests/files/test.pcapng") .read_pcap() .pcap_convos() - .get_by_key("10.10.10.11", split_key="") + .get_by_key("10.10.10.11", split_key="", py_style=True) .o["ICMP"] )