-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathobject.py
97 lines (81 loc) · 3.2 KB
/
object.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from collections import defaultdict
from re import search
from .base import SchemaStrategy
class Object(SchemaStrategy):
"""
object schema strategy
"""
KEYWORDS = ('type', 'properties', 'patternProperties', 'required')
@staticmethod
def match_schema(schema):
return schema.get('type') == 'object'
@staticmethod
def match_object(obj):
return isinstance(obj, dict)
def __init__(self, node_class):
super().__init__(node_class)
self._properties = defaultdict(node_class)
self._pattern_properties = defaultdict(node_class)
self._required = None
self._include_empty_required = False
def add_schema(self, schema):
super().add_schema(schema)
if 'properties' in schema:
for prop, subschema in schema['properties'].items():
subnode = self._properties[prop]
if subschema is not None:
subnode.add_schema(subschema)
if 'patternProperties' in schema:
for pattern, subschema in schema['patternProperties'].items():
subnode = self._pattern_properties[pattern]
if subschema is not None:
subnode.add_schema(subschema)
if 'required' in schema:
required = set(schema['required'])
if not required:
self._include_empty_required = True
if self._required is None:
self._required = required
else:
self._required &= required
def add_object(self, obj):
properties = set()
for prop, subobj in obj.items():
pattern = None
if prop not in self._properties:
pattern = self._matching_pattern(prop)
if pattern is not None:
self._pattern_properties[pattern].add_object(subobj)
else:
properties.add(prop)
self._properties[prop].add_object(subobj)
if self._required is None:
self._required = properties
else:
self._required &= properties
def _matching_pattern(self, prop):
for pattern in self._pattern_properties.keys():
if search(pattern, prop):
return pattern
def _add(self, items, func):
while len(self._items) < len(items):
self._items.append(self._schema_node_class())
for subschema, item in zip(self._items, items):
getattr(subschema, func)(item)
def to_schema(self):
schema = super().to_schema()
schema['type'] = 'object'
if self._properties:
schema['properties'] = self._properties_to_schema(
self._properties)
if self._pattern_properties:
schema['patternProperties'] = self._properties_to_schema(
self._pattern_properties)
if self._required or self._include_empty_required:
schema['required'] = sorted(self._required)
return schema
def _properties_to_schema(self, properties):
schema_properties = {}
for prop, schema_node in properties.items():
schema_properties[prop] = schema_node.to_schema()
return schema_properties