Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't update cursor for log messages and default schema path coming from connector builder #19271

Merged
merged 4 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.8.1
Low-code: Don't update cursor for non-record messages and fix default loader for connector builder manifests

## 0.8.0
Low-code: Allow for request and response to be emitted as log messages

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,11 +371,14 @@ def read_records(
stream_state,
)
for record in records_generator:
self.stream_slicer.update_cursor(stream_slice, last_record=record)
# Only record messages should be parsed to update the cursor which is indicated by the Mapping type
if isinstance(record, Mapping):
self.stream_slicer.update_cursor(stream_slice, last_record=record)
yield record
else:
last_record = self._last_records[-1] if self._last_records else None
self.stream_slicer.update_cursor(stream_slice, last_record=last_record)
if last_record and isinstance(last_record, Mapping):
self.stream_slicer.update_cursor(stream_slice, last_record=last_record)
yield from []

def stream_slices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def get_json_schema(self) -> Mapping[str, Any]:

try:
return self.default_loader.get_json_schema()
except FileNotFoundError:
except OSError:
# A slight hack since we don't directly have the stream name. However, when building the default filepath we assume the
# runtime options stores stream name 'name' so we'll do the same here
stream_name = self._options.get("name", "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@


def _default_file_path() -> str:
# schema files are always in "source_<connector_name>/schemas/<stream_name>.json
# the connector's module name can be inferred by looking at the modules loaded and look for the one starting with source_
# Schema files are always in "source_<connector_name>/schemas/<stream_name>.json
# The connector's module name can be inferred by looking at the modules loaded and look for the one starting with source_
source_modules = [
k for k, v in sys.modules.items() if "source_" in k # example: ['source_exchange_rates', 'source_exchange_rates.source']
]
if not source_modules:
raise RuntimeError("Expected at least one module starting with 'source_'")
module = source_modules[0].split(".")[0]
return f"./{module}/schemas/{{{{options['name']}}}}.json"
k for k, v in sys.modules.items() if "source_" in k
] # example: ['source_exchange_rates', 'source_exchange_rates.source']
if source_modules:
module = source_modules[0].split(".")[0]
return f"./{module}/schemas/{{{{options['name']}}}}.json"

# If we are not in a source_ module, the most likely scenario is we're processing a manifest from the connector builder
# server which does not require a json schema to be defined.
return "./{{options['name']}}.json"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced the module check w/ a basic default. @sherifnada do you still need the changes requested?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think over time we should probably just consolidate schemas into the YAML file. So this is a fine workaround.



@dataclass
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.8.0",
version="0.8.1",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@
import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status
import pytest
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import AirbyteLogMessage, Level, SyncMode
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.stream_slicers import DatetimeStreamSlicer
from airbyte_cdk.sources.streams.http.auth import NoAuth
from airbyte_cdk.sources.streams.http.http import HttpStream

primary_key = "pk"
records = [{"id": 1}, {"id": 2}]
request_response_logs = [
AirbyteLogMessage(level=Level.INFO, message="request:{}"),
AirbyteLogMessage(level=Level.INFO, message="response{}"),
]
config = {}


Expand Down Expand Up @@ -90,7 +95,7 @@ def test_simple_retriever_full(mock_http_stream):

assert retriever._last_response is None
assert retriever._last_records is None
assert retriever.parse_response(response, stream_state=None) == records
assert retriever.parse_response(response, stream_state={}) == records
assert retriever._last_response == response
assert retriever._last_records == records

Expand All @@ -107,6 +112,67 @@ def test_simple_retriever_full(mock_http_stream):
paginator.reset.assert_called()


@patch.object(HttpStream, "_read_pages", return_value=[*request_response_logs, *records])
def test_simple_retriever_with_request_response_logs(mock_http_stream):
    """Log messages interleaved with records must be yielded through read_records unchanged."""
    mock_requester = MagicMock()
    mock_paginator = MagicMock()
    mock_record_selector = MagicMock()
    slicer = DatetimeStreamSlicer(
        start_datetime="", end_datetime="", step="1d", cursor_field="id", datetime_format="", config={}, options={}
    )

    retriever = SimpleRetriever(
        name="stream_name",
        primary_key=primary_key,
        requester=mock_requester,
        paginator=mock_paginator,
        record_selector=mock_record_selector,
        stream_slicer=slicer,
        options={},
        config={},
    )

    emitted = list(retriever.read_records(SyncMode.full_refresh))
    mock_paginator.reset.assert_called()

    # The two leading log messages come through as AirbyteLogMessage instances,
    # followed by the plain record mappings in their original order.
    for log_message in emitted[:2]:
        assert isinstance(log_message, AirbyteLogMessage)
    assert emitted[2] == records[0]
    assert emitted[3] == records[1]


@patch.object(HttpStream, "_read_pages", return_value=[])
def test_simple_retriever_with_request_response_log_last_records(mock_http_stream):
    """parse_response must surface selected log messages and track them in _last_records."""
    mock_requester = MagicMock()
    mock_paginator = MagicMock()
    mock_record_selector = MagicMock()
    mock_record_selector.select_records.return_value = request_response_logs
    http_response = requests.Response()
    slicer = DatetimeStreamSlicer(
        start_datetime="", end_datetime="", step="1d", cursor_field="id", datetime_format="", config={}, options={}
    )

    retriever = SimpleRetriever(
        name="stream_name",
        primary_key=primary_key,
        requester=mock_requester,
        paginator=mock_paginator,
        record_selector=mock_record_selector,
        stream_slicer=slicer,
        options={},
        config={},
    )

    # Nothing has been parsed yet, so both trackers start unset.
    assert retriever._last_response is None
    assert retriever._last_records is None
    parsed = retriever.parse_response(http_response, stream_state={})
    assert parsed == request_response_logs
    assert retriever._last_response == http_response
    assert retriever._last_records == request_response_logs

    # Drain the generator purely for its side effects on the paginator.
    list(retriever.read_records(SyncMode.full_refresh))
    mock_paginator.reset.assert_called()


@pytest.mark.parametrize(
"test_name, requester_response, expected_should_retry, expected_backoff_time",
[
Expand Down