Commit e0cc074

[ISSUE #20771] code review and fix edge case

maxi297 committed Jan 20, 2023
1 parent e033ded commit e0cc074
Showing 4 changed files with 138 additions and 35 deletions.
2 changes: 0 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -237,7 +237,6 @@ def _read_incremental(
has_slices = True
if logger.isEnabledFor(logging.DEBUG):
yield AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"slice:{json.dumps(_slice)}"))
logger.debug("Processing stream slice", extra={"slice": _slice})
records = stream_instance.read_records(
sync_mode=SyncMode.incremental,
stream_slice=_slice,
@@ -288,7 +287,6 @@ def _read_full_refresh(
for _slice in slices:
if logger.isEnabledFor(logging.DEBUG):
yield AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"slice:{json.dumps(_slice)}"))
logger.debug("Processing stream slice", extra={"slice": _slice})
record_data_or_messages = stream_instance.read_records(
stream_slice=_slice,
sync_mode=SyncMode.full_refresh,
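For context: the two logger.debug calls above are removed because the slice is already surfaced as an Airbyte LOG message whenever debug logging is enabled. A minimal, hedged sketch of how a caller could observe those messages (the function name and the concrete source/config/catalog are illustrative, not part of this commit):

import logging
from typing import Any, Mapping

from airbyte_cdk.models import ConfiguredAirbyteCatalog, Type
from airbyte_cdk.sources import AbstractSource


def print_slice_boundaries(source: AbstractSource, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog) -> None:
    # With DEBUG enabled on the logger passed to read(), every slice is emitted as a
    # LOG message prefixed with "slice:"; without it, no such message is yielded.
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.DEBUG)
    for message in source.read(logger, config, catalog, state={}):
        if message.type == Type.LOG and message.log.message.startswith("slice:"):
            print(message.log.message)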
@@ -12,6 +12,7 @@
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from jsonschema.exceptions import ValidationError
+ from unittest.mock import patch

logger = logging.getLogger("airbyte")

@@ -542,6 +543,95 @@ def test_manifest_without_at_least_one_stream(self, construct_using_pydantic_mod
ManifestDeclarativeSource(source_config=manifest, construct_using_pydantic_models=construct_using_pydantic_models)


@patch("airbyte_cdk.sources.declarative.declarative_source.DeclarativeSource.read")
def test_given_debug_when_read_then_set_log_level(self, declarative_source_read):
any_valid_manifest = {
"version": "version",
"definitions": {
"schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"},
"retriever": {
"paginator": {
"type": "DefaultPaginator",
"page_size": 10,
"page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
"page_token_option": {"inject_into": "path"},
"pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"},
},
"requester": {
"path": "/v3/marketing/lists",
"authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
"request_parameters": {"page_size": 10},
},
"record_selector": {"extractor": {"field_pointer": ["result"]}},
},
},
"streams": [
{
"type": "DeclarativeStream",
"$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"},
"schema_loader": {
"name": "{{ options.stream_name }}",
"file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
},
"retriever": {
"paginator": {
"type": "DefaultPaginator",
"page_size": 10,
"page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
"page_token_option": {"inject_into": "path"},
"pagination_strategy": {
"type": "CursorPagination",
"cursor_value": "{{ response._metadata.next }}",
"page_size": 10,
},
},
"requester": {
"path": "/v3/marketing/lists",
"authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"},
"request_parameters": {"page_size": 10},
},
"record_selector": {"extractor": {"field_pointer": ["result"]}},
},
},
{
"type": "DeclarativeStream",
"$options": {"name": "stream_with_custom_requester", "primary_key": "id", "url_base": "https://api.sendgrid.com"},
"schema_loader": {
"name": "{{ options.stream_name }}",
"file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml",
},
"retriever": {
"paginator": {
"type": "DefaultPaginator",
"page_size": 10,
"page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"},
"page_token_option": {"inject_into": "path"},
"pagination_strategy": {
"type": "CursorPagination",
"cursor_value": "{{ response._metadata.next }}",
"page_size": 10,
},
},
"requester": {
"type": "CustomRequester",
"class_name": "unit_tests.sources.declarative.external_component.SampleCustomComponent",
"path": "/v3/marketing/lists",
"custom_request_parameters": {"page_size": 10},
},
"record_selector": {"extractor": {"field_pointer": ["result"]}},
},
},
],
"check": {"type": "CheckStream", "stream_names": ["lists"]},
}
source = ManifestDeclarativeSource(source_config=any_valid_manifest, debug=True, construct_using_pydantic_models=True)

debug_logger = logging.getLogger("logger.debug")
list(source.read(debug_logger, {}, {}, {}))

assert debug_logger.isEnabledFor(logging.DEBUG)


def test_generate_schema():
schema_str = ManifestDeclarativeSource.generate_schema()
schema = json.loads(schema_str)
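The new test above only checks one side effect of the debug flag: DeclarativeSource.read is patched out, and after read() the logger that was passed in has DEBUG enabled, which is what makes the "slice:" log messages from abstract_source.py visible. A simplified stand-in illustrating that contract (an assumption for illustration, not the actual ManifestDeclarativeSource code):

import logging
from typing import Any, Iterator


class DebugAwareSource:
    """Illustrative stand-in for a source that honours a debug flag."""

    def __init__(self, debug: bool = False) -> None:
        self._debug = debug

    def read(self, logger: logging.Logger, config: Any, catalog: Any, state: Any) -> Iterator[Any]:
        # The behaviour under test: with debug=True, the logger handed to read()
        # ends up with its level set to DEBUG before any records are produced.
        if self._debug:
            logger.setLevel(logging.DEBUG)
        return iter([])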
@@ -161,7 +161,7 @@ def _has_reached_limit(self, slices):
return False

def _get_message_groups(self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int) -> Iterable[
- Union[StreamReadPages, AirbyteLogMessage]]:
+ Union[StreamReadSlices, AirbyteLogMessage]]:
"""
Message groups are partitioned according to when request log messages are received. Subsequent response log messages
and record messages belong to the prior request log message and when we encounter another request, append the latest
@@ -177,43 +177,54 @@ def _get_message_groups(self, messages: Iterator[AirbyteMessage], schema_inferre
Note: The exception is that normal log messages can be received at any time which are not incorporated into grouping
"""
- first_page = True
- current_records = []
+ records_count = 0
+ at_least_one_page_in_group = False
+ current_page_records = []
+ current_slice_pages = []
current_page_request: Optional[HttpRequest] = None
current_page_response: Optional[HttpResponse] = None
- first_slice = True
- current_slice_pages = []

- while len(current_records) < limit and (message := next(messages, None)):
- if message.type == Type.LOG and message.log.message.startswith("slice:"):
- if first_slice:
- first_slice = False
- else:
- yield StreamReadSlices(pages=current_slice_pages)
- current_slice_pages = []
- elif first_page and message.type == Type.LOG and message.log.message.startswith("request:"):
- first_page = False
- request = self._create_request_from_log_message(message.log)
- current_page_request = request
+ while records_count < limit and (message := next(messages, None)):
+ if self._need_to_close_page(at_least_one_page_in_group, message):
+ self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
+
+ if at_least_one_page_in_group and message.type == Type.LOG and message.log.message.startswith("slice:"):
+ yield StreamReadSlices(pages=current_slice_pages)
+ current_slice_pages = []
+ at_least_one_page_in_group = False
elif message.type == Type.LOG and message.log.message.startswith("request:"):
- if not current_page_request or not current_page_response:
- raise ValueError("Every message grouping should have at least one request and response")
- current_slice_pages.append(StreamReadPages(request=current_page_request, response=current_page_response, records=current_records))
+ if not at_least_one_page_in_group:
+ at_least_one_page_in_group = True
current_page_request = self._create_request_from_log_message(message.log)
- current_records = []
current_page_response = None
elif message.type == Type.LOG and message.log.message.startswith("response:"):
current_page_response = self._create_response_from_log_message(message.log)
elif message.type == Type.LOG:
yield message.log
elif message.type == Type.RECORD:
- current_records.append(message.record.data)
+ current_page_records.append(message.record.data)
+ records_count += 1
schema_inferrer.accumulate(message.record)
else:
- if not current_page_request or not current_page_response:
- raise ValueError("Every message grouping should have at least one request and response")
- current_slice_pages.append(StreamReadPages(request=current_page_request, response=current_page_response, records=current_records))
+ self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
yield StreamReadSlices(pages=current_slice_pages)

+ def _need_to_close_page(self, at_least_one_page_in_group, message):
+ return (
+ at_least_one_page_in_group
+ and message.type == Type.LOG
+ and (message.log.message.startswith("request:") or message.log.message.startswith("slice:"))
+ )
+
+ def _close_page(self, current_page_request, current_page_response, current_slice_pages, current_page_records):
+ if not current_page_request or not current_page_response:
+ raise ValueError("Every message grouping should have at least one request and response")
+
+ current_slice_pages.append(
+ StreamReadPages(request=current_page_request, response=current_page_response, records=current_page_records)
+ )
+ current_page_records.clear()

def _create_request_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpRequest]:
# TODO: As a temporary stopgap, the CDK emits request data as a log message string. Ideally this should come in the
# form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate while the
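A rough, self-contained sketch of the grouping behaviour described in the docstring above, using plain dicts in place of AirbyteMessage objects (simplified: the real code also pairs each page with its request/response and enforces a record limit):

from typing import Any, Dict, Iterator, List


def group_into_slices(messages: Iterator[Dict[str, Any]]) -> List[List[List[Any]]]:
    """Return a list of slices; each slice is a list of pages; each page is a list of records."""
    slices: List[List[List[Any]]] = []
    current_slice: List[List[Any]] = []
    current_page: List[Any] = []
    page_open = False  # plays the role of at_least_one_page_in_group above

    for message in messages:
        kind = message["type"]
        if page_open and kind in ("slice", "request"):
            # close the current page, like _close_page above
            current_slice.append(current_page)
            current_page = []
        if kind == "slice" and page_open:
            # a new slice boundary closes the previous group of pages
            slices.append(current_slice)
            current_slice = []
            page_open = False
        elif kind == "request":
            page_open = True
        elif kind == "record":
            current_page.append(message["data"])

    if page_open:
        current_slice.append(current_page)
        slices.append(current_slice)
    return slices


# One slice with a single one-record page, then a slice with pages of two and one records.
sample = [
    {"type": "slice"},
    {"type": "request"}, {"type": "record", "data": 1},
    {"type": "slice"},
    {"type": "request"}, {"type": "record", "data": 2}, {"type": "record", "data": 3},
    {"type": "request"}, {"type": "record", "data": 4},
]
assert group_into_slices(iter(sample)) == [[[1]], [[2, 3], [4]]]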
@@ -18,7 +18,7 @@
from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams
from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody
from connector_builder.impl.default_api import DefaultApiImpl
- from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter, LowCodeSourceAdapterFactory
+ from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapterFactory
from fastapi import HTTPException
from pydantic.error_wrappers import ValidationError

@@ -637,6 +637,10 @@ def test_read_stream_with_many_slices():
mock_source_adapter_cls = make_mock_adapter_factory(
iter(
[
+ slice_message(),
+ request_log_message(request),
+ response_log_message(response),
+ record_message("hashiras", {"name": "Muichiro Tokito"}),
slice_message(),
request_log_message(request),
response_log_message(response),
@@ -647,8 +651,6 @@ record_message("hashiras", {"name": "Obanai Iguro"}),
record_message("hashiras", {"name": "Obanai Iguro"}),
request_log_message(request),
response_log_message(response),
- slice_message(),
- record_message("hashiras", {"name": "Muichiro Tokito"}),
]
)
)
@@ -661,14 +663,16 @@
)

assert not stream_read.test_read_limit_reached
- assert 2 == len(stream_read.slices)
+ assert len(stream_read.slices) == 2

+ assert len(stream_read.slices[0].pages) == 1
+ assert len(stream_read.slices[0].pages[0].records) == 1

- assert 2 == len(stream_read.slices[0].pages)
- assert 2 == len(stream_read.slices[0].pages[0].records)
- assert 1 == len(stream_read.slices[0].pages[1].records)
+ assert len(stream_read.slices[1].pages) == 3
+ assert len(stream_read.slices[1].pages[0].records) == 2
+ assert len(stream_read.slices[1].pages[1].records) == 1
+ assert len(stream_read.slices[1].pages[2].records) == 0

- assert 1 == len(stream_read.slices[1].pages)
- assert 1 == len(stream_read.slices[1].pages[0].records)


def test_read_stream_given_maximum_number_of_slices_then_test_read_limit_reached():
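Reading the reordered test data together with the new assertions: the leading slice_message now opens a first slice containing a single one-record page, and the second slice_message opens a slice whose three request/response pairs yield pages with two, one and zero records. A compact restatement of that expected shape (derived from the assertions above, not additional code from the commit):

# record counts per page, per slice, as asserted above
expected_page_record_counts = [
    [1],        # slice 0: one page with one record
    [2, 1, 0],  # slice 1: three pages with 2, 1 and 0 records
]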
