Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDK: Add schema inferrer class #20941

Merged
merged 16 commits into from
Jan 6, 2023
Merged
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.17.0
Add utility class to infer schemas from real records

## 0.16.3
Do not eagerly refresh access token in `SingleUseRefreshTokenOauth2Authenticator` [#20923](https://github.com/airbytehq/airbyte/pull/20923)

Expand Down
4 changes: 3 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from .schema_inferrer import SchemaInferrer
flash1293 marked this conversation as resolved.
Show resolved Hide resolved
from .traced_exception import AirbyteTracedException

__all__ = ["AirbyteTracedException"]
__all__ = ["AirbyteTracedException", "SchemaInferrer"]
66 changes: 66 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/utils/schema_inferrer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from collections import defaultdict
from typing import Any, Dict, List, Optional, Union

from airbyte_cdk.models import AirbyteRecordMessage
from genson import SchemaBuilder
from genson.schema.strategies.object import Object


class NoRequiredObj(Object):
"""
This class has Object behaviour, but it does not generate "required[]" fields
every time it parses object. So we dont add unnecessary extra field.
"""

def to_schema(self):
schema = super(NoRequiredObj, self).to_schema()
schema.pop("required", None)
return schema


class NoRequiredSchemaBuilder(SchemaBuilder):
EXTRA_STRATEGIES = (NoRequiredObj,)


# This type is inferred from the genson lib, but there is no alias provided for it - creating it here for type safety
InferredSchema = Dict[str, Union[str, Any, List, List[Dict[str, Union[Any, List]]]]]


class SchemaInferrer:
"""
This class is used to infer a JSON schema which fits all the records passed into it
throughout its lifecycle via the accumulate method.

Instances of this class are stateful, meaning they build their inferred schemas
from every record passed into the accumulate method.

"""

stream_to_builder: Dict[str, SchemaBuilder]

def __init__(self):
self.stream_to_builder = defaultdict(NoRequiredSchemaBuilder)

def accumulate(self, record: AirbyteRecordMessage):
"""Uses the input record to add to the inferred schemas maintained by this object"""
self.stream_to_builder[record.stream].add_object(record.data)

def get_inferred_schemas(self) -> Dict[str, InferredSchema]:
"""
Returns the JSON schemas for all encountered streams inferred by inspecting all records
passed via the accumulate method
"""
schemas = {}
for stream_name, builder in self.stream_to_builder.items():
schemas[stream_name] = builder.to_schema()
return schemas

def get_stream_schema(self, stream_name: str) -> Optional[InferredSchema]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original tech spec outlined the accumulate and get_inferred_schemas functions, but for context what's the use case for a single stream or how are we using this get_stream_schema() function
Also why do we no longer need the reset()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw when writing the spec i didn't account for this object needing to infer schema on multiple streams (that iface assumed a single stream). I think this iface makes more sense.

Copy link
Contributor Author

@flash1293 flash1293 Jan 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, when integrating with the API I noticed that for that use case we only need a specific stream and this API seemed nicer. The full get_inferred_schemas will still be nice for the planned integration with the read command / implementing a separate command for schema inference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dropped the reset because in the places where we plan to use this class it's much easier to just lose the reference to the inferrer instance and have the runtime clean it up, then create a new instance for the next usage. Happy to change that, but it doesn't seem heavy enough to explicitly keep an instance or a pool of them around.

"""
Returns the inferred JSON schema for the specified stream. Might be `None` if there were no records for the given stream name.
"""
return self.stream_to_builder[stream_name].to_schema() if stream_name in self.stream_to_builder else None
3 changes: 2 additions & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.16.3",
version="0.17.0",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -51,6 +51,7 @@
"jsonschema~=3.2.0",
"jsonref~=0.2",
"pendulum",
"genson==1.2.2",
"pydantic~=1.9.2",
"python-dateutil",
"PyYAML~=5.4",
Expand Down
99 changes: 99 additions & 0 deletions airbyte-cdk/python/unit_tests/utils/test_schema_inferrer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json

from airbyte_cdk.models.airbyte_protocol import AirbyteRecordMessage
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer

NOW = 1234567


def test_deriving_schema():
flash1293 marked this conversation as resolved.
Show resolved Hide resolved
inferrer = SchemaInferrer()
inferrer.accumulate(
AirbyteRecordMessage(
stream="my_stream",
data={"id": 0, "field_A": 1.0, "field_B": "airbyte"},
emitted_at=NOW,
)
)
inferrer.accumulate(
AirbyteRecordMessage(
stream="my_stream",
data={"id": 1, "field_A": 2.0, "field_B": "abc"},
emitted_at=NOW,
)
)
assert (
json.dumps(inferrer.get_inferred_schemas()["my_stream"])
== '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"id": {"type": "integer"}, "field_A": {"type": "number"}, "field_B": {"type": "string"}}}'
)


def test_deriving_schema_refine():
inferrer = SchemaInferrer()
inferrer.accumulate(
AirbyteRecordMessage(stream="my_stream", data={"field_A": 1.0}, emitted_at=NOW)
)
inferrer.accumulate(
AirbyteRecordMessage(
stream="my_stream", data={"field_A": "abc"}, emitted_at=NOW
)
)
assert (
json.dumps(inferrer.get_inferred_schemas()["my_stream"])
== '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"field_A": {"type": ["number", "string"]}}}'
flash1293 marked this conversation as resolved.
Show resolved Hide resolved
)


def test_deriving_schema_nested_structures():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would recommend considering the use of pytest.mark.fixture here to parametrize tests

inferrer = SchemaInferrer()
inferrer.accumulate(
AirbyteRecordMessage(
stream="my_stream", data={"obj": {"data": [1.0, 2.0, 3.0]}}, emitted_at=NOW
)
)
inferrer.accumulate(
AirbyteRecordMessage(
stream="my_stream", data={"obj": {"other_key": "xyz"}}, emitted_at=NOW
)
)
assert (
json.dumps(inferrer.get_inferred_schemas()["my_stream"])
== '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"obj": {"type": "object", "properties": {"data": {"type": "array", "items": {"type": "number"}}, "other_key": {"type": "string"}}}}}'
)


def test_deriving_schema_multiple_streams():
inferrer = SchemaInferrer()
inferrer.accumulate(
AirbyteRecordMessage(stream="my_stream", data={"field_A": 1.0}, emitted_at=NOW)
)
inferrer.accumulate(
AirbyteRecordMessage(
stream="my_stream2", data={"field_A": "abc"}, emitted_at=NOW
)
)
inferred_schemas = inferrer.get_inferred_schemas()
assert (
json.dumps(inferred_schemas["my_stream"])
== '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"field_A": {"type": "number"}}}'
)
assert (
json.dumps(inferred_schemas["my_stream2"])
== '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"field_A": {"type": "string"}}}'
)


def test_get_individual_schema():
inferrer = SchemaInferrer()
inferrer.accumulate(
AirbyteRecordMessage(stream="my_stream", data={"field_A": 1.0}, emitted_at=NOW)
)
assert (
json.dumps(inferrer.get_stream_schema("my_stream"))
== '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"field_A": {"type": "number"}}}'
)
assert inferrer.get_stream_schema("another_stream") is None