Skip to content

Commit

Permalink
πŸ—“ Jun 29, 2024 3:11:57β€―PM
Browse files Browse the repository at this point in the history
πŸ™ get_by_key to support py syntax also
πŸ€– types added/updated
πŸ§ͺ tests added/updated
✨ dump_json to dump state as string
  • Loading branch information
securisec committed Jun 29, 2024
1 parent 0f5e9dc commit 4653c6d
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 7 deletions.
86 changes: 83 additions & 3 deletions chepy/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,12 +741,48 @@ def get_by_index(self, *indexes: int):
self.state = [self.state[int(index)] for index in indexes]
return self

def _get_nested_value(self, data, key, split_by="."):
"""Get a dict value based on a string key with dot notation. Supports array indexing.
If split_by is None or "", returns only the first key
Args:
data (dict): Data
key (str): Dict key in a dot notation and array
split_by (str, optional): Chars to split key by. Defaults to ".".
"""
if not split_by:
return data[key]
try:
keys = key.split(split_by)
for key in keys:
if "[" in key:
# Extract the key and index
key, index_str = key.split("[")
index_str = index_str.rstrip("]").strip()
if index_str == "*":
data = [data[key][i] for i in range(len(data[key]))]
else:
index = int(index_str)
data = data[key][index]
else:
if isinstance(data, list):
data = [
data[i][key] for i in range(len(data)) if key in data[i]
]
else:
data = data[key] if key in data else data
return data
except Exception as e: # pragma: no cover
self._error_logger(e)
return data

@ChepyDecorators.call_stack
def get_by_key(self, query: str):
"""This method support json query support.
def get_by_key(self, *keys: str, py_style: bool = False, split_key: str = "."):
"""This method support json keys support.
Args:
keys (Tuple[Union[Hashable, None]]): Keys to extract.
split_key (str, optional): Split nested keys. Defaults to "."
nested (bool, optional): If the specified keys are nested. Supports array indexing. Defaults to True
Returns:
Chepy: The Chepy object.
Expand All @@ -763,7 +799,21 @@ def get_by_key(self, query: str):
),
), "State does not contain valid data"

self.state = jmespath.search(query, self.state)
if py_style:
if len(keys) == 1:
self.state = self._get_nested_value(
self.state, keys[0], split_by=split_key
)
else:
self.state = [
self._get_nested_value(self.state, key, split_by=split_key)
for key in keys
]
else:
o = jmespath.search(keys[0], self.state)
if o is None: # pragma: no cover
raise ValueError("Query did not match any data")
self.state = o
return self

@ChepyDecorators.call_stack
Expand Down Expand Up @@ -1586,3 +1636,33 @@ def set_register(self, key: str, val: Union[str, bytes]):
"""
self._registers[key] = val
return self

@ChepyDecorators.call_stack
def dump_json(self):
"""Json serialize the state
Returns:
Chepy: The Chepy object.
"""

# Function to recursively convert bytes to UTF-8 strings or Base64-encoded strings
def encode_bytes(obj):
if isinstance(obj, bytes):
try:
# Try to decode as UTF-8
return obj.decode("utf-8")
except UnicodeDecodeError: # pragma: no cover
# If decoding fails, encode as Base64
return base64.b64encode(obj).decode("utf-8")
elif isinstance(obj, dict):
return {
encode_bytes(k) if isinstance(k, bytes) else k: encode_bytes(v)
for k, v in obj.items()
}
elif isinstance(obj, list):
return [encode_bytes(item) for item in obj]
else:
return obj

self.state = json.dumps(encode_bytes(self.state))
return self
4 changes: 3 additions & 1 deletion chepy/core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ChepyCore:
def _convert_to_str(self) -> str: ...
def _convert_to_int(self) -> int: ...
def _str_to_bytes(self, s: str) -> bytes: ...
def _get_nested_value(self: ChepyCoreT, data: dict, key:str, split_by: str=".") -> Any: ...
def _bytes_to_str(self, s: bytes) -> str: ...
@property
def state(self): ...
Expand All @@ -53,7 +54,7 @@ class ChepyCore:
def out(self: ChepyCoreT) -> ChepyCoreT: ...
def out_as_str(self: ChepyCoreT) -> str: ...
def get_by_index(self: ChepyCoreT, *indexes: int) -> ChepyCoreT: ...
def get_by_key(self: ChepyCoreT, query: str) -> ChepyCoreT: ...
def get_by_key(self: ChepyCoreT, *keys: Union[str, bytes], py_style: bool=False, split_key: Union[str, None] = '.') -> ChepyCoreT: ...
def copy_to_clipboard(self: ChepyCoreT) -> None: ...
def copy(self: ChepyCoreT) -> None: ...
def web(self: ChepyCoreT, magic: bool=..., cyberchef_url: str=...) -> None: ...
Expand Down Expand Up @@ -86,3 +87,4 @@ class ChepyCore:
def suffix(self: ChepyCoreT, data: Union[str, bytes]) -> ChepyCoreT: ...
def get_register(self: ChepyCoreT, key: str) -> Union[str, bytes]: ...
def set_register(self: ChepyCoreT, key: str, val: Union[str, bytes]) -> ChepyCoreT: ...
def dump_json(self: ChepyCoreT) -> ChepyCoreT: ...
31 changes: 31 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ def test_get_by_key():
assert Chepy(data2).get_by_key("menu.popup.menuitem[0].value").o == b"New"
assert Chepy(data2).get_by_key("menu").o.get("id") == "file"
assert Chepy([{"a": "b"}, {"a": "d"}]).get_by_key("[].a").o == ["b", "d"]
# pyton style keys
assert Chepy(data2).get_by_key(
"menu.popup.menuitem[1].value", "menu.popup.menuitem[2].value", py_style=True
).o == ["Open", "Close"]
assert (
Chepy(data2)
.get_by_key("menu..popup..menuitem[0]..value", split_key="..", py_style=True)
.o
== b"New"
)
assert (
Chepy(data2).get_by_key("menu", split_key=None, py_style=True).o.get("id")
== "file"
)
assert Chepy(data2).get_by_key("menu.popup.menuitem[*].value", py_style=True).o == [
"New",
"Open",
"Close",
]


def test_delete_state():
Expand Down Expand Up @@ -321,3 +340,15 @@ def test_register():
def test_ixs():
assert Chepy("b").prefix("a").o == b"ab"
assert Chepy("b").suffix("a").o == b"ba"


def test_dump_json():
data = {
"key1": b"some byte data",
"key2": [b"more byte data", b"\x00\x01\x02", "\x03"],
"key3": {"nestedKey": b"nested byte data"},
b"byte_key": b"\x00\x01",
}

assert Chepy(data).dump_json().json_to_dict().get_by_key('byte_key').o == b'\x00\x01'
assert Chepy(True).dump_json().o == b'true'
10 changes: 8 additions & 2 deletions tests_plugins/test_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ def test_pe_imports():
Chepy("tests/files/ff.exe")
.read_file()
.pe_imports()
.get_by_key(b"api-ms-win-crt-filesystem-l1-1-0.dll", split_key=None)
.get_by_key(
b"api-ms-win-crt-filesystem-l1-1-0.dll", py_style=True, split_key=None
)
.o
)
== 2
Expand All @@ -33,7 +35,11 @@ def test_pe_exports():
def test_elf_imports():
assert (
len(
Chepy("tests/files/elf").load_file().elf_imports().get_by_key(".rela.dyn",split_key=None).o
Chepy("tests/files/elf")
.load_file()
.elf_imports()
.get_by_key(".rela.dyn", py_style=True, split_key=None)
.o
)
== 9
)
2 changes: 1 addition & 1 deletion tests_plugins/test_pcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_pcap_convo():
Chepy("tests/files/test.pcapng")
.read_pcap()
.pcap_convos()
.get_by_key("10.10.10.11", split_key="")
.get_by_key("10.10.10.11", split_key="", py_style=True)
.o["ICMP"]
)

Expand Down

0 comments on commit 4653c6d

Please sign in to comment.