Skip to content

Commit

Permalink
🗓 Oct 25, 2023 6:29:05 PM
Browse files Browse the repository at this point in the history
✨ move xpath and css extractors to core
➕ deps added/updated
🧪 tests added/updated
✨ get_by_key updated to allow nested values
🤖 types added/updated
🔥 removed join_list
  • Loading branch information
securisec committed Oct 25, 2023
1 parent 0ac6a0a commit 4d511d7
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 81 deletions.
2 changes: 1 addition & 1 deletion chepy/chepy_plugins
Submodule chepy_plugins updated 4 files
+0 −94 chepy_extract.py
+0 −6 chepy_extract.pyi
+1 −0 chepy_ml.py
+0 −1 requirements.txt
45 changes: 36 additions & 9 deletions chepy/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,32 @@ def _convert_to_int(self) -> int:
else: # pragma: no cover
raise NotImplementedError

def _get_nested_value(self, data, key, split_by="."):
"""Get a dict value based on a string key with dot notation. Supports array indexing.
If split_by is None or "", returns only the first key
Args:
data (dict): Data
key (str): Dict key in a dot notation and array
split_by (str, optional): Chars to split key by. Defaults to ".".
"""
if not split_by:
return data[key]
try:
keys = key.split(split_by)
for k in keys:
if "[" in k:
# Extract the key and index
k, index_str = k.split("[")
index = int(index_str.rstrip("]"))
data = data[k][index]
else:
data = data[k]
return data
except Exception as e: # pragma: no cover
self._error_logger(e)
return data

@property
def o(self):
"""Get the final output
Expand Down Expand Up @@ -719,20 +745,21 @@ def get_by_index(self, index: int):
return self

@ChepyDecorators.call_stack
def get_by_key(self, key: str, split_key: str = "."):
    """Get value from a dict. Supports nested keys and arrays.

    Args:
        key (str): Key to extract. Nested keys are separated by
            ``split_key`` and a segment may index into a list, e.g.
            ``menu.popup.menuitem[1].value``.
        split_key (str, optional): Separator for nested keys. Pass None or
            "" to treat ``key`` as a single top-level key. Defaults to "."

    Returns:
        Chepy: The Chepy object.

    Raises:
        TypeError: If the state is not a dictionary.
    """
    # Raise rather than assert: asserts are stripped under ``python -O`` and
    # callers of the previous implementation received a TypeError here
    if not isinstance(self.state, dict):  # pragma: no cover
        raise TypeError("State is not a dictionary")

    self.state = self._get_nested_value(self.state, key, split_by=split_key)
    return self

@ChepyDecorators.call_stack
def copy_to_clipboard(self) -> None: # pragma: no cover
Expand Down Expand Up @@ -801,7 +828,7 @@ def http_request(
json: dict = None,
headers: dict = {},
cookies: dict = {},
):
): # pragma: no cover
"""Make a http/s request
Make a HTTP/S request and work with the data in Chepy. Most common http
Expand Down
7 changes: 4 additions & 3 deletions chepy/core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ class ChepyCore:
def _convert_to_bytearray(self) -> bytearray: ...
def _convert_to_str(self) -> str: ...
def _convert_to_int(self) -> int: ...
def _get_nested_value(self: ChepyCoreT, data: dict, key:str, split_by: str=".") -> Any: ...
def _str_to_bytes(self, s: str) -> bytes: ...
def _bytes_to_str(self, s: bytes) -> str: ...
@property
def state(self): ...
@state.setter
def state(self: ChepyCoreT, val: Any) -> None: ...
def fork(self: ChepyCoreT, methods: List[Tuple[Union[str, Callable], dict]]) -> ChepyCoreT: ...
def for_each(self: ChepyCoreT, methods: List[Tuple[Union[str, object], dict]]) -> ChepyCoreT: ...
def fork(self: ChepyCoreT, methods: List[Union[Tuple[Union[str, Callable], dict], Tuple[Union[str, Callable],]]]) -> ChepyCoreT: ...
def for_each(self: ChepyCoreT, methods: List[Union[Tuple[Union[str, Callable], dict], Tuple[Union[str, Callable],]]]) -> ChepyCoreT: ...
def set_state(self: ChepyCoreT, data: Any) -> ChepyCoreT: ...
def create_state(self: ChepyCoreT): ...
def copy_state(self: ChepyCoreT, index: int=...) -> ChepyCoreT: ...
Expand All @@ -50,7 +51,7 @@ class ChepyCore:
def out(self: ChepyCoreT) -> ChepyCoreT: ...
def out_as_str(self: ChepyCoreT) -> str: ...
def get_by_index(self: ChepyCoreT, index: int) -> ChepyCoreT: ...
def get_by_key(self: ChepyCoreT, key: str) -> ChepyCoreT: ...
def get_by_key(self: ChepyCoreT, key: str, split_key: Union[str, None] = '.') -> ChepyCoreT: ...
def copy_to_clipboard(self: ChepyCoreT) -> None: ...
def copy(self: ChepyCoreT) -> None: ...
def web(self: ChepyCoreT, magic: bool=..., cyberchef_url: str=...) -> None: ...
Expand Down
25 changes: 6 additions & 19 deletions chepy/modules/dataformat.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,35 +95,22 @@ def str_list_to_list(self) -> DataFormatT:
return self

@ChepyDecorators.call_stack
def join(self, join_by: Union[str, bytes] = "") -> DataFormatT:
    """Join a list with specified character

    Every item is passed through ``self._to_bytes`` and the separator
    through ``self._str_to_bytes`` before joining.

    Args:
        join_by (Union[str, bytes], optional): What to join with. Defaults to ""

    Returns:
        Chepy: The Chepy object.

    Raises:
        TypeError: If the state is not a list.

    Examples:
        >>> Chepy(["a", "b", "c"]).join(":").o
        b"a:b:c"
    """
    # Raise rather than assert so the check is not stripped under ``python -O``
    if not isinstance(self.state, list):
        raise TypeError("State is not a list")
    data = [self._to_bytes(x) for x in self.state]
    separator = self._str_to_bytes(join_by)
    self.state = separator.join(data)
    return self

@ChepyDecorators.call_stack
Expand Down
5 changes: 2 additions & 3 deletions chepy/modules/dataformat.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ..core import ChepyCore
from typing import Any, Literal, TypeVar, Union
from typing import Any, Literal, TypeVar, Union, Hashable

yaml: Any
DataFormatT = TypeVar('DataFormatT', bound='DataFormat')
Expand All @@ -11,8 +11,7 @@ class DataFormat(ChepyCore):
def bytes_to_ascii(self: DataFormatT) -> DataFormatT: ...
def list_to_str(self: DataFormatT, join_by: Union[str, bytes]=...) -> DataFormatT: ...
def str_list_to_list(self: DataFormatT) -> DataFormatT: ...
def join(self: DataFormatT, by: Union[str, bytes]=...) -> DataFormatT: ...
def join_list(self: DataFormatT, by: Union[str, bytes]=...) -> DataFormatT: ...
def join(self: DataFormatT, join_by: Union[str, bytes]=...) -> DataFormatT: ...
def json_to_dict(self: DataFormatT) -> DataFormatT: ...
def dict_to_json(self: DataFormatT) -> DataFormatT: ...
def dict_get_items(self: DataFormatT, *keys: str) -> DataFormatT: ...
Expand Down
101 changes: 100 additions & 1 deletion chepy/modules/extractors.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import math
from binascii import unhexlify
from typing import TypeVar, Union
from typing import TypeVar, Union, List
from urllib.parse import urlparse as _pyurlparse
import lazy_import

import regex as re
import re as old_re

parsel = lazy_import.lazy_module("parsel")

from ..core import ChepyCore, ChepyDecorators

ExtractorsT = TypeVar("ExtractorsT", bound="Extractors")
Expand All @@ -19,6 +22,10 @@ class Extractors(ChepyCore):
def __init__(self, *data):
    """Forward all positional data unchanged to the parent initializer"""
    super().__init__(*data)

def _parsel_obj(self):
    """Build a parsel.Selector from the state converted to a string"""
    text = self._convert_to_str()
    return parsel.Selector(text)

@ChepyDecorators.call_stack
def extract_hashes(self) -> ExtractorsT:
"""Extract md5, sha1, sha256 and sha512 hashes
Expand Down Expand Up @@ -583,3 +590,95 @@ def decodeText(text):
set_use_chars(_zw_chars)
self.state = decodeText(self._convert_to_str())
return self

@ChepyDecorators.call_stack
def xpath_selector(self, query: str, namespaces: dict = None):
    """Extract data using valid xpath selectors

    Args:
        query (str): Required. Xpath query
        namespaces (dict, optional): Namespace prefix to URI mapping, passed
            straight to ``parsel.Selector``. Applies for XML data.
            Defaults to None.

    Returns:
        Chepy: The Chepy object.

    Examples:
        >>> c = Chepy("http://example.com")
        >>> c.http_request()
        >>> c.xpath_selector("//title/text()")
        >>> c.get_by_index(0)
        >>> c.o
        "Example Domain"
    """
    # Built directly instead of via _parsel_obj because that helper does
    # not forward namespaces
    selector = parsel.Selector(self._convert_to_str(), namespaces=namespaces)
    self.state = selector.xpath(query).getall()
    return self

@ChepyDecorators.call_stack
def css_selector(self, query: str):
    """Select parts of the state with a CSS query

    Args:
        query (str): Required. CSS query

    Returns:
        Chepy: The Chepy object.

    Examples:
        >>> c = Chepy("http://example.com")
        >>> c.http_request()
        >>> c.css_selector("title")
        >>> c.get_by_index(0)
        >>> c.o
        "<title>Example Domain</title>"
    """
    matches = self._parsel_obj().css(query)
    self.state = matches.getall()
    return self

@ChepyDecorators.call_stack
def extract_html_tags(self, tags: List[str]):
    """Extract tags from html along with their attributes

    Args:
        tags (List[str]): HTML tag names to extract

    Returns:
        Chepy: The Chepy object.

    Examples:
        >>> Chepy("http://example.com").http_request().extract_html_tags(['p']).o
        [
            {'tag': 'p', 'attributes': {}},
            {'tag': 'p', 'attributes': {}},
            {'tag': 'p', 'attributes': {}}
        ]
    """
    # The state does not change inside the loop, so parse it only once
    selector = self._parsel_obj()
    hold = []

    for tag in tags:
        for element in selector.xpath("//{}".format(tag)):
            attributes = {}
            # XPath positions are 1-based; name(@*[i]) yields the name of
            # the i-th attribute, pairing it with the i-th attribute value
            for index, attribute in enumerate(element.xpath("@*"), start=1):
                attribute_name = element.xpath(
                    "name(@*[%d])" % index
                ).extract_first()
                attributes[attribute_name] = attribute.extract()
            hold.append({"tag": tag, "attributes": attributes})

    self.state = hold
    return self

@ChepyDecorators.call_stack
def extract_html_comments(self):
    """Collect every non-empty html comment found in the state

    Returns:
        Chepy: The Chepy object.
    """
    comments = self._parsel_obj().xpath("//comment()").getall()
    self.state = [comment for comment in comments if comment != ""]
    return self
8 changes: 7 additions & 1 deletion chepy/modules/extractors.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from ..core import ChepyCore
from typing import Any, TypeVar, Union
from typing import Any, TypeVar, Union, List

parsel: Any

ExtractorsT = TypeVar('ExtractorsT', bound='Extractors')

Expand Down Expand Up @@ -38,3 +40,7 @@ class Extractors(ChepyCore):
def find_longest_continious_pattern(self: ExtractorsT, str2: Union[str, bytes]) -> ExtractorsT: ...
def extract_zero_width_chars_tags(self: ExtractorsT) -> ExtractorsT: ...
def decode_zero_width(self: ExtractorsT, chars: str=...) -> ExtractorsT: ...
# namespaces is forwarded to parsel.Selector, which takes a prefix -> URI mapping
def xpath_selector(self: ExtractorsT, query: str, namespaces: Union[dict, None] = ...) -> ExtractorsT: ...
def css_selector(self: ExtractorsT, query: str) -> ExtractorsT: ...
def extract_html_comments(self: ExtractorsT) -> ExtractorsT: ...
# Parameter is named ``tags`` to match the implementation in extractors.py
def extract_html_tags(self: ExtractorsT, tags: List[str]) -> ExtractorsT: ...
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ typing_extensions
pretty-errors==1.2.25
lz4==4.3.2
passlib==1.7.4
msgpack==1.0.4
msgpack==1.0.4
parsel==1.8.1
26 changes: 24 additions & 2 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,30 @@ def test_load_buffer():
assert c.load_buffer(0).state == b"41"


def test_http_request():
assert Chepy("https://example.com").http_request().get_by_key("status").o == 200
# def test_http_request():
# assert Chepy("https://example.com").http_request().get_by_key("status").o == 200


def test_get_by_key():
    """get_by_key resolves nested keys, list indexes and custom separators"""
    menu_items = [
        {"value": "New", "onclick": "CreateNewDoc()"},
        {"value": "Open", "onclick": "OpenDoc()"},
        {"value": "Close", "onclick": "CloseDoc()"},
    ]
    payload = {
        "menu": {
            "id": "file",
            "value": "File",
            "popup": {"menuitem": menu_items},
        }
    }

    dotted = Chepy(payload).get_by_key("menu.popup.menuitem[1].value")
    assert dotted.o == b"Open"

    custom = Chepy(payload).get_by_key(
        "menu..popup..menuitem[0]..value", split_key=".."
    )
    assert custom.o == b"New"

    # A falsy split_key turns the key into a single top-level lookup
    top_level = Chepy(payload).get_by_key("menu", split_key=None)
    assert top_level.o.get("id") == "file"


def test_delete_state():
Expand Down
6 changes: 1 addition & 5 deletions tests/test_dataformat.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,8 @@ def test_list_to_str():
assert Chepy([b"a", b"b"]).list_to_str(b".").o == b"a.b"


def test_join_list():
assert Chepy(["a", "b", "c"]).join_list(":").o == b"a:b:c"


def test_join():
assert Chepy(["a", "b", "c"]).join(":").o == b"a:b:c"
assert Chepy(["a", "b", "c", True]).join(":").o == b"a:b:c:True"


def test_to_int():
Expand Down
20 changes: 20 additions & 0 deletions tests/test_extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,23 @@ def test_decode_zero_width():
.o["hidden"]
== "secret"
)


def test_xpath_selector():
    """xpath_selector returns the paragraph text of the fixture page"""
    loaded = Chepy("tests/files/test.html").load_file()
    paragraphs = loaded.xpath_selector("//p/text()").o
    assert "This domain" in paragraphs[0]


def test_css_selector():
    """css_selector returns the title element of the fixture page"""
    loaded = Chepy("tests/files/test.html").load_file()
    titles = loaded.css_selector("title").o
    assert "Example Domain" in titles[0]


def test_html_tags():
    """extract_html_tags finds three title/p elements in the fixture page"""
    loaded = Chepy("tests/files/test.html").load_file()
    found = loaded.extract_html_tags(["title", "p"]).o
    assert len(found) == 3


def test_html_comments():
    """extract_html_comments finds three comments in the fixture page"""
    loaded = Chepy("tests/files/test.html").load_file()
    comments = loaded.extract_html_comments().o
    assert len(comments) == 3
Loading

0 comments on commit 4d511d7

Please sign in to comment.