Skip to content

Commit

Permalink
add some tests for manifests with references and some additional inva…
Browse files Browse the repository at this point in the history
…lid test cases and error handling refinement (#19528)

* add some tests for manifests with references and some additional invalid test cases and error handling refinement

* pr feedback to use str

* remove print
  • Loading branch information
brianjlai authored Nov 17, 2022
1 parent 5219a72 commit 86eb221
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody
from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter
from fastapi import Body, HTTPException
from jsonschema import ValidationError


class DefaultApiImpl(DefaultApi):
Expand Down Expand Up @@ -97,7 +96,7 @@ async def list_streams(self, streams_list_request_body: StreamsListRequestBody =
)
)
except Exception as error:
raise HTTPException(status_code=400, detail=f"Could not list streams with with error: {error.args[0]}")
raise HTTPException(status_code=400, detail=f"Could not list streams with with error: {str(error)}")
return StreamsListRead(streams=stream_list_read)

async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Body(None, description="")) -> StreamRead:
Expand All @@ -121,7 +120,7 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo
single_slice.pages.append(message_group)
except Exception as error:
# TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
raise HTTPException(status_code=400, detail=f"Could not perform read with with error: {error.args[0]}")
raise HTTPException(status_code=400, detail=f"Could not perform read with with error: {str(error)}")

return StreamRead(logs=log_messages, slices=[single_slice])

Expand Down Expand Up @@ -199,6 +198,6 @@ def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> O
def _create_low_code_adapter(manifest: Dict[str, Any]) -> LowCodeSourceAdapter:
try:
return LowCodeSourceAdapter(manifest=manifest)
except ValidationError as error:
except Exception as error:
# TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
raise HTTPException(status_code=400, detail=f"Invalid connector manifest with error: {error.message}")
raise HTTPException(status_code=400, detail=f"Invalid connector manifest with error: {str(error)}")
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def test_invalid_manifest():
)

assert actual_exception.value.status_code == expected_status_code
assert actual_exception.value.detail == expected_detail
assert expected_detail in actual_exception.value.detail


def test_read_stream_invalid_group_format():
Expand Down Expand Up @@ -380,7 +380,7 @@ def test_read_stream_invalid_group_format():

def test_read_stream_returns_error_if_stream_does_not_exist():
expected_status_code = 400
expected_detail = "Could not perform read with with error: The requested stream not_in_manifest was not found in the source. Available streams: dict_keys(['hashiras', 'breathing-techniques'])"
expected_detail = "Could not perform read with with error: \"The requested stream not_in_manifest was not found in the source. Available streams: dict_keys(['hashiras', 'breathing-techniques'])\""

api = DefaultApiImpl()
loop = asyncio.get_event_loop()
Expand All @@ -390,7 +390,7 @@ def test_read_stream_returns_error_if_stream_does_not_exist():
)

assert actual_exception.value.status_code == expected_status_code
assert actual_exception.value.detail == expected_detail
assert expected_detail in actual_exception.value.detail


@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import requests
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, Level, Type
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
from airbyte_cdk.sources.streams.http import HttpStream
from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter

Expand All @@ -18,6 +19,7 @@ class MockConcreteStream(HttpStream, ABC):
"""
Test class used to verify errors are correctly thrown when the adapter receives unexpected outputs
"""

def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
return None

Expand Down Expand Up @@ -94,6 +96,69 @@ def parse_response(
],
"check": {"stream_names": ["hashiras"], "class_name": "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream"},
}

MANIFEST_WITH_REFERENCES = {
"version": "0.1.0",
"definitions": {
"selector": {
"extractor": {
"field_pointer": []
}
},
"requester": {
"url_base": "https://demonslayers.com/api/v1/",
"http_method": "GET",
"authenticator": {
"type": "BearerAuthenticator",
"api_token": "{{ config['api_key'] }}"
}
},
"retriever": {
"record_selector": {
"$ref": "*ref(definitions.selector)"
},
"paginator": {
"type": "NoPagination"
},
"requester": {
"$ref": "*ref(definitions.requester)"
}
},
"base_stream": {
"retriever": {
"$ref": "*ref(definitions.retriever)"
}
},
"ranks_stream": {
"$ref": "*ref(definitions.base_stream)",
"$options": {
"name": "ranks",
"primary_key": "id",
"path": "/ranks"
}
}
},
"streams": ["*ref(definitions.ranks_stream)"],
"check": {
"stream_names": ["ranks"]
},
"spec": {
"documentation_url": "https://docsurl.com",
"connection_specification": {
"title": "Source Name Spec",
"type": "object",
"required": ["api_key"],
"additionalProperties": True,
"properties": {
"api_key": {
"type": "string",
"description": "API Key"
}
}
}
}
}

INVALID_MANIFEST = {
"version": "0.1.0",
"definitions": {
Expand Down Expand Up @@ -128,6 +193,17 @@ def test_get_http_streams():
assert actual_urls == expected_urls


def test_get_http_manifest_with_references():
expected_urls = {"https://demonslayers.com/api/v1/ranks"}

adapter = LowCodeSourceAdapter(MANIFEST_WITH_REFERENCES)
actual_streams = adapter.get_http_streams(config={})
actual_urls = {http_stream.url_base + http_stream.path() for http_stream in actual_streams}

assert len(actual_streams) == len(expected_urls)
assert actual_urls == expected_urls


def test_get_http_streams_non_declarative_streams():
non_declarative_stream = MockConcreteStream()

Expand All @@ -141,7 +217,8 @@ def test_get_http_streams_non_declarative_streams():


def test_get_http_streams_non_http_stream():
declarative_stream_non_http_retriever = DeclarativeStream(name="hashiras", primary_key="id", retriever=MagicMock(), config={}, options={})
declarative_stream_non_http_retriever = DeclarativeStream(name="hashiras", primary_key="id", retriever=MagicMock(), config={},
options={})

mock_source = MagicMock()
mock_source.streams.return_value = [declarative_stream_non_http_retriever]
Expand Down Expand Up @@ -182,3 +259,31 @@ def test_read_streams():

for i, expected_message in enumerate(expected_messages):
assert actual_messages[i] == expected_message


def test_read_streams_invalid_reference():
invalid_reference_manifest = {
"version": "0.1.0",
"definitions": {
"selector": {
"extractor": {
"field_pointer": []
}
},
"ranks_stream": {
"$ref": "*ref(definitions.base_stream)",
"$options": {
"name": "ranks",
"primary_key": "id",
"path": "/ranks"
}
}
},
"streams": ["*ref(definitions.ranks_stream)"],
"check": {
"stream_names": ["ranks"]
}
}

with pytest.raises(UndefinedReferenceException):
LowCodeSourceAdapter(invalid_reference_manifest)

0 comments on commit 86eb221

Please sign in to comment.