-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
The next iteration of JSONSchema machinery
- Loading branch information
Showing
9 changed files
with
416 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
from collections.abc import Mapping, Sequence | ||
from itertools import count | ||
from typing import Optional | ||
|
||
from ...datastructures import OrderedUniqueGrouper | ||
from .definitions import RefSource, ResolvedJSONSchema | ||
from .resolver import RefMangler | ||
|
||
|
||
class IndexRefMangler(RefMangler): | ||
def __init__(self, start: int = 1, separator: str = "-"): | ||
self._start = start | ||
self._separator = separator | ||
|
||
def mangle_refs( | ||
self, | ||
defs: Mapping[str, ResolvedJSONSchema], | ||
common_ref: str, | ||
sources: Sequence[RefSource], | ||
) -> Mapping[RefSource, str]: | ||
result = {} | ||
counter = count(self._start) | ||
for source in sources: | ||
while True: | ||
idx = next(counter) | ||
mangled = self._with_index(common_ref, idx) | ||
if mangled not in defs: | ||
result[source] = mangled | ||
break | ||
|
||
return result | ||
|
||
def _with_index(self, common_ref: str, index: int) -> str: | ||
return f"{common_ref}{self._separator}{index}" | ||
|
||
|
||
class QualnameRefMangler(RefMangler): | ||
def mangle_refs( | ||
self, | ||
defs: Mapping[str, ResolvedJSONSchema], | ||
common_ref: str, | ||
sources: Sequence[RefSource], | ||
) -> Mapping[RefSource, str]: | ||
return {source: self._generate_name(source) or common_ref for source in sources} | ||
|
||
def _generate_name(self, source: RefSource) -> Optional[str]: | ||
tp = source.loc_stack.last.type | ||
return getattr(tp, "__qualname__", None) | ||
|
||
|
||
class CompoundRefMangler(RefMangler): | ||
def __init__(self, base: RefMangler, wrapper: RefMangler): | ||
self._base = base | ||
self._wrapper = wrapper | ||
|
||
def mangle_refs( | ||
self, | ||
defs: Mapping[str, ResolvedJSONSchema], | ||
common_ref: str, | ||
sources: Sequence[RefSource], | ||
) -> Mapping[RefSource, str]: | ||
mangled = self._base.mangle_refs(defs, common_ref, sources) | ||
|
||
grouper = OrderedUniqueGrouper[str, RefSource]() | ||
for source, ref in mangled.items(): | ||
grouper.add(ref, source) | ||
|
||
for ref, ref_sources in grouper.finalize().items(): | ||
if len(ref_sources) > 1: | ||
mangled = {**mangled, **self._wrapper.mangle_refs(defs, ref, ref_sources)} | ||
|
||
return mangled |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
from abc import ABC, abstractmethod | ||
from collections import defaultdict | ||
from collections.abc import Mapping, MutableMapping, Sequence | ||
|
||
from adaptix import Omitted | ||
|
||
from ...datastructures import OrderedUniqueGrouper | ||
from ...provider.loc_stack_filtering import LocStack | ||
from ...provider.loc_stack_tools import format_loc_stack | ||
from .definitions import JSONSchema, RefSource, ResolvedJSONSchema | ||
from .schema_tools import replace_json_schema_ref, traverse_json_schema | ||
|
||
|
||
class JSONSchemaResolver(ABC): | ||
@abstractmethod | ||
def resolve( | ||
self, | ||
defs: MutableMapping[str, ResolvedJSONSchema], | ||
root_schemas: Sequence[JSONSchema], | ||
) -> Sequence[ResolvedJSONSchema]: | ||
... | ||
|
||
|
||
class RefGenerator(ABC): | ||
@abstractmethod | ||
def generate_ref(self, json_schema: JSONSchema, loc_stack: LocStack) -> str: | ||
... | ||
|
||
|
||
class RefMangler(ABC): | ||
@abstractmethod | ||
def mangle_refs( | ||
self, | ||
defs: Mapping[str, ResolvedJSONSchema], | ||
common_ref: str, | ||
sources: Sequence[RefSource], | ||
) -> Mapping[RefSource, str]: | ||
... | ||
|
||
|
||
class BuiltinJSONSchemaResolver(JSONSchemaResolver): | ||
def __init__(self, ref_generator: RefGenerator, ref_mangler: RefMangler): | ||
self._ref_generator = ref_generator | ||
self._ref_mangler = ref_mangler | ||
|
||
def resolve( | ||
self, | ||
defs: MutableMapping[str, ResolvedJSONSchema], | ||
root_schemas: Sequence[JSONSchema], | ||
) -> Sequence[ResolvedJSONSchema]: | ||
ref_to_sources = self._collect_ref_to_sources(root_schemas) | ||
source_determinator = self._get_source_determinator(defs, ref_to_sources) | ||
self._write_to_defs(defs, source_determinator) | ||
return [ | ||
replace_json_schema_ref(root, source_determinator) | ||
for root in root_schemas | ||
] | ||
|
||
def _collect_ref_to_sources(self, root_schemas: Sequence[JSONSchema]) -> Mapping[str, Sequence[RefSource]]: | ||
grouper = OrderedUniqueGrouper[str, RefSource[JSONSchema]]() | ||
for root in root_schemas: | ||
for schema in traverse_json_schema(root): | ||
ref_source = schema.ref | ||
if isinstance(ref_source, Omitted): | ||
continue | ||
|
||
ref = ( | ||
self._ref_generator.generate_ref(ref_source.json_schema, ref_source.loc_stack) | ||
if ref_source.value is None else | ||
ref_source.value | ||
) | ||
grouper.add(ref, ref_source) | ||
return grouper.finalize() | ||
|
||
def _get_source_determinator( | ||
self, | ||
defs: Mapping[str, ResolvedJSONSchema], | ||
ref_to_sources: Mapping[str, Sequence[RefSource]], | ||
) -> Mapping[RefSource, str]: | ||
source_determinator = {} | ||
for common_ref, sources in ref_to_sources.items(): | ||
if len(sources) == 1 and common_ref not in defs: | ||
source_determinator[sources[0]] = common_ref | ||
else: | ||
self._validate_sources(common_ref, sources) | ||
mangling_result = self._ref_mangler.mangle_refs(defs, common_ref, sources) | ||
source_determinator.update(mangling_result) | ||
self._validate_mangling(source_determinator) | ||
return source_determinator | ||
|
||
def _validate_sources(self, common_ref: str, sources: Sequence[RefSource]) -> None: | ||
pinned_sources = [source for source in sources if source.value is not None] | ||
if len(pinned_sources) > 1: | ||
pinned = ", ".join(f"`{format_loc_stack(pinned.loc_stack)}`" for pinned in pinned_sources) | ||
raise ValueError( | ||
f"Can not create consistent json schema," | ||
f" there are different sub schemas with pinned ref {common_ref!r}." | ||
f" {pinned}", | ||
) | ||
|
||
def _validate_mangling(self, source_determinator: Mapping[RefSource, str]) -> None: | ||
ref_to_sources = defaultdict(list) | ||
for source, ref in source_determinator.items(): | ||
ref_to_sources[ref].append(source) | ||
|
||
unmangled = [(ref, sources) for ref, sources in ref_to_sources.items() if len(sources) > 1] | ||
if unmangled: | ||
unmangled_desc = "; ".join( | ||
f"For ref {ref!r} at " | ||
+ ", and at ".join( | ||
f"`{format_loc_stack(source.json_schema)}`" for source in sources | ||
) | ||
for ref, sources in unmangled | ||
) | ||
raise ValueError( | ||
f"Can not create consistent json schema," | ||
f" can not mangle some refs." | ||
f" {unmangled_desc}", | ||
) | ||
|
||
def _write_to_defs( | ||
self, | ||
defs: MutableMapping[str, ResolvedJSONSchema], | ||
source_determinator: Mapping[RefSource, str], | ||
) -> None: | ||
for source, ref in source_determinator.items(): | ||
resolved_json_schema = replace_json_schema_ref(source.json_schema, source_determinator) | ||
if ref in defs and defs[ref] == resolved_json_schema: | ||
continue | ||
defs[ref] = resolved_json_schema |
Oops, something went wrong.