Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDK: Add schema inferrer class #20941

Merged
merged 16 commits into from
Jan 6, 2023
Merged
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.16.3
flash1293 marked this conversation as resolved.
Show resolved Hide resolved
Add utility class to infer schemas from real records

## 0.16.2
Fix the naming of OAuthAuthenticator

Expand Down
3 changes: 2 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from .schema_inferrer import SchemaInferrer
flash1293 marked this conversation as resolved.
Show resolved Hide resolved
from .traced_exception import AirbyteTracedException

__all__ = ["AirbyteTracedException"]
__all__ = ["AirbyteTracedException", "SchemaInferrer"]
78 changes: 78 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/utils/schema_inferrer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Dict, List, Optional, Union

from airbyte_cdk.models import AirbyteRecordMessage
from genson import SchemaBuilder
from genson.schema.strategies.object import Object


class NoRequiredObj(Object):
"""
This class has Object behaviour, but it does not generate "required[]" fields
every time it parses object. So we dont add unnecessary extra field.
"""

def to_schema(self):
schema = super(Object, self).to_schema()
schema["type"] = "object"
if self._properties:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these two if blocks what prevent generating required fields?

Copy link
Contributor Author

@flash1293 flash1293 Jan 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a sense, I copied this over from https://github.com/airbytehq/airbyte/blob/master/tools/schema_generator/schema_generator/infer_schemas.py#L41

It's basically reimplementing the to_schema method without the required part: https://github.com/wolverdude/GenSON/blob/master/genson/schema/strategies/object.py#L80

However on checking the genson code I changed the logic to just call the existing to_schema logic of the object strategy and then dropping the required prop. It's doing a very small amount of additional work but it seems much cleaner, LMK what you think.

schema["properties"] = self._properties_to_schema(self._properties)
if self._pattern_properties:
schema["patternProperties"] = self._properties_to_schema(self._pattern_properties)
return schema


class NoRequiredSchemaBuilder(SchemaBuilder):
EXTRA_STRATEGIES = (NoRequiredObj,)

# This type is inferred from the genson lib, but there is no alias provided for it - creating it here for type safety
InferredSchema = Dict[str, Union[str, Any, List, List[Dict[str, Union[Any, List]]]]]


class SchemaInferrer:
"""
This class is used to infer a JSON schema which fits all the records passed into it
throughout its lifecycle via the accumulate method.

Instances of this class are stateful, meaning they build their inferred schemas
from every record passed into the accumulate method.

"""

builders: Dict[str, SchemaBuilder]
flash1293 marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self):
self.builders = {}
flash1293 marked this conversation as resolved.
Show resolved Hide resolved

def accumulate(self, record: AirbyteRecordMessage):
"""Uses the input record to add to the inferred schemas maintained by this object"""
stream_name = record.stream
builder = None
if stream_name not in self.builders:
builder = NoRequiredSchemaBuilder()
self.builders[stream_name] = builder
else:
builder = self.builders[stream_name]
builder.add_object(record.data)

def get_inferred_schemas(self) -> Dict[str, InferredSchema]:
"""
Returns the JSON schemas for all encountered streams inferred by inspecting all records
passed via the accumulate method
"""
schemas = {}
for stream_name, builder in self.builders.items():
schemas[stream_name] = builder.to_schema()
return schemas

def get_stream_schema(self, stream_name: str) -> Optional[InferredSchema]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original tech spec outlined the accumulate and get_inferred_schemas functions, but for context what's the use case for a single stream or how are we using this get_stream_schema() function
Also why do we no longer need the reset()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw when writing the spec i didn't account for this object needing to infer schema on multiple streams (that iface assumed a single stream). I think this iface makes more sense.

Copy link
Contributor Author

@flash1293 flash1293 Jan 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, when integrating with the API I noticed that for that use case we only need a specific stream and this API seemed nicer. The full get_inferred_schemas will still be nice for the planned integration with the read command / implementing a separate command for schema inference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dropped the reset because in the places where we plan to use this class it's much easier to just lose the reference to the inferrer instance and have the runtime clean it up, then create a new instance for the next usage. Happy to change that, but it doesn't seem heavy enough to explicitly keep an instance or a pool of them around.

"""
Returns the inferred JSON schema for the specified stream. Might be `None` if there were no records for the given stream name.
"""
if stream_name in self.builders:
return self.builders[stream_name].to_schema()
else:
return None
3 changes: 2 additions & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.16.2",
version="0.16.3",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -51,6 +51,7 @@
"jsonschema~=3.2.0",
"jsonref~=0.2",
"pendulum",
"genson==1.2.2",
"pydantic~=1.9.2",
"python-dateutil",
"PyYAML~=5.4",
Expand Down
40 changes: 40 additions & 0 deletions airbyte-cdk/python/unit_tests/utils/test_schema_inferrer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json

from airbyte_cdk.models.airbyte_protocol import AirbyteRecordMessage
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer

NOW = 1234567


def test_deriving_schema():
flash1293 marked this conversation as resolved.
Show resolved Hide resolved
inferrer = SchemaInferrer()
inferrer.accumulate(AirbyteRecordMessage(stream="my_stream", data={"id": 0, "field_A": 1.0, "field_B": "airbyte"}, emitted_at=NOW))
inferrer.accumulate(AirbyteRecordMessage(stream="my_stream", data={"id": 1, "field_A": 2.0, "field_B": "abc"}, emitted_at=NOW))
assert json.dumps(inferrer.get_inferred_schemas()["my_stream"]) == '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"id": {"type": "integer"}, "field_A": {"type": "number"}, "field_B": {"type": "string"}}}'


def test_deriving_schema_refine():
inferrer = SchemaInferrer()
inferrer.accumulate(AirbyteRecordMessage(stream="my_stream", data={"field_A": 1.0}, emitted_at=NOW))
inferrer.accumulate(AirbyteRecordMessage(stream="my_stream", data={"field_A": "abc"}, emitted_at=NOW))
assert json.dumps(inferrer.get_inferred_schemas()["my_stream"]) == '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"field_A": {"type": ["number", "string"]}}}'


def test_deriving_schema_multiple_streams():
inferrer = SchemaInferrer()
inferrer.accumulate(AirbyteRecordMessage(stream="my_stream", data={"field_A": 1.0}, emitted_at=NOW))
inferrer.accumulate(AirbyteRecordMessage(stream="my_stream2", data={"field_A": "abc"}, emitted_at=NOW))
inferred_schemas = inferrer.get_inferred_schemas()
assert json.dumps(inferred_schemas["my_stream"]) == '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"field_A": {"type": "number"}}}'
assert json.dumps(inferred_schemas["my_stream2"]) == '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"field_A": {"type": "string"}}}'


def test_get_individual_schema():
inferrer = SchemaInferrer()
inferrer.accumulate(AirbyteRecordMessage(stream="my_stream", data={"field_A": 1.0}, emitted_at=NOW))
assert json.dumps(inferrer.get_stream_schema("my_stream")) == '{"$schema": "http://json-schema.org/schema#", "type": "object", "properties": {"field_A": {"type": "number"}}}'
assert inferrer.get_stream_schema("another_stream") is None