From 45006a750b5a73592620cdc7cab273739d6ea83d Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 9 Jan 2023 19:08:23 +0100 Subject: [PATCH] Connector builder server: Add inferred schema to read API response (#20942) * fix stuff * add inferred schema to API * fix yaml changes * fix yaml formatting * add whitespace back * reorder imports --- .../connector_builder/generated/models/stream_read.py | 2 ++ .../connector_builder/impl/default_api.py | 11 ++++++++--- .../src/main/openapi/openapi.yaml | 3 +++ 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py b/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py index 8a3b8412d7e4..163bb131e7c2 100644 --- a/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py +++ b/airbyte-connector-builder-server/connector_builder/generated/models/stream_read.py @@ -19,9 +19,11 @@ class StreamRead(BaseModel): logs: The logs of this StreamRead. slices: The slices of this StreamRead. + inferred_schema: The inferred_schema of this StreamRead [Optional]. """ logs: List[object] slices: List[StreamReadSlices] + inferred_schema: Optional[Dict[str, Any]] = None StreamRead.update_forward_refs() diff --git a/airbyte-connector-builder-server/connector_builder/impl/default_api.py b/airbyte-connector-builder-server/connector_builder/impl/default_api.py index cd0db29fb723..e80a4cd02da9 100644 --- a/airbyte-connector-builder-server/connector_builder/impl/default_api.py +++ b/airbyte-connector-builder-server/connector_builder/impl/default_api.py @@ -10,6 +10,8 @@ from urllib.parse import parse_qs, urljoin, urlparse from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type +from airbyte_cdk.utils.schema_inferrer import SchemaInferrer + from connector_builder.generated.apis.default_api_interface import DefaultApi from connector_builder.generated.models.http_request import HttpRequest from connector_builder.generated.models.http_response import HttpResponse @@ -108,12 +110,14 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo :return: Airbyte record messages produced by the sync grouped by slice and page """ adapter = self._create_low_code_adapter(manifest=stream_read_request_body.manifest) + schema_inferrer = SchemaInferrer() single_slice = StreamReadSlices(pages=[]) log_messages = [] try: for message_group in self._get_message_groups( - adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config) + adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config), + schema_inferrer ): if isinstance(message_group, AirbyteLogMessage): log_messages.append({"message": message_group.message}) @@ -126,9 +130,9 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo detail=f"Could not perform read with with error: {error.args[0]} - {self._get_stacktrace_as_string(error)}", ) - return StreamRead(logs=log_messages, slices=[single_slice]) + return StreamRead(logs=log_messages, slices=[single_slice], inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream)) - def _get_message_groups(self, messages: Iterable[AirbyteMessage]) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]: + def _get_message_groups(self, messages: Iterable[AirbyteMessage], schema_inferrer: SchemaInferrer) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]: """ Message groups are partitioned according to when request log messages are received. Subsequent response log messages and record messages belong to the prior request log message and when we encounter another request, append the latest @@ -165,6 +169,7 @@ def _get_message_groups(self, messages: Iterable[AirbyteMessage]) -> Iterable[Un yield message.log elif message.type == Type.RECORD: current_records.append(message.record.data) + schema_inferrer.accumulate(message.record) else: if not current_page_request or not current_page_response: raise ValueError("Every message grouping should have at least one request and response") diff --git a/airbyte-connector-builder-server/src/main/openapi/openapi.yaml b/airbyte-connector-builder-server/src/main/openapi/openapi.yaml index 56ef6aa35ea6..7c2b663da3fd 100644 --- a/airbyte-connector-builder-server/src/main/openapi/openapi.yaml +++ b/airbyte-connector-builder-server/src/main/openapi/openapi.yaml @@ -123,6 +123,9 @@ components: type: object description: The STATE AirbyteMessage emitted at the end of this slice. This can be omitted if a stream slicer is not configured. # $ref: "#/components/schemas/AirbyteProtocol/definitions/AirbyteStateMessage" + inferred_schema: + type: object + description: The narrowest JSON Schema against which every AirbyteRecord in the slices can validate successfully. This is inferred from reading every record in the output slices. StreamReadRequestBody: type: object required: