Skip to content

Commit

Permalink
[low-code] replace emptySchemaLoader with DefaultSchemaLoader (#18947)
Browse files Browse the repository at this point in the history
* replace emptySchemaLoader with DefaultSchemaLoader

* fix test name

* fix test

* add logging for when we default to the empty schema

* increment patch version

* fix formatting

* update changelog
  • Loading branch information
brianjlai authored Nov 4, 2022
1 parent 605fb92 commit 186580a
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 48 deletions.
4 changes: 4 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 0.5.3
Low-code: Replace EmptySchemaLoader with DefaultSchemaLoader to retain backwards compatibility
Low-code: Evaluate backoff strategies at runtime

## 0.5.2
Low-code: Allow for read even when schemas are not defined for a connector yet

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, StreamSlice
Expand Down Expand Up @@ -48,7 +48,7 @@ class DeclarativeStream(Stream, JsonSchemaMixin):
def __post_init__(self, options: Mapping[str, Any]):
self.stream_cursor_field = self.stream_cursor_field or []
self.transformations = self.transformations or []
self._schema_loader = self.schema_loader if self.schema_loader else EmptySchemaLoader(config=self.config, options=options)
self._schema_loader = self.schema_loader if self.schema_loader else DefaultSchemaLoader(config=self.config, options=options)

@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import OffsetIncrement
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
Expand All @@ -59,7 +58,6 @@
"DefaultErrorHandler": DefaultErrorHandler,
"DefaultPaginator": DefaultPaginator,
"DpathExtractor": DpathExtractor,
"EmptySchemaLoader": EmptySchemaLoader,
"ExponentialBackoffStrategy": ExponentialBackoffStrategy,
"HttpRequester": HttpRequester,
"InterpolatedBoolean": InterpolatedBoolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
Expand Down Expand Up @@ -58,7 +58,7 @@
RequestOptionsProvider: InterpolatedRequestOptionsProvider,
Requester: HttpRequester,
Retriever: SimpleRetriever,
SchemaLoader: EmptySchemaLoader,
SchemaLoader: DefaultSchemaLoader,
Stream: DeclarativeStream,
StreamSlicer: SingleSlice,
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.schema.empty_schema_loader import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader

__all__ = ["JsonFileSchemaLoader", "EmptySchemaLoader", "SchemaLoader"]
__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from dataclasses import InitVar, dataclass
from typing import Any, Mapping

from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.types import Config
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class DefaultSchemaLoader(SchemaLoader, JsonSchemaMixin):
"""
Loads a schema from the default location or returns an empty schema for streams that have not defined their schema file yet.
Attributes:
config (Config): The user-provided configuration as specified by the source's spec
options (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
"""

config: Config
options: InitVar[Mapping[str, Any]]

def __post_init__(self, options: Mapping[str, Any]):
self._options = options
self.default_loader = JsonFileSchemaLoader(options=options, config=self.config)

def get_json_schema(self) -> Mapping[str, Any]:
"""
Attempts to retrieve a schema from the default filepath location or returns the empty schema if a schema cannot be found.
:return: The empty schema
"""

try:
return self.default_loader.get_json_schema()
except FileNotFoundError:
# A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the
# runtime options stores stream name 'name' so we'll do the same here
stream_name = self._options.get("name", "")
logging.info(f"Could not find schema for stream {stream_name}, defaulting to the empty schema")
return {}

This file was deleted.

2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.5.2",
version="0.5.3",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from unittest.mock import MagicMock

import pytest
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader


@pytest.mark.parametrize(
"found_schema, found_error, expected_schema",
[
pytest.param(
{"type": "object", "properties": {}}, None, {"type": "object", "properties": {}}, id="test_has_schema_in_default_location"
),
pytest.param(None, FileNotFoundError, {}, id="test_schema_file_does_not_exist"),
],
)
def test_get_json_schema(found_schema, found_error, expected_schema):
default_schema_loader = DefaultSchemaLoader({}, {})

json_file_schema_loader = MagicMock()
if found_schema:
json_file_schema_loader.get_json_schema.return_value = {"type": "object", "properties": {}}
if found_error:
json_file_schema_loader.get_json_schema.side_effect = found_error

default_schema_loader.default_loader = json_file_schema_loader

actual_schema = default_schema_loader.get_json_schema()
assert actual_schema == expected_schema
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
)
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema import EmptySchemaLoader
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
Expand Down Expand Up @@ -603,7 +603,7 @@ def test_config_with_defaults():
assert type(stream) == DeclarativeStream
assert stream.primary_key == "id"
assert stream.name == "lists"
assert type(stream.schema_loader) == EmptySchemaLoader
assert type(stream.schema_loader) == DefaultSchemaLoader
assert type(stream.retriever) == SimpleRetriever
assert stream.retriever.requester.http_method == HttpMethod.GET

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def test_generate_schema():

declarative_stream = schema["definitions"]["DeclarativeStream"]
assert {"retriever", "config"}.issubset(declarative_stream["required"])
assert {"$ref": "#/definitions/EmptySchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"]
assert {"$ref": "#/definitions/DefaultSchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"]
assert {"$ref": "#/definitions/JsonFileSchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"]
assert declarative_stream["properties"]["retriever"]["$ref"] == "#/definitions/SimpleRetriever"
assert declarative_stream["properties"]["name"]["type"] == "string"
Expand Down

0 comments on commit 186580a

Please sign in to comment.