Skip to content

Commit

Permalink
πŸ—“ Jun 28, 2024 6:05:05β€―PM
Browse files Browse the repository at this point in the history
βž• deps added/updated
πŸ”₯ full refactor of get_by_key to use json query
✨ prefix/suffix method to add data to state
πŸ™ bring back dict_get_items
  • Loading branch information
securisec committed Jun 28, 2024
1 parent a4ab8bc commit 0f5e9dc
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 62 deletions.
93 changes: 44 additions & 49 deletions chepy/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import lazy_import
import pyperclip
import json
import jmespath

jsonpickle = lazy_import.lazy_module("jsonpickle")
import regex as re
Expand Down Expand Up @@ -702,41 +703,6 @@ def _convert_to_int(self) -> int:
else: # pragma: no cover
raise NotImplementedError

def _get_nested_value(self, data, key, split_by="."):
"""Get a dict value based on a string key with dot notation. Supports array indexing.
If split_by is None or "", returns only the first key
Args:
data (dict): Data
key (str): Dict key in a dot notation and array
split_by (str, optional): Chars to split key by. Defaults to ".".
"""
if not split_by:
return data[key]
try:
keys = key.split(split_by)
for key in keys:
if "[" in key:
# Extract the key and index
key, index_str = key.split("[")
index_str = index_str.rstrip("]").strip()
if index_str == "*":
data = [data[key][i] for i in range(len(data[key]))]
else:
index = int(index_str)
data = data[key][index]
else:
if isinstance(data, list):
data = [
data[i][key] for i in range(len(data)) if key in data[i]
]
else:
data = data[key] if key in data else data
return data
except Exception as e: # pragma: no cover
self._error_logger(e)
return data

@property
def o(self):
"""Get the final output
Expand Down Expand Up @@ -776,27 +742,28 @@ def get_by_index(self, *indexes: int):
return self

@ChepyDecorators.call_stack
def get_by_key(self, *keys: str, split_key: str = "."):
"""Get value from a dict. Supports nested keys and arrays.
If only one key is specified, the obj is return else a new list is returned
def get_by_key(self, query: str):
"""This method support json query support.
Args:
keys (Tuple[Union[Hashable, None]]): Keys to extract.
split_key (str, optional): Split nested keys. Defaults to "."
nested (bool, optional): If the specified keys are nested. Supports array indexing. Defaults to True
Returns:
Chepy: The Chepy object.
"""
assert isinstance(self.state, dict), "State is not a dictionary"
if len(keys) == 1:
self.state = self._get_nested_value(self.state, keys[0], split_by=split_key)
else:
self.state = [
self._get_nested_value(self.state, key, split_by=split_key)
for key in keys
]
Examples:
>>> Chepy({"a":{"b": "c"}}).get_by_key('a.b')
>>> 'c'
"""
assert isinstance(
self.state,
(
dict,
list,
),
), "State does not contain valid data"

self.state = jmespath.search(query, self.state)
return self

@ChepyDecorators.call_stack
Expand Down Expand Up @@ -1562,6 +1529,34 @@ def register(
self.state = old_state
return self

@ChepyDecorators.call_stack
def prefix(self, data: bytes):
"""Add a prefix to the data in state. The state is converted to bytes
Args:
data (bytes): Data to add
Returns:
Chepy: The Chepy object.
"""
data = self._str_to_bytes(data)
self.state = data + self._convert_to_bytes()
return self

@ChepyDecorators.call_stack
def suffix(self, data: bytes):
"""Add a suffix to the data in state. The state is converted to bytes
Args:
data (bytes): Data to add
Returns:
Chepy: The Chepy object.
"""
data = self._str_to_bytes(data)
self.state = self._convert_to_bytes() + data
return self

def get_register(self, key: str) -> Union[str, bytes]:
"""Get a value from registers by key
Expand Down
7 changes: 4 additions & 3 deletions chepy/core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class ChepyCore:
def _convert_to_bytearray(self) -> bytearray: ...
def _convert_to_str(self) -> str: ...
def _convert_to_int(self) -> int: ...
def _get_nested_value(self: ChepyCoreT, data: dict, key:str, split_by: str=".") -> Any: ...
def _str_to_bytes(self, s: str) -> bytes: ...
def _bytes_to_str(self, s: bytes) -> str: ...
@property
Expand All @@ -54,7 +53,7 @@ class ChepyCore:
def out(self: ChepyCoreT) -> ChepyCoreT: ...
def out_as_str(self: ChepyCoreT) -> str: ...
def get_by_index(self: ChepyCoreT, *indexes: int) -> ChepyCoreT: ...
def get_by_key(self: ChepyCoreT, key: str, split_key: Union[str, None] = '.') -> ChepyCoreT: ...
def get_by_key(self: ChepyCoreT, query: str) -> ChepyCoreT: ...
def copy_to_clipboard(self: ChepyCoreT) -> None: ...
def copy(self: ChepyCoreT) -> None: ...
def web(self: ChepyCoreT, magic: bool=..., cyberchef_url: str=...) -> None: ...
Expand Down Expand Up @@ -83,5 +82,7 @@ class ChepyCore:
def subsection(self: ChepyCoreT, pattern: str, methods: List[Tuple[Union[str, object], dict]], group: int=...) -> ChepyCoreT: ...
def callback(self: ChepyCoreT, callback_function: Callable[[Any], Any]) -> ChepyCoreT: ...
def register(self: ChepyCoreT, pattern: Union[str, bytes], ignore_case: bool=False, multiline: bool=False, dotall: bool=False, unicode: bool=False, extended: bool=False) -> ChepyCoreT: ...
def prefix(self: ChepyCoreT, data: Union[str, bytes]) -> ChepyCoreT: ...
def suffix(self: ChepyCoreT, data: Union[str, bytes]) -> ChepyCoreT: ...
def get_register(self: ChepyCoreT, key: str) -> Union[str, bytes]: ...
def set_register(self, key: str, val: Union[str, bytes]) -> ChepyCoreT: ...
def set_register(self: ChepyCoreT, key: str, val: Union[str, bytes]) -> ChepyCoreT: ...
20 changes: 20 additions & 0 deletions chepy/modules/dataformat.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,26 @@ def dict_to_json(self) -> DataFormatT:
self.state = json.dumps(self.state)
return self

@ChepyDecorators.call_stack
def dict_get_items(self, *keys: str) -> DataFormatT:
"""Get items from a dict. If no keys are specified, it will return all items.
Returns:
Chepy: The Chepy object.
Examples:
>>> o = Chepy({"a": 1, "b": 2}).dict_get_items("a", "b", "c").o
[1, 2]
"""
assert isinstance(self.state, dict), "Not a dict object"
if len(keys) == 0:
self.state = list(self.state.values())
return self
o = list()
for k in keys:
if self.state.get(k):
o.append(self.state.get(k))
self.state = o
return self

@ChepyDecorators.call_stack
def yaml_to_json(self) -> DataFormatT: # pragma: no cover
"""Convert yaml to a json string
Expand Down
3 changes: 2 additions & 1 deletion chepy/modules/dataformat.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ..core import ChepyCore
from typing import Any, Literal, TypeVar, Union, Hashable
from typing import Any, Literal, TypeVar, Union

yaml: Any
DataFormatT = TypeVar('DataFormatT', bound='DataFormat')
Expand All @@ -14,6 +14,7 @@ class DataFormat(ChepyCore):
def join(self: DataFormatT, join_by: Union[str, bytes]=...) -> DataFormatT: ...
def json_to_dict(self: DataFormatT) -> DataFormatT: ...
def dict_to_json(self: DataFormatT) -> DataFormatT: ...
def dict_get_items(self: DataFormatT, *keys: str) -> DataFormatT: ...
def yaml_to_json(self: DataFormatT) -> DataFormatT: ...
def json_to_yaml(self: DataFormatT) -> DataFormatT: ...
def to_base58(self: DataFormatT) -> DataFormatT: ...
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ fire==0.6.0
lazy-import
hexdump
jsonpickle
jmespath==1.0.1
prompt_toolkit>=2.0.8
pycipher
pycryptodome
Expand Down
18 changes: 9 additions & 9 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,14 @@ def test_get_by_key():
}
}
assert Chepy(data2).get_by_key("menu.popup.menuitem[1].value").o == b"Open"
assert Chepy(data2).get_by_key(
"menu.popup.menuitem[1].value", "menu.popup.menuitem[2].value"
).o == ["Open", "Close"]
assert (
Chepy(data2).get_by_key("menu..popup..menuitem[0]..value", split_key="..").o
== b"New"
)
assert Chepy(data2).get_by_key("menu", split_key=None).o.get("id") == "file"
assert Chepy(data2).get_by_key("menu.popup.menuitem[*].value").o == [
assert Chepy(data2).get_by_key("menu.popup.menuitem[].value").o == [
"New",
"Open",
"Close",
]
assert Chepy(data2).get_by_key("menu.popup.menuitem[0].value").o == b"New"
assert Chepy(data2).get_by_key("menu").o.get("id") == "file"
assert Chepy([{"a": "b"}, {"a": "d"}]).get_by_key("[].a").o == ["b", "d"]


def test_delete_state():
Expand Down Expand Up @@ -321,3 +316,8 @@ def test_register():
c.regex_search("out = (.+)").get_by_index(0).from_hex()
c.aes_decrypt(c.get_register("$R0"), key_format="base64", mode="ECB")
assert c.o == b"hello"


def test_ixs():
assert Chepy("b").prefix("a").o == b"ab"
assert Chepy("b").suffix("a").o == b"ba"

0 comments on commit 0f5e9dc

Please sign in to comment.