Skip to content

Commit

Permalink
[Connector-builder server] Allow client to specify record limit and e…
Browse files Browse the repository at this point in the history
…nforce max of 1000 (#20575)
  • Loading branch information
clnoll authored Jan 11, 2023
1 parent 9d4dd48 commit 8ef2872
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from connector_builder.generated.apis.default_api_interface import initialize_router
from connector_builder.impl.default_api import DefaultApiImpl
from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(
title="Connector Builder Server API",
Expand All @@ -22,4 +22,4 @@
allow_headers=["*"],
)

app.include_router(initialize_router(DefaultApiImpl()))
app.include_router(initialize_router(DefaultApiImpl(LowCodeSourceAdapter)))
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,23 @@ class StreamReadRequestBody(BaseModel):
stream: The stream of this StreamReadRequestBody.
config: The config of this StreamReadRequestBody.
state: The state of this StreamReadRequestBody [Optional].
record_limit: The record_limit of this StreamReadRequestBody [Optional].
"""

manifest: Dict[str, Any]
stream: str
config: Dict[str, Any]
state: Optional[Dict[str, Any]] = None
record_limit: Optional[int] = None

@validator("record_limit")
def record_limit_max(cls, value):
    """
    Reject a requested record limit that exceeds the supported maximum of 1000.

    :param value: The record_limit supplied in the request body.
    :return: The value unchanged when it is within bounds (None is passed through untouched).
    :raises ValueError: If value is greater than 1000.
    """
    # Raise explicitly rather than using `assert`: assert statements are removed when Python runs
    # with -O, which would silently disable this upper bound.
    if value is not None and value > 1000:
        raise ValueError("record_limit must be less than or equal to 1000")
    return value

@validator("record_limit")
def record_limit_min(cls, value):
    """
    Reject a requested record limit that is smaller than 1.

    :param value: The record_limit supplied in the request body.
    :return: The value unchanged when it is within bounds (None is passed through untouched).
    :raises ValueError: If value is less than 1.
    """
    # Raise explicitly rather than using `assert`: assert statements are removed when Python runs
    # with -O, which would silently disable this lower bound.
    if value is not None and value < 1:
        raise ValueError("record_limit must be greater than or equal to 1")
    return value

StreamReadRequestBody.update_forward_refs()
34 changes: 34 additions & 0 deletions airbyte-connector-builder-server/connector_builder/impl/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Dict, Iterator, List

from airbyte_cdk.models import AirbyteMessage
from airbyte_cdk.sources.streams.http import HttpStream


class CdkAdapter(ABC):
    """
    Interface between the connector builder server and the CDK.

    Concrete adapters implement both methods below; the API implementation only depends on this interface.
    """

    @abstractmethod
    def get_http_streams(self, config: Dict[str, Any]) -> List[HttpStream]:
        """
        Return the HTTP streams exposed by the underlying source.

        :param config: The user-provided configuration as specified by the source's spec.
        :return: A list of `HttpStream` objects.
        """

    @abstractmethod
    def read_stream(self, stream: str, config: Dict[str, Any]) -> Iterator[AirbyteMessage]:
        """
        Read from a single stream and produce the resulting messages one at a time.

        :param stream: Identifier of the stream to read (presumably the stream's name; confirm against implementations).
        :param config: The user-provided configuration as specified by the source's spec.
        :return: An iterator over the `AirbyteMessage` objects produced by the read.
        """
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
import traceback
from json import JSONDecodeError
from typing import Any, Dict, Iterable, Optional, Union
from typing import Any, Callable, Dict, Iterable, Iterator, Optional, Union
from urllib.parse import parse_qs, urljoin, urlparse

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type
Expand All @@ -22,14 +22,19 @@
from connector_builder.generated.models.streams_list_read import StreamsListRead
from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams
from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody
from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter
from connector_builder.impl.adapter import CdkAdapter
from fastapi import Body, HTTPException
from jsonschema import ValidationError


class DefaultApiImpl(DefaultApi):
logger = logging.getLogger("airbyte.connector-builder")

def __init__(self, adapter_cls: Callable[[Dict[str, Any]], CdkAdapter], max_record_limit: int = 1000):
    """
    Store the adapter factory and the record cap used by this API implementation.

    :param adapter_cls: Callable that builds a `CdkAdapter`; it is invoked with the request's manifest
        (as the `manifest` keyword argument) whenever an adapter is needed.
    :param max_record_limit: Upper bound on the number of records read by `read_stream`; it is also the
        limit applied when the request does not specify a `record_limit`.
    """
    self.adapter_cls = adapter_cls
    self.max_record_limit = max_record_limit
    super().__init__()

async def get_manifest_template(self) -> str:
return """version: "0.1.0"
definitions:
Expand Down Expand Up @@ -107,17 +112,24 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo
Using the provided manifest and config, invokes a sync for the specified stream and returns groups of Airbyte messages
that are produced during the read operation
:param stream_read_request_body: Input parameters to trigger the read operation for a stream
:param limit: The maximum number of records requested by the client (must be within the range [1, self.max_record_limit])
:return: Airbyte record messages produced by the sync grouped by slice and page
"""
adapter = self._create_low_code_adapter(manifest=stream_read_request_body.manifest)
schema_inferrer = SchemaInferrer()

if stream_read_request_body.record_limit is None:
record_limit = self.max_record_limit
else:
record_limit = min(stream_read_request_body.record_limit, self.max_record_limit)

single_slice = StreamReadSlices(pages=[])
log_messages = []
try:
for message_group in self._get_message_groups(
adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config),
schema_inferrer
schema_inferrer,
record_limit,
):
if isinstance(message_group, AirbyteLogMessage):
log_messages.append({"message": message_group.message})
Expand All @@ -132,11 +144,11 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo

return StreamRead(logs=log_messages, slices=[single_slice], inferred_schema=schema_inferrer.get_stream_schema(stream_read_request_body.stream))

def _get_message_groups(self, messages: Iterable[AirbyteMessage], schema_inferrer: SchemaInferrer) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
def _get_message_groups(self, messages: Iterator[AirbyteMessage], schema_inferrer: SchemaInferrer, limit: int) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
"""
Message groups are partitioned according to when request log messages are received. Subsequent response log messages
and record messages belong to the prior request log message and when we encounter another request, append the latest
message group.
message group, until <limit> records have been read.
Messages received from the CDK read operation will always arrive in the following order:
{type: LOG, log: {message: "request: ..."}}
Expand All @@ -152,7 +164,8 @@ def _get_message_groups(self, messages: Iterable[AirbyteMessage], schema_inferre
current_records = []
current_page_request: Optional[HttpRequest] = None
current_page_response: Optional[HttpResponse] = None
for message in messages:

while len(current_records) < limit and (message := next(messages, None)):
if first_page and message.type == Type.LOG and message.log.message.startswith("request:"):
first_page = False
request = self._create_request_from_log_message(message.log)
Expand Down Expand Up @@ -209,10 +222,9 @@ def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> O
self.logger.warning(f"Failed to parse log message into response object with error: {error}")
return None

@staticmethod
def _create_low_code_adapter(manifest: Dict[str, Any]) -> LowCodeSourceAdapter:
def _create_low_code_adapter(self, manifest: Dict[str, Any]) -> CdkAdapter:
try:
return LowCodeSourceAdapter(manifest=manifest)
return self.adapter_cls(manifest=manifest)
except ValidationError as error:
# TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
raise HTTPException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Dict, Iterable, List
from typing import Any, Dict, Iterator, List

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
from airbyte_cdk.models import ConfiguredAirbyteCatalog
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, ConfiguredAirbyteCatalog, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.yaml_declarative_source import \
ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.yaml_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.streams.http import HttpStream
from connector_builder.impl.adapter import CdkAdapter


class LowCodeSourceAdapter:

class LowCodeSourceAdapter(CdkAdapter):
def __init__(self, manifest: Dict[str, Any]):
# Request and response messages are only emitted for sources that have debug turned on
self._source = ManifestDeclarativeSource(manifest, debug=True)
Expand All @@ -27,13 +25,15 @@ def get_http_streams(self, config: Dict[str, Any]) -> List[HttpStream]:
http_streams.append(stream.retriever)
else:
raise TypeError(
f"A declarative stream should only have a retriever of type HttpStream, but received: {stream.retriever.__class__}")
f"A declarative stream should only have a retriever of type HttpStream, but received: {stream.retriever.__class__}"
)
else:
raise TypeError(
f"A declarative source should only contain streams of type DeclarativeStream, but received: {stream.__class__}")
f"A declarative source should only contain streams of type DeclarativeStream, but received: {stream.__class__}"
)
return http_streams

def read_stream(self, stream: str, config: Dict[str, Any]) -> Iterable[AirbyteMessage]:
def read_stream(self, stream: str, config: Dict[str, Any]) -> Iterator[AirbyteMessage]:
configured_catalog = ConfiguredAirbyteCatalog.parse_obj(
{
"streams": [
Expand Down
2 changes: 1 addition & 1 deletion airbyte-connector-builder-server/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
},
packages=find_packages(exclude=("unit_tests", "integration_tests", "docs")),
package_data={},
install_requires=["airbyte-cdk~=0.8", "fastapi", "uvicorn"],
install_requires=["airbyte-cdk~=0.15", "fastapi", "uvicorn"],
python_requires=">=3.9.11",
extras_require={
"tests": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ components:
type: object
description: The AirbyteStateMessage object to use as the starting state for this read
# $ref: "#/components/schemas/AirbyteProtocol/definitions/AirbyteStateMessage"
record_limit:
type: integer
minimum: 1
maximum: 1000
description: Number of records that will be returned to the client from the connector builder (max of 1000)
# --- Potential addition for a later phase ---
# numPages:
# type: integer
Expand Down
Loading

0 comments on commit 8ef2872

Please sign in to comment.