Skip to content

Commit

Permalink
[cdd/docstring/utils/parse_utils.py] Improve `parse_adhoc_doc_for_typ…
Browse files Browse the repository at this point in the history
…` ; [cdd/shared/ast_utils.py] Better type guards for `get_names` ; [cdd/sqlalchemy/utils/emit_utils.py] Add `keep_existing=True` to SQLalchemy `Table`s
  • Loading branch information
SamuelMarks committed Feb 10, 2024
1 parent 8452e79 commit 95da53e
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 90 deletions.
2 changes: 1 addition & 1 deletion cdd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from logging import getLogger as get_logger

__author__ = "Samuel Marks" # type: str
__version__ = "0.0.99rc26" # type: str
__version__ = "0.0.99rc27" # type: str
__description__ = (
"Open API to/fro routes, models, and tests. "
"Convert between docstrings, classes, methods, argparse, pydantic, and SQLalchemy."
Expand Down
90 changes: 66 additions & 24 deletions cdd/docstring/utils/parse_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
import string
from collections import Counter
from functools import partial
from itertools import filterfalse, takewhile
from itertools import chain, filterfalse, takewhile
from keyword import iskeyword
from operator import contains, itemgetter
from typing import List, Optional, Tuple, Union, cast

from cdd.shared.pure_utils import count_iter_items, sliding_window, type_to_name
from cdd.shared.pure_utils import (
count_iter_items,
simple_types,
sliding_window,
type_to_name,
)

adhoc_type_to_type = {
"bool": "bool",
Expand Down Expand Up @@ -65,12 +70,6 @@ def _union_literal_from_sentence(sentence):
union: Union[List[List[str]], List[str], Tuple[str]] = [[]]
_union_literal_from_sentence_phase0(sentence, union)

if not union[-1]:
del union[-1]
else:
union[-1] = "".join(
union[-1][:-1] if union[-1][-1] in frozenset((".", ",")) else union[-1]
)
if len(union) > 1:
candidate_type = next(
map(
Expand All @@ -82,9 +81,22 @@ def _union_literal_from_sentence(sentence):
),
None,
)

if candidate_type is not None:
return candidate_type

else:
candidate_collection = next(
map(
adhoc_3_tuple_to_collection.__getitem__,
filter(
partial(contains, adhoc_3_tuple_to_collection),
sliding_window(union, 3),
),
),
None,
)
if candidate_collection is not None:
return None
union = sorted(
frozenset(
map(
Expand Down Expand Up @@ -164,6 +176,8 @@ def _union_literal_from_sentence_phase0(sentence, union):
:param sentence: Input sentence with 'or' or 'of'
:type sentence: ```str```
:type union: ```Union[List[List[str]], List[str], Tuple[str]]```
"""
i: int = 0
quotes = {"'": 0, '"': 0}
Expand All @@ -183,16 +197,21 @@ def _union_literal_from_sentence_phase0(sentence, union):
)
else union[-1]
)
if union[-1] in frozenset(
("or", "or,", "or;", "or:", "of", "of,", "of;", "of:")
):
if union[-1] in frozenset(("or", "or,", "or;", "or:")):
union[-1] = []
elif union[-1] in frozenset(("of", "of,", "of;", "of:")):
collection_type = adhoc_3_tuple_to_collection.get(tuple(union))
if collection_type is None:
union[-1] = []
else:
union = []
else:
union.append([])
# eat until next non-space
j = i
i += count_iter_items(takewhile(str.isspace, sentence[i:])) - 1
union[-1] = sentence[j : i + 1]

union[slice(*((-1, None) if union else (None, None)))] = sentence[j : i + 1]

union.append([])
if ch in frozenset(("'", '"')):
Expand All @@ -205,6 +224,12 @@ def _union_literal_from_sentence_phase0(sentence, union):
):
i += 1
i += 1
if not union[-1]:
del union[-1]
else:
union[-1] = "".join(
union[-1][:-1] if union[-1][-1] in frozenset((".", ",")) else union[-1]
)


def parse_adhoc_doc_for_typ(doc, name, default_is_none):
Expand All @@ -225,7 +250,6 @@ def parse_adhoc_doc_for_typ(doc, name, default_is_none):
:return: The type (if determined) else `None`
:rtype: ```Optional[str]```
"""

if not doc:
return None

Expand All @@ -239,27 +263,45 @@ def parse_adhoc_doc_for_typ(doc, name, default_is_none):
defaults_idx: int = sentence.rfind(", default")
if defaults_idx != -1:
sentence: str = sentence[:defaults_idx]
if sentence.count("`") == 2:
fst_tick: str = sentence.find("`")
if (sentence.count("`") & 1) == 0:
fst_tick: int = (lambda idx: idx if idx > -1 else None)(sentence.find("`"))
candidate_collection: Optional[str] = next(
map(
adhoc_3_tuple_to_collection.__getitem__,
filter(
partial(contains, adhoc_3_tuple_to_collection),
sliding_window(sentence[:fst_tick], 3),
),
chain.from_iterable(
(
map(
adhoc_3_tuple_to_collection.__getitem__,
filter(
partial(contains, adhoc_3_tuple_to_collection),
sliding_window(sentence[:fst_tick].split(), 3),
),
),
map(
adhoc_3_tuple_to_collection.__getitem__,
filter(
partial(contains, adhoc_3_tuple_to_collection),
sliding_window(words, 3),
),
),
)
),
None,
)
if candidate_collection is not None:
wrap_type_with: str = candidate_collection + "[{}]"
sentence: str = sentence[fst_tick : sentence.rfind("`")]
if fst_tick is not None:
sentence: str = sentence[fst_tick : sentence.rfind("`")]

new_candidate_type: Optional[str] = cast(
Optional[str], _union_literal_from_sentence(sentence)
)
if new_candidate_type is not None:
candidate_type: Optional[str] = new_candidate_type
if (
new_candidate_type.startswith("Literal[")
and candidate_type in simple_types
and candidate_type is not None
):
wrap_type_with = "Union[{}, " + "{}]".format(candidate_type)
candidate_type: str = new_candidate_type
if candidate_type is not None:
return wrap_type_with.format(candidate_type)

Expand Down
4 changes: 2 additions & 2 deletions cdd/shared/ast_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1921,9 +1921,9 @@ def get_names(node):
:return: All top-level symbols (except those within try/except and if/elif/else blocks)
:rtype: ```Generator[str]```
"""
if isinstance(node, Assign) and isinstance(node.targets[0], Name):
if isinstance(node, Assign) and all(map(rpartial(isinstance, Name), node.targets)):
return map(attrgetter("id"), node.targets)
elif isinstance(node, AnnAssign):
elif isinstance(node, AnnAssign) and isinstance(node.target, Name):
return iter((node.target.id,))
elif isinstance(node, (AsyncFunctionDef, FunctionDef, ClassDef)):
return iter((node.name,))
Expand Down
2 changes: 2 additions & 0 deletions cdd/shared/emit/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def file(node, filename, mode="a", skip_black=False):
:return: None
:rtype: ```NoneType```
"""
with open("/tmp/write.log", "a") as f:
f.write("{}\n".format(filename))
if not isinstance(node, Module):
node: Module = Module(body=[node], type_ignores=[], stmt=None)
src: str = cdd.shared.source_transformer.to_code(node)
Expand Down
111 changes: 68 additions & 43 deletions cdd/sqlalchemy/emit.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,56 +108,81 @@ def sqlalchemy_table(
)
)
),
keywords=(
(
lambda val: (
[
keywords=list(
chain.from_iterable(
(
(
(
(
lambda val: (
(
keyword(
arg="comment",
value=set_value(val),
identifier=None,
expr=None,
lineno=None,
**maybe_type_comment,
),
)
if val
else iter(())
)
)(
deindent(
add(
*map(
partial(
docstring,
emit_default_doc=emit_default_doc,
docstring_format=docstring_format,
word_wrap=word_wrap,
emit_original_whitespace=emit_original_whitespace,
emit_types=True,
),
(
{
"doc": (
intermediate_repr[
"doc"
].lstrip()
+ "\n\n"
if intermediate_repr[
"returns"
]
else ""
),
"params": OrderedDict(),
"returns": None,
},
{
"doc": "",
"params": OrderedDict(),
"returns": intermediate_repr[
"returns"
],
},
),
)
).strip()
)
)
if intermediate_repr.get("doc")
else iter(())
)
),
(
keyword(
arg="comment",
value=set_value(val),
arg="keep_existing",
value=set_value(True),
identifier=None,
expr=None,
lineno=None,
**maybe_type_comment,
)
]
if val
else []
)
)(
deindent(
add(
*map(
partial(
docstring,
emit_default_doc=emit_default_doc,
docstring_format=docstring_format,
word_wrap=word_wrap,
emit_original_whitespace=emit_original_whitespace,
emit_types=True,
),
(
{
"doc": (
intermediate_repr["doc"].lstrip() + "\n\n"
if intermediate_repr["returns"]
else ""
),
"params": OrderedDict(),
"returns": None,
},
{
"doc": "",
"params": OrderedDict(),
"returns": intermediate_repr["returns"],
},
),
)
).strip()
),
),
)
)
if intermediate_repr.get("doc")
else []
),
expr=None,
expr_func=None,
Expand Down
44 changes: 33 additions & 11 deletions cdd/sqlalchemy/utils/emit_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,14 @@ def param_to_sqlalchemy_column_calls(name_param, include_name):

if len(args) < 2 and (
not args
or isinstance(args[0], Name)
and args[0].id not in sqlalchemy_top_level_imports
or not (
isinstance(args[0], Name) and args[0].id in sqlalchemy_top_level_imports
)
and not (
isinstance(args[0], Call)
and isinstance(args[0].func, Name)
and args[0].func.id in sqlalchemy_top_level_imports
)
):
# A good default I guess?
args.append(Name("LargeBinary", Load(), lineno=None, col_offset=None))
Expand Down Expand Up @@ -1266,16 +1272,32 @@ def _merge_name_to_column(assign):
)
)
),
keywords=(
[]
if doc_string is None
else [
keyword(
arg="comment",
value=cdd.shared.ast_utils.set_value(doc_string),
identifier=None,
keywords=list(
chain.from_iterable(
(
(
iter(())
if doc_string is None
else (
keyword(
arg="comment",
value=cdd.shared.ast_utils.set_value(doc_string),
identifier=None,
),
)
),
(
keyword(
arg="keep_existing",
value=cdd.shared.ast_utils.set_value(True),
identifier=None,
expr=None,
lineno=None,
**cdd.shared.ast_utils.maybe_type_comment,
),
),
)
]
)
),
expr=None,
expr_func=None,
Expand Down
6 changes: 3 additions & 3 deletions cdd/sqlalchemy/utils/shared_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ def update_args_infer_typ_sqlalchemy(_param, args, name, nullable, x_typ_sql):
"""
if _param["typ"] is None:
return _param.get("default") == cdd.shared.ast_utils.NoneStr, None
if _param["typ"].startswith("Optional["):
elif _param["typ"].startswith("Optional["):
_param["typ"] = _param["typ"][len("Optional[") : -1]
nullable = True
if "Literal[" in _param["typ"]:
parsed_typ: Call = cdd.shared.ast_utils.get_value(
ast.parse(_param["typ"]).body[0]
parsed_typ: Call = cast(
Call, cdd.shared.ast_utils.get_value(ast.parse(_param["typ"]).body[0])
)
if parsed_typ.value.id != "Literal":
return nullable, parsed_typ.value
Expand Down
Loading

0 comments on commit 95da53e

Please sign in to comment.