diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md
index b25d00e39b23..429ba7ed8926 100644
--- a/airbyte-cdk/python/CHANGELOG.md
+++ b/airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.1.22
+Add the ability to normalize output objects against their JSON schema.
+
 ## 0.1.21
 Resolve nested schema references and move external references to single schema definitions.
 
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
index c8d2c47883fa..930caf017235 100644
--- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
+++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -26,7 +26,8 @@
 import copy
 from abc import ABC, abstractmethod
 from datetime import datetime
-from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple
+from functools import lru_cache
+from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple
 
 from airbyte_cdk.logger import AirbyteLogger
 from airbyte_cdk.models import (
@@ -45,6 +47,7 @@
 from airbyte_cdk.sources.streams import Stream
 from airbyte_cdk.sources.streams.http.http import HttpStream
 from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
+from airbyte_cdk.sources.utils.transform import Transformer
 
 
 class AbstractSource(Source, ABC):
@@ -70,6 +73,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         :return: A list of the streams in this source connector.
         """
 
+    # Stream name to instance map for applying output object transformation
+    _stream_instances: Dict[str, Stream] = {}
+
     @property
     def name(self) -> str:
         """Source name"""
@@ -101,6 +107,7 @@ def read(
         # TODO assert all streams exist in the connector
         # get the streams once in case the connector needs to make any queries to generate them
         stream_instances = {s.name: s for s in self.streams(config)}
+        self._stream_instances = stream_instances
         for configured_stream in catalog.streams:
             stream_instance = stream_instances.get(configured_stream.stream.name)
             if not stream_instance:
@@ -227,7 +234,21 @@ def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
         connector_state[stream_name] = stream_state
         return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=connector_state))
 
+    @lru_cache(maxsize=None)
+    def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[Transformer, dict]:
+        """
+        Look up a stream's transformer object and JSON schema by stream name.
+        This function is called for every record, so the result is cached to
+        avoid repeating the costly get_json_schema operation.
+        :param stream_name: name of the stream from the catalog.
+        :return: tuple of the stream's transformer object and its discovered JSON schema.
+ """ + stream_instance = self._stream_instances.get(stream_name) + return stream_instance.transformer, stream_instance.get_json_schema() + def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]): now_millis = int(datetime.now().timestamp()) * 1000 + transformer, schema = self._get_stream_transformer_and_schema(stream_name) + transformer.transform(data, schema) message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis) return AirbyteMessage(type=MessageType.RECORD, record=message) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index 491452751573..ab2d9d6daa46 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -31,6 +31,7 @@ from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader +from airbyte_cdk.sources.utils.transform import TransformConfig, Transformer def package_name_from_class(cls: object) -> str: @@ -47,6 +48,9 @@ class Stream(ABC): # Use self.logger in subclasses to log any messages logger = AirbyteLogger() # TODO use native "logging" loggers with custom handlers + # Transformer object ot perform output data transformation + transformer: Transformer = Transformer(TransformConfig.NoTransform) + @property def name(self) -> str: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py new file mode 100644 index 000000000000..78524796a2b2 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py @@ -0,0 +1,167 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +from enum import Flag, auto +from typing import Any, Callable, Dict + +from jsonschema import Draft7Validator, validators + + +class TransformConfig(Flag): + """ + Transformer class config. Configs can be combined using bitwise or operator e.g. + ``` + TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization + ``` + """ + + NoTransform = auto() + DefaultSchemaNormalization = auto() + CustomSchemaNormalization = auto() + # TODO: implement field transformation with user defined object path + FieldTransformation = auto() + + +class Transformer: + """ + Class for transforming object before output. 
+ """ + + _custom_normalizer: Callable[[Any, Dict[str, Any]], Any] = None + + def __init__(self, config: TransformConfig): + """ + Initialize Transformer instance. + :param config Transform config that would be applied to object + """ + if TransformConfig.NoTransform in config and config != TransformConfig.NoTransform: + raise Exception("NoTransform option cannot be combined with another flags.") + self._config = config + all_validators = { + key: self.__normalize_and_validate(key, orig_validator) + for key, orig_validator in Draft7Validator.VALIDATORS.items() + # Do not validate field we do not transform for maximum performance. + if key in ["type", "array", "$ref", "properties", "items"] + } + self._normalizer = validators.create(meta_schema=Draft7Validator.META_SCHEMA, validators=all_validators) + + def register(self, normalization_callback: Callable) -> Callable: + """ + Register custom normalization callback. + :param normalization_callback function to be used for value + normalization. Should return normalized value. + :return Same callbeck, this is usefull for using register function as decorator. + """ + if TransformConfig.CustomSchemaNormalization not in self._config: + raise Exception("Please set TransformConfig.CustomSchemaNormalization config before registering custom normalizer") + self._custom_normalizer = normalization_callback + return normalization_callback + + def __normalize(self, original_item: Any, subschema: Dict[str, Any]): + """ + Applies different transform function to object's field according to config. + :param original_item original value of field. + :param subschema part of the jsonschema containing field type/format data. + :return Final field value. + """ + if TransformConfig.DefaultSchemaNormalization in self._config: + original_item = self.default_convert(original_item, subschema) + + if self._custom_normalizer: + original_item = self._custom_normalizer(original_item, subschema) + return original_item + + @staticmethod + def default_convert(original_item: Any, subschema: Dict[str, Any]) -> Any: + """ + Default transform function that is used when TransformConfig.DefaultSchemaNormalization flag set. + :param original_item original value of field. + :param subschema part of the jsonschema containing field type/format data. + :return transformed field value. + """ + target_type = subschema["type"] + if original_item is None and "null" in target_type: + return None + if isinstance(target_type, list): + target_type = [t for t in target_type if t != "null"] + if len(target_type) != 1: + return original_item + target_type = target_type[0] + try: + if target_type == "string": + return str(original_item) + elif target_type == "number": + return float(original_item) + elif target_type == "integer": + return int(original_item) + elif target_type == "boolean": + return bool(original_item) + except ValueError: + return original_item + return original_item + + def __normalize_and_validate(self, schema_key: str, original_validator: Callable): + """ + Traverse through object fields using native jsonschema validator and apply normalization function. + :param schema_key related json schema key that currently being validated/normalized. + :original_validator: native jsonschema validator callback. 
+ """ + + def normalizator(validator_instance, val, instance, schema): + def resolve(subschema): + if "$ref" in subschema: + _, resolved = validator_instance.resolver.resolve(subschema["$ref"]) + return resolved + return subschema + + if schema_key == "type" and instance is not None: + if "object" in val: + for k, subschema in schema["properties"].items(): + if k in instance: + subschema = resolve(subschema) + instance[k] = self.__normalize(instance[k], subschema) + elif "array" in val: + subschema = schema.get("items") + subschema = resolve(subschema) + for index, item in enumerate(instance): + instance[index] = self.__normalize(item, subschema) + # Running native jsonschema traverse algorithm after field normalization is done. + yield from original_validator(validator_instance, val, instance, schema) + + return normalizator + + def transform(self, instance: Dict[str, Any], schema: Dict[str, Any]): + """ + Normalize and validate according to config. + :param instance object instance for normalization/transformation. All modification are done by modifing existent object. + :schema object's jsonschema for normalization. + """ + if TransformConfig.NoTransform in self._config: + return + normalizer = self._normalizer(schema) + for e in normalizer.iter_errors(instance): + """ + just calling normalizer.validate() would throw an exception on + first validation occurences and stop processing rest of schema. + """ + # TODO: log warning diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 8be2e3ce70e6..104c67d4b94b 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ setup( name="airbyte-cdk", - version="0.1.21", + version="0.1.22", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index 9a297c201549..6f62941110f3 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -34,6 +34,7 @@ from airbyte_cdk.sources import AbstractSource, Source from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.streams.http.http import HttpStream +from airbyte_cdk.sources.utils.transform import TransformConfig, Transformer class MockSource(Source): @@ -81,6 +82,7 @@ def abstract_source(mocker): class MockHttpStream(MagicMock, HttpStream): url_base = "http://example.com" path = "/dummy/path" + get_json_schema = MagicMock() def supports_incremental(self): return True @@ -92,6 +94,7 @@ def __init__(self, *args, **kvargs): class MockStream(MagicMock, Stream): page_size = None + get_json_schema = MagicMock() def __init__(self, *args, **kvargs): MagicMock.__init__(self) @@ -145,8 +148,7 @@ def test_read_catalog(source): def test_internal_config(abstract_source, catalog): streams = abstract_source.streams(None) assert len(streams) == 2 - http_stream = streams[0] - non_http_stream = streams[1] + http_stream, non_http_stream = streams assert isinstance(http_stream, HttpStream) assert not isinstance(non_http_stream, HttpStream) http_stream.read_records.return_value = [{}] * 3 @@ -216,3 +218,44 @@ def test_internal_config_limit(abstract_source, catalog): logger_info_args = [call[0][0] for call in logger_mock.info.call_args_list] read_log_record = [_l for _l in logger_info_args if _l.startswith("Read")] assert read_log_record[0].startswith(f"Read {STREAM_LIMIT} ") + + +SCHEMA 
= {"type": "object", "properties": {"value": {"type": "string"}}} + + +def test_source_config_no_transform(abstract_source, catalog): + logger_mock = MagicMock() + streams = abstract_source.streams(None) + http_stream, non_http_stream = streams + http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA + http_stream.read_records.return_value, non_http_stream.read_records.return_value = [[{"value": 23}] * 5] * 2 + records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] + assert len(records) == 2 * 5 + assert [r.record.data for r in records] == [{"value": 23}] * 2 * 5 + assert http_stream.get_json_schema.call_count == 1 + assert non_http_stream.get_json_schema.call_count == 1 + + +def test_source_config_transform(abstract_source, catalog): + logger_mock = MagicMock() + streams = abstract_source.streams(None) + http_stream, non_http_stream = streams + http_stream.transformer = Transformer(TransformConfig.DefaultSchemaNormalization) + non_http_stream.transformer = Transformer(TransformConfig.DefaultSchemaNormalization) + http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA + http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}] + records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] + assert len(records) == 2 + assert [r.record.data for r in records] == [{"value": "23"}] * 2 + + +def test_source_config_transform_and_no_transform(abstract_source, catalog): + logger_mock = MagicMock() + streams = abstract_source.streams(None) + http_stream, non_http_stream = streams + http_stream.transformer = Transformer(TransformConfig.DefaultSchemaNormalization) + http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA + http_stream.read_records.return_value, non_http_stream.read_records.return_value = [{"value": 23}], [{"value": 23}] + records = [r for r in abstract_source.read(logger=logger_mock, config={}, catalog=catalog, state={})] + assert len(records) == 2 + assert [r.record.data for r in records] == [{"value": "23"}, {"value": 23}] diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_transform.py b/airbyte-cdk/python/unit_tests/sources/utils/test_transform.py new file mode 100644 index 000000000000..6a565395c2d2 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/utils/test_transform.py @@ -0,0 +1,196 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+import pytest
+from airbyte_cdk.sources.utils.transform import TransformConfig, Transformer
+
+SIMPLE_SCHEMA = {"type": "object", "properties": {"value": {"type": "string"}}}
+COMPLEX_SCHEMA = {
+    "type": "object",
+    "properties": {
+        "value": {"type": "boolean", "format": "even", "is_positive": True},
+        "prop": {"type": "string"},
+        "prop_with_null": {"type": ["string", "null"]},
+        "number_prop": {"type": "number"},
+        "int_prop": {"type": ["integer", "null"]},
+        "too_many_types": {"type": ["boolean", "null", "string"]},
+        "def": {
+            "type": "object",
+            "properties": {"dd": {"$ref": "#/definitions/my_type"}},
+        },
+        "array": {"type": "array", "items": {"$ref": "#/definitions/str_type"}},
+        "nested": {"$ref": "#/definitions/nested_type"},
+        "list_of_lists": {
+            "type": "array",
+            "items": {"type": "array", "items": {"type": "string"}},
+        },
+    },
+    "definitions": {
+        "str_type": {"type": "string"},
+        "nested_type": {"type": "object", "properties": {"a": {"type": "string"}}},
+    },
+}
+VERY_NESTED_SCHEMA = {
+    "type": ["null", "object"],
+    "properties": {
+        "very_nested_value": {
+            "type": ["null", "object"],
+            "properties": {
+                "very_nested_value": {
+                    "type": ["null", "object"],
+                    "properties": {
+                        "very_nested_value": {
+                            "type": ["null", "object"],
+                            "properties": {
+                                "very_nested_value": {
+                                    "type": ["null", "object"],
+                                    "properties": {"very_nested_value": {"type": ["null", "number"]}},
+                                }
+                            },
+                        }
+                    },
+                }
+            },
+        }
+    },
+}
+
+
+@pytest.mark.parametrize(
+    "schema, actual, expected",
+    [
+        (
+            SIMPLE_SCHEMA,
+            {"value": 12},
+            {"value": "12"},
+        ),
+        (
+            SIMPLE_SCHEMA,
+            {"value": 12},
+            {"value": "12"},
+        ),
+        (
+            COMPLEX_SCHEMA,
+            {"value": 1, "array": ["111", 111, {1: 111}]},
+            {"value": True, "array": ["111", "111", "{1: 111}"]},
+        ),
+        (
+            COMPLEX_SCHEMA,
+            {"value": 1, "list_of_lists": [["111"], [111], [11], [{1: 1}]]},
+            {"value": True, "list_of_lists": [["111"], ["111"], ["11"], ["{1: 1}"]]},
+        ),
+        (
+            COMPLEX_SCHEMA,
+            {"value": 1, "nested": {"a": [1, 2, 3]}},
+            {"value": True, "nested": {"a": "[1, 2, 3]"}},
+        ),
+        (COMPLEX_SCHEMA, {}, {}),
+        (COMPLEX_SCHEMA, {"int_prop": "12"}, {"int_prop": 12}),
+        # Skip the invalidly formatted field and process the other fields.
+        (
+            COMPLEX_SCHEMA,
+            {"prop": 12, "number_prop": "aa12", "array": [12]},
+            {"prop": "12", "number_prop": "aa12", "array": ["12"]},
+        ),
+        # The too_many_types field has an ambiguous type, so skip formatting.
+        (
+            COMPLEX_SCHEMA,
+            {"prop": 12, "too_many_types": 1212, "array": [12]},
+            {"prop": "12", "too_many_types": 1212, "array": ["12"]},
+        ),
+        # Test a null field
+        (
+            COMPLEX_SCHEMA,
+            {"prop": None, "array": [12]},
+            {"prop": "None", "array": ["12"]},
+        ),
+        # If the field can be null, do not convert
+        (
+            COMPLEX_SCHEMA,
+            {"prop_with_null": None, "array": [12]},
+            {"prop_with_null": None, "array": ["12"]},
+        ),
+        (
+            VERY_NESTED_SCHEMA,
+            {"very_nested_value": {"very_nested_value": {"very_nested_value": {"very_nested_value": {"very_nested_value": "2"}}}}},
+            {"very_nested_value": {"very_nested_value": {"very_nested_value": {"very_nested_value": {"very_nested_value": 2}}}}},
+        ),
+        (
+            VERY_NESTED_SCHEMA,
+            {"very_nested_value": {"very_nested_value": None}},
+            {"very_nested_value": {"very_nested_value": None}},
+        ),
+    ],
+)
+def test_transform(schema, actual, expected):
+    t = Transformer(TransformConfig.DefaultSchemaNormalization)
+    t.transform(actual, schema)
+    assert actual == expected
+
+
+def test_transform_wrong_config():
+    with pytest.raises(Exception, match="NoTransform option cannot be combined with other flags."):
+        Transformer(TransformConfig.NoTransform | TransformConfig.DefaultSchemaNormalization)
+
+    with pytest.raises(Exception, match="Please set TransformConfig.CustomSchemaNormalization config before registering custom normalizer"):
+
+        class NotAStream:
+            transformer = Transformer(TransformConfig.DefaultSchemaNormalization)
+
+            @transformer.register
+            def transform_cb(instance, schema):
+                pass
+
+
+def test_custom_transform():
+    class NotAStream:
+        transformer = Transformer(TransformConfig.CustomSchemaNormalization)
+
+        @transformer.register
+        def transform_cb(instance, schema):
+            # Check that no default conversion was applied
+            assert instance == 12
+            assert schema == SIMPLE_SCHEMA["properties"]["value"]
+            return "transformed"
+
+    s = NotAStream()
+    obj = {"value": 12}
+    s.transformer.transform(obj, SIMPLE_SCHEMA)
+    assert obj == {"value": "transformed"}
+
+
+def test_custom_transform_with_default_normalization():
+    class NotAStream:
+        transformer = Transformer(TransformConfig.CustomSchemaNormalization | TransformConfig.DefaultSchemaNormalization)
+
+        @transformer.register
+        def transform_cb(instance, schema):
+            # Check that the default conversion was applied first
+            assert instance == "12"
+            assert schema == SIMPLE_SCHEMA["properties"]["value"]
+            return "transformed"
+
+    s = NotAStream()
+    obj = {"value": 12}
+    s.transformer.transform(obj, SIMPLE_SCHEMA)
+    assert obj == {"value": "transformed"}
diff --git a/docs/connector-development/cdk-python/schemas.md b/docs/connector-development/cdk-python/schemas.md
index 204b303423df..1277832a92da 100644
--- a/docs/connector-development/cdk-python/schemas.md
+++ b/docs/connector-development/cdk-python/schemas.md
@@ -25,3 +25,64 @@ def get_json_schema(self):
     return schema
 ```
 
+## Schema normalization
+
+Conforming output data to the declared JSON schema is important because the data may later be consumed by a destination that relies on the received data types (e.g. when data is stored in an SQL database, you cannot put an INTEGER value into a CHAR column). A minor JSON schema mistake or an API change could break the synchronization process even though data fetching completed successfully.
+
+To handle these cases, the CDK can transform objects automatically before they are passed to the destination. All streams inheriting from the airbyte_cdk.sources.streams.core.Stream class have a transformer configuration that can operate on individual object field values. By default it makes no changes, and it can be reconfigured in user streams.
+
+### Default schema normalization
+
+Say you want the output records to be cast to the types described in the JSON schema. This is how you can configure it:
+
+```python
+from airbyte_cdk.sources.utils.transform import TransformConfig, Transformer
+from airbyte_cdk.sources.streams.core import Stream
+
+class MyStream(Stream):
+    ...
+    transformer = Transformer(TransformConfig.DefaultSchemaNormalization)
+    ...
+```
+
+In this case the default transformation is applied. For example, if you have a schema like this:
+
+```json
+{"type": "object", "properties": {"value": {"type": "string"}}}
+```
+
+and the source API returns an object with a non-string type, it is cast to string automatically:
+
+```json
+{"value": 12} -> {"value": "12"}
+```
+
+It also works on complex types:
+
+```json
+{"value": {"unexpected_object": "value"}} -> {"value": "{'unexpected_object': 'value'}"}
+```
+
+and on objects inside arrays or referenced by the $ref attribute.
+
+If a value cannot be cast (e.g. the string "asdf" cannot be cast to integer), the field keeps its original value and no error is reported.
+
+*Note:* This transformation is done by the source, not the stream itself, so overriding the "read_records" method in your stream does not affect object transformation. All transformations are done in place by modifying the output object before it is passed to the "get_updated_state" method, so "get_updated_state" receives the transformed object.
+
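+The transformer can also be exercised directly, which is convenient for unit-testing a stream's normalization. A minimal sketch (the schema and record values are illustrative, mirroring the CDK's own tests):
+
+```python
+from airbyte_cdk.sources.utils.transform import TransformConfig, Transformer
+
+schema = {"type": "object", "properties": {"value": {"type": "string"}}}
+record = {"value": 12}
+
+transformer = Transformer(TransformConfig.DefaultSchemaNormalization)
+transformer.transform(record, schema)  # the record is modified in place
+assert record == {"value": "12"}
+```
+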
+### Custom schema normalization
+
+Default schema normalization performs a simple type cast, regardless of a field's format. Sometimes you want a more sophisticated transform, such as making a "date-time" field compliant with the RFC 3339 standard. In this case you can use custom schema normalization:
+
+```python
+class MyStream(Stream):
+    ...
+    transformer = Transformer(TransformConfig.CustomSchemaNormalization)
+    ...
+
+    @transformer.register
+    def transform_function(original_value: Any, field_schema: Dict[str, Any]) -> Any:
+        # transformed_value = ...
+        return transformed_value
+```
+
+Here original_value is the initial field value and field_schema is the part of the JSON schema describing the field's type. For the schema
+
+```json
+{"type": "object", "properties": {"value": {"type": "string", "format": "date-time"}}}
+```
+
+the field_schema variable would be equal to
+
+```json
+{"type": "string", "format": "date-time"}
+```
+
+In this case default normalization is skipped and only the custom transformation is applied. If you want to run both default and custom normalization, configure the transformer object by combining the config flags:
+
+```python
+transformer = Transformer(TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization)
+```
+
+In this case custom normalization is applied after the default normalization function. Note that the order of the flags does not matter: default normalization always runs before custom normalization.
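+
+For example, with both flags set the custom callback receives the value after the default cast has already happened. A minimal sketch mirroring the CDK's own tests (the stream and callback names are illustrative):
+
+```python
+class MyStream(Stream):
+    ...
+    transformer = Transformer(TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization)
+    ...
+
+    @transformer.register
+    def transform_function(original_value: Any, field_schema: Dict[str, Any]) -> Any:
+        # For {"value": 12} and a {"type": "string"} field, original_value
+        # arrives here as the string "12" because the default cast runs first.
+        return original_value
+```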