diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py index 04790ae9e303..a5cc282d2c64 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/auth/token.py @@ -83,7 +83,7 @@ def token(self) -> str: @dataclass class BasicHttpAuthenticator(AbstractHeaderAuthenticator): """ - Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using bas64 + Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64 https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme The header is of the form diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py index 602176f36e9e..d7f4a0ed12c3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py @@ -4,6 +4,7 @@ from typing import Mapping, Type +from airbyte_cdk.sources.declarative.auth.oauth import DeclarativeOauth2Authenticator from airbyte_cdk.sources.declarative.auth.token import ApiKeyAuthenticator, BasicHttpAuthenticator, BearerAuthenticator from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream @@ -56,6 +57,7 @@ "ListStreamSlicer": ListStreamSlicer, "MinMaxDatetime": MinMaxDatetime, "NoPagination": NoPagination, + "OAuthAuthenticator": DeclarativeOauth2Authenticator, "OffsetIncrement": OffsetIncrement, "RecordSelector": RecordSelector, "RemoveFields": RemoveFields, diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py index 6303b05ca1f8..3119fd73bcd5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py @@ -52,7 +52,7 @@ class DeclarativeComponentFactory: If the component definition is a mapping with neither a "class_name" nor a "type" field, the factory will do a best-effort attempt at inferring the component type by looking up the parent object's constructor type hints. If the type hint is an interface present in `DEFAULT_IMPLEMENTATIONS_REGISTRY`, - then the factory will create an object of it's default implementation. + then the factory will create an object of its default implementation. If the component definition is a list, then the factory will iterate over the elements of the list, instantiate its subcomponents, and return a list of instantiated objects. 
diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py index b9885c6e1043..31518c74849b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py @@ -15,7 +15,7 @@ class YamlParser(ConnectionDefinitionParser): """ Parses a Yaml string to a ConnectionDefinition - In addition to standard Yaml parsing, the input_string can contain refererences to values previously defined. + In addition to standard Yaml parsing, the input_string can contain references to values previously defined. This parser will dereference these values to produce a complete ConnectionDefinition. References can be defined using a *ref() string. diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index ca94f57a41bd..5b15fb5fbb6a 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -20,6 +20,7 @@ [ ("test_extract_from_array", ["data"], {"data": [{"id": 1}, {"id": 2}]}, [{"id": 1}, {"id": 2}]), ("test_extract_single_record", ["data"], {"data": {"id": 1}}, [{"id": 1}]), + ("test_extract_single_record_from_root", [], {"id": 1}, [{"id": 1}]), ("test_extract_from_root_array", [], [{"id": 1}, {"id": 2}], [{"id": 1}, {"id": 2}]), ("test_nested_field", ["data", "records"], {"data": {"records": [{"id": 1}, {"id": 2}]}}, [{"id": 1}, {"id": 2}]), ("test_field_in_config", ["{{ config['field'] }}"], {"record_array": [{"id": 1}, {"id": 2}]}, [{"id": 1}, {"id": 2}]), diff --git a/docs/connector-development/config-based/authentication.md b/docs/connector-development/config-based/authentication.md new file mode 100644 index 000000000000..d855a5aa7c55 --- /dev/null +++ b/docs/connector-development/config-based/authentication.md @@ -0,0 +1,73 @@ +# Authentication + +The `Authenticator` defines how to configure outgoing HTTP requests to authenticate on the API source. + +## Authenticators + +### ApiKeyAuthenticator + +The `ApiKeyAuthenticator` sets an HTTP header on outgoing requests. +The following definition will set the header "Authorization" with a value "Bearer hello": + +```yaml +authenticator: + type: "ApiKeyAuthenticator" + header: "Authorization" + token: "Bearer hello" +``` + +### BearerAuthenticator + +The `BearerAuthenticator` is a specialized `ApiKeyAuthenticator` that always sets the header "Authorization" with the value "Bearer {token}". +The following definition will set the header "Authorization" with a value "Bearer hello": + +```yaml +authenticator: + type: "BearerAuthenticator" + token: "hello" +``` + +More information on bearer authentication can be found [here](https://swagger.io/docs/specification/authentication/bearer-authentication/) + +### BasicHttpAuthenticator + +The `BasicHttpAuthenticator` sets the "Authorization" header with a (USER ID/password) pair, encoded using base64 as per [RFC 7617](https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme). +The following definition will set the header "Authorization" with a value "Basic {encoded credentials}": + +```yaml +authenticator: + type: "BasicHttpAuthenticator" + username: "hello" + password: "world" +``` + +The password is optional.
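+For example, with the username "hello" and password "world" above, the resulting header value would be the base64 encoding of "hello:world": + +``` +Authorization: Basic aGVsbG86d29ybGQ= +```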
Authenticating with APIs using Basic HTTP and a single API key can be done as: + +```yaml +authenticator: + type: "BasicHttpAuthenticator" + username: "hello" +``` + +### OAuth + +OAuth authentication is supported through the `OAuthAuthenticator`, which requires the following parameters: + +- token_refresh_endpoint: The endpoint to refresh the access token +- client_id: The client id +- client_secret: The client secret +- refresh_token: The token used to refresh the access token +- scopes (Optional): The scopes to request. Default: Empty list +- token_expiry_date (Optional): The access token expiration date formatted as RFC-3339 ("%Y-%m-%dT%H:%M:%S.%f%z") +- access_token_name (Optional): The field to extract the access token from in the response. Default: "access_token". +- expires_in_name (Optional): The field to extract expires_in from in the response. Default: "expires_in" +- refresh_request_body (Optional): The request body to send in the refresh request. Default: None + +```yaml +authenticator: + type: "OAuthAuthenticator" + token_refresh_endpoint: "https://api.searchmetrics.com/v4/token" + client_id: "{{ config['api_key'] }}" + client_secret: "{{ config['client_secret'] }}" + refresh_token: "" +``` \ No newline at end of file diff --git a/docs/connector-development/config-based/error-handling.md b/docs/connector-development/config-based/error-handling.md new file mode 100644 index 000000000000..921f39b872b5 --- /dev/null +++ b/docs/connector-development/config-based/error-handling.md @@ -0,0 +1,177 @@ +# Error handling + +By default, only server errors (HTTP 5XX) and too many requests (HTTP 429) will be retried up to 5 times with exponential backoff. +Other HTTP errors will result in a failed read. + +Other behaviors can be configured through the `Requester`'s `error_handler` field. + +## Defining errors + +### From status code + +Response filters can be used to define how to handle requests resulting in responses with a specific HTTP status code. +For instance, this example will configure the handler to also retry responses with a 404 error: + +```yaml +requester: + <...> + error_handler: + response_filters: + - http_codes: [ 404 ] + action: RETRY +``` + +Response filters can be used to specify HTTP errors to ignore. +For instance, this example will configure the handler to ignore responses with a 404 error: + +```yaml +requester: + <...> + error_handler: + response_filters: + - http_codes: [ 404 ] + action: IGNORE +``` + +### From error message + +Errors can also be defined by parsing the error message. +For instance, this error handler will ignore responses if the error message contains the string "ignorethisresponse": + +```yaml +requester: + <...> + error_handler: + response_filters: + - error_message_contains: "ignorethisresponse" + action: IGNORE +``` + +This can also be done through a more generic string interpolation strategy with the following parameters: + +- response: the decoded response + +This example ignores errors where the response contains a "code" field: + +```yaml +requester: + <...> + error_handler: + response_filters: + - predicate: "{{ 'code' in response }}" + action: IGNORE +``` + +The error handler can have multiple response filters.
+The following example is configured to ignore 404 errors and retry 429 errors: + +```yaml +requester: + <...> + error_handler: + response_filters: + - http_codes: [ 404 ] + action: IGNORE + - http_codes: [ 429 ] + action: RETRY +``` + +## Backoff Strategies + +The error handler supports a few backoff strategies, which are described in the following sections. + +### Exponential backoff + +This is the default backoff strategy. The requester will back off with an exponentially increasing interval between retries. + +### Constant Backoff + +When using the `ConstantBackoffStrategy`, the requester will back off with a constant interval. + +### Wait time defined in header + +When using the `WaitTimeFromHeaderBackoffStrategy`, the requester will back off for an interval specified in the response header. +In this example, the requester will back off for the duration given by the response's "wait_time" header value: + +```yaml +requester: + <...> + error_handler: + <...> + backoff_strategies: + - type: "WaitTimeFromHeaderBackoffStrategy" + header: "wait_time" +``` + +Optionally, a regular expression can be configured to extract the wait time from the header value. + +```yaml +requester: + <...> + error_handler: + <...> + backoff_strategies: + - type: "WaitTimeFromHeaderBackoffStrategy" + header: "wait_time" + regex: "[-+]?\d+" +``` + +### Wait until time defined in header + +When using the `WaitUntilTimeFromHeaderBackoffStrategy`, the requester will back off until the time specified in the response header. +In this example, the requester will wait until the time specified in the "wait_until" header value: + +```yaml +requester: + <...> + error_handler: + <...> + backoff_strategies: + - type: "WaitUntilTimeFromHeaderBackoffStrategy" + header: "wait_until" + regex: "[-+]?\d+" + min_wait: 5 +``` + +The strategy accepts an optional regular expression to extract the time from the header value, and a minimum time to wait. + +## Advanced error handling + +The error handler can have multiple backoff strategies, allowing it to fall back if a strategy cannot be evaluated. +For instance, the following defines an error handler that will read the backoff time from a header and default to a constant backoff if the wait time cannot be extracted from the response: + +```yaml +requester: + <...> + error_handler: + <...> + backoff_strategies: + - type: "WaitTimeFromHeaderBackoffStrategy" + header: "wait_time" + - type: "ConstantBackoffStrategy" + backoff_time_in_seconds: 5 +``` + +The `requester` can be configured to use a `CompositeErrorHandler`, which sequentially iterates over a list of error handlers, enabling different retry mechanisms for different types of errors.
+ +In this example, a constant backoff of 5 seconds will be applied if the response contains a "code" field, and an exponential backoff will be applied if the error code is 403: + +```yaml +requester: + <...> + error_handler: + type: "CompositeErrorHandler" + error_handlers: + - response_filters: + - predicate: "{{ 'code' in response }}" + action: RETRY + backoff_strategies: + - type: "ConstantBackoffStrategy" + backoff_time_in_seconds: 5 + - response_filters: + - http_codes: [ 403 ] + action: RETRY + backoff_strategies: + - type: "ExponentialBackoffStrategy" +``` \ No newline at end of file diff --git a/docs/connector-development/config-based/index.md b/docs/connector-development/config-based/index.md new file mode 100644 index 000000000000..02d4703ca11b --- /dev/null +++ b/docs/connector-development/config-based/index.md @@ -0,0 +1,26 @@ +# Index + +## From scratch + +- [Overview](overview.md) +- [Yaml structure](yaml-structure.md) +- [Reference docs](https://airbyte-cdk.readthedocs.io/en/latest/api/airbyte_cdk.sources.declarative.html) + +## Concepts + +- [Authentication](authentication.md) +- [Error handling](error-handling.md) +- [Pagination](pagination.md) +- [Record selection](record-selector.md) +- [Request options](request-options.md) +- [Stream slicers](stream-slicers.md) + +## Tutorial + +0. [Getting started](tutorial/0-getting-started.md) +1. [Creating a source](tutorial/1-create-source.md) +2. [Installing dependencies](tutorial/2-install-dependencies.md) +3. [Connecting to the API](tutorial/3-connecting-to-the-API-source.md) +4. [Reading data](tutorial/4-reading-data.md) +5. [Incremental reads](tutorial/5-incremental-reads.md) +6. [Testing](tutorial/6-testing.md) \ No newline at end of file diff --git a/docs/connector-development/config-based/overview.md b/docs/connector-development/config-based/overview.md new file mode 100644 index 000000000000..0a9350d43c05 --- /dev/null +++ b/docs/connector-development/config-based/overview.md @@ -0,0 +1,154 @@ +# Config-based connectors overview + +:warning: This framework is in alpha stage. It is not yet supported in production and is available only to select users. :warning: + +The goal of this document is to give enough technical specifics to understand how config-based connectors work. +When you're ready to start building a connector, you can start with [the tutorial](tutorial/0-getting-started.md) or dive into the [reference documentation](https://airbyte-cdk.readthedocs.io/en/latest/api/airbyte_cdk.sources.declarative.html) + +## Overview + +The CDK's config-based interface uses a declarative approach to building source connectors for REST APIs. + +Config-based connectors work by parsing a YAML configuration describing the Source, then running the configured connector using a Python backend. + +The connector then submits HTTP requests to the API endpoint and extracts records out of the response. + +See the [connector definition section](yaml-structure.md) for more information on the YAML file describing the connector. + +## Supported features + +| Feature | Support | +|---|---| +| Transport protocol | HTTP | +| HTTP methods | GET, POST | +| Data format | JSON | +| Resource type | Collections<br/>Sub-collection | +| [Pagination](./pagination.md) | [Page limit](./pagination.md#page-increment)<br/>[Offset](./pagination.md#offset-increment)<br/>[Cursor](./pagination.md#cursor) | +| [Authentication](./authentication.md) | [Header based](./authentication.md#ApiKeyAuthenticator)<br/>[Bearer](./authentication.md#BearerAuthenticator)<br/>[Basic](./authentication.md#BasicHttpAuthenticator)<br/>[OAuth](./authentication.md#OAuth) | +| Sync mode | Full refresh<br/>Incremental | +| Schema discovery | Only static schemas | +| [Stream slicing](./stream-slicers.md) | [Datetime](./stream-slicers.md#Datetime), [lists](./stream-slicers.md#list-stream-slicer), [parent-resource id](./stream-slicers.md#Substream-slicer) | +| [Record transformation](./record-selector.md) | [Field selection](./record-selector.md#selecting-a-field)<br/>[Adding fields](./record-selector.md#adding-fields)<br/>[Removing fields](./record-selector.md#removing-fields)<br/>[Filtering records](./record-selector.md#filtering-records) | +| [Error detection](./error-handling.md) | [From HTTP status code](./error-handling.md#from-status-code)<br/>[From error message](./error-handling.md#from-error-message) | +| [Backoff strategies](./error-handling.md#Backoff-Strategies) | [Exponential](./error-handling.md#Exponential-backoff)<br/>[Constant](./error-handling.md#Constant-Backoff)<br/>[Derived from headers](./error-handling.md#Wait-time-defined-in-header) | + +If a feature you require is not supported, you can [request the feature](../../contributing-to-airbyte/README.md#requesting-new-features) and use the [Python CDK](../cdk-python/README.md). + +## Source + +Config-based connectors are a declarative way to define HTTP API sources. + +A source is defined by 2 components: + +1. The source's `Stream`s, which define the data to read +2. A `ConnectionChecker`, which describes how to run the `check` operation to test the connection to the API source + +## Stream + +Streams define the schema of the data to sync, as well as how to read it from the underlying API source. +A stream generally corresponds to a resource within the API. They are analogous to tables for a relational database source. + +A stream is defined by: + +1. A name +2. Primary key (Optional): Used to uniquely identify records, enabling deduplication. Can be a string for single primary keys, a list of strings for composite primary keys, or a list of lists of strings for composite primary keys consisting of nested fields. +3. [Schema](../cdk-python/schemas.md): Describes the data to sync +4. [Data retriever](overview.md#data-retriever): Describes how to retrieve the data from the API +5. [Cursor field](../cdk-python/incremental-stream.md) (Optional): Field to use as the stream cursor. Can either be a string, or a list of strings if the cursor is a nested field. +6. [Transformations](./record-selector.md#transformations) (Optional): A set of transformations to be applied on the records read from the source before emitting them to the destination +7. [Checkpoint interval](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#state--checkpointing) (Optional): Defines the interval, in number of records, at which incremental syncs should be checkpointed. + +More details on streams and sources can be found in the [basic concepts section](../cdk-python/basic-concepts.md). + +## Data retriever + +The data retriever defines how to read the data for a Stream, and acts as an orchestrator for the data retrieval flow. +There is currently only one implementation, the `SimpleRetriever`, which is defined by + +1. Requester: Describes how to submit requests to the API source +2. Paginator: Describes how to navigate through the API's pages +3. Record selector: Describes how to extract records from an HTTP response +4. Stream Slicer: Describes how to partition the stream, enabling incremental syncs and checkpointing + +Each of those components (and their subcomponents) is defined by an explicit interface and one or many implementations. +The developer can choose and configure the implementation they need depending on the specifications of the integration they are building against. + +Since the `Retriever` is defined as part of the Stream configuration, different Streams for a given Source can use different `Retriever` definitions if needed. + +### Data flow + +The retriever acts as a coordinator, moving the data between its components before emitting `AirbyteMessage`s that can be read by the platform. +The `SimpleRetriever`'s data flow can be described as follows: + +1. Given the connection config and the current stream state, the `StreamSlicer` computes the stream slices to read. +2. Iterate over all the stream slices defined by the stream slicer. +3. For each stream slice, + 1. Submit a request as defined by the requester + 2. Select the records from the response + 3. 
Repeat for as long as the paginator points to a next page +More details on the record selector can be found in the [record selector section](record-selector.md) +More details on the stream slicers can be found in the [stream slicers section](stream-slicers.md) +More details on the paginator can be found in the [pagination section](pagination.md) + +## Requester + +The `Requester` defines how to prepare HTTP requests to send to the source API. +There is currently only one implementation, the `HttpRequester`, which is defined by + +1. A base url: The root of the API source +2. A path: The specific endpoint to fetch data from for a resource +3. The HTTP method: the HTTP method to use (GET or POST) +4. A request options provider: Defines the request parameters (query parameters), headers, and request body to set on outgoing HTTP requests +5. An authenticator: Defines how to authenticate to the source +6. An error handler: Defines how to handle errors + +More details on authentication can be found in the [authentication section](authentication.md). +More details on error handling can be found in the [error handling section](error-handling.md) + +## Connection Checker + +The `ConnectionChecker` defines how to test the connection to the integration. + +The only implementation as of now is `CheckStream`, which tries to read a record from a specified list of streams and fails if no records could be read. + +## Custom components + +Any built-in component can be overridden by a custom Python class. +To create a custom component, define a new class in a new file in the connector's module. +The class must implement the interface of the component it is replacing. For instance, a pagination strategy must implement `airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy.PaginationStrategy`. +The class must also be a dataclass where each field represents an argument to configure from the yaml file, along with an `InitVar` named `options`. + +For example: + +```python +from dataclasses import InitVar, dataclass +from typing import Any, List, Mapping, Optional, Union + +import requests + +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import PaginationStrategy + + +@dataclass +class MyPaginationStrategy(PaginationStrategy): + my_field: Union[InterpolatedString, str] + options: InitVar[Mapping[str, Any]] + + def __post_init__(self, options: Mapping[str, Any]): + pass + + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]: + # Return the token pointing to the next page, or None when there are no more pages + pass + + def reset(self): + # Reset any internal state kept between pages + pass +``` + +This class can then be referenced from the yaml file using its fully qualified class name: + +```yaml +pagination_strategy: + class_name: "my_connector_module.MyPaginationStrategy" + my_field: "hello world" +``` + +## Sample connectors + +The following connectors can serve as examples of what production-ready config-based connectors look like: + +- [Greenhouse](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-greenhouse) +- [Sendgrid](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sendgrid) +- [Sentry](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sentry) diff --git a/docs/connector-development/config-based/pagination.md b/docs/connector-development/config-based/pagination.md new file mode 100644 index 000000000000..b305fddaa9cc --- /dev/null +++ b/docs/connector-development/config-based/pagination.md @@ -0,0 +1,125 @@ +# Pagination + +Given a page size and a pagination strategy, the `LimitPaginator` will point to pages of results for as long as its strategy returns a `next_page_token`.
+ +Iterating over pages of results is different from iterating over stream slices. +Stream slices have semantic value: for instance, a Datetime stream slice defines data for a specific date range. Two stream slices will have data for different date ranges. +Conversely, pages don't have semantic value. More pages simply means that more records are to be read, without specifying any meaningful difference between the records of the first and later pages. + +The paginator is defined by + +- `page_size`: The number of records to fetch in a single request +- `limit_option`: How to specify the page size in the outgoing HTTP request +- `pagination_strategy`: How to compute the next page to fetch +- `page_token_option`: How to specify the next page to fetch in the outgoing HTTP request + +Three pagination strategies are supported + +1. Page increment +2. Offset increment +3. Cursor-based + +## Pagination Strategies + +### Page increment + +When using the `PageIncrement` strategy, the page number will be set as part of the `page_token_option`. + +The following paginator example will fetch 5 records per page and specify the page number as a request_parameter: + +```yaml +paginator: + type: "LimitPaginator" + page_size: 5 + limit_option: + option_type: request_parameter + field_name: page_size + pagination_strategy: + type: "PageIncrement" + page_token: + option_type: "request_parameter" + field_name: "page" +``` + +If the page contains fewer than 5 records, then the paginator knows there are no more pages to fetch. +If the API returns more records than requested, all records will be processed. + +Assuming the endpoint to fetch data from is `https://cloud.airbyte.com/api/get_data`, +the first request will be sent as `https://cloud.airbyte.com/api/get_data?page_size=5&page=0` +and the second request as `https://cloud.airbyte.com/api/get_data?page_size=5&page=1`. + +### Offset increment + +When using the `OffsetIncrement` strategy, the number of records read will be set as part of the `page_token_option`. + +The following paginator example will fetch 5 records per page and specify the offset as a request_parameter: + +```yaml +paginator: + type: "LimitPaginator" + page_size: 5 + limit_option: + option_type: request_parameter + field_name: page_size + pagination_strategy: + type: "OffsetIncrement" + page_token: + field_name: "offset" + inject_into: "request_parameter" +``` + +Assuming the endpoint to fetch data from is `https://cloud.airbyte.com/api/get_data`, +the first request will be sent as `https://cloud.airbyte.com/api/get_data?page_size=5&offset=0` +and the second request as `https://cloud.airbyte.com/api/get_data?page_size=5&offset=5`. + +### Cursor + +The `CursorPaginationStrategy` outputs a token by evaluating its `cursor_value` string with the following parameters: + +- `response`: The decoded response +- `headers`: HTTP headers on the response +- `last_records`: List of records selected from the last response + +This cursor value can be used to request the next page of records.
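+For instance, if the API returned the token for the next page in a field of the response body, the strategy could read it from there (a sketch; `next_page_token` is a hypothetical response field): + +```yaml +pagination_strategy: + type: "CursorPaginationStrategy" + cursor_value: "{{ response['next_page_token'] }}" +```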
+ +#### Cursor paginator in request parameters + +In this example, the next page of records is requested by setting the `from` request parameter to the id of the last record read: + +```yaml +paginator: + type: "LimitPaginator" + <...> + pagination_strategy: + type: "CursorPaginationStrategy" + cursor_value: "{{ last_records[-1]['id'] }}" + page_token: + field_name: "from" + inject_into: "request_parameter" +``` + +Assuming the endpoint to fetch data from is `https://cloud.airbyte.com/api/get_data`, +the first request will be sent as `https://cloud.airbyte.com/api/get_data` +Assuming the id of the last record fetched is 1000, +the next request will be sent as `https://cloud.airbyte.com/api/get_data?from=1000` + +#### Cursor paginator in path + +Some APIs directly point to the URL of the next page to fetch. In this example, the URL of the next page is extracted from the response headers: + +```yaml +paginator: + type: "LimitPaginator" + <...> + pagination_strategy: + type: "CursorPaginationStrategy" + cursor_value: "{{ headers['urls']['next'] }}" + page_token: + inject_into: "path" +``` + +Assuming the endpoint to fetch data from is `https://cloud.airbyte.com/api/get_data`, +the first request will be sent as `https://cloud.airbyte.com/api/get_data` +Assuming the response's next URL is `https://cloud.airbyte.com/api/get_data?page=1&page_size=100`, +the next request will be sent as `https://cloud.airbyte.com/api/get_data?page=1&page_size=100` \ No newline at end of file diff --git a/docs/connector-development/config-based/record-selector.md b/docs/connector-development/config-based/record-selector.md new file mode 100644 index 000000000000..bcaee21e6cdb --- /dev/null +++ b/docs/connector-development/config-based/record-selector.md @@ -0,0 +1,254 @@ +# Record selector + +The record selector is responsible for translating an HTTP response into a list of Airbyte records by extracting records from the response, then optionally filtering and shaping them. + +The current record extraction implementation uses [dpath](https://pypi.org/project/dpath/) to select records from the JSON-decoded HTTP response. + +## Common recipes + +Here are some common patterns: + +### Selecting the whole response + +If the root of the response is an array containing the records, the records can be extracted using the following definition: + +```yaml +selector: + extractor: + field_pointer: [ ] +``` + +If the root of the response is a JSON object representing a single record, the record can be extracted and wrapped in an array.
+ +For example, given a response body of the form + +```json +{ + "id": 1 +} +``` + +and a selector + +```yaml +selector: + extractor: + field_pointer: [ ] +``` + +The selected records will be + +```json +[ + { + "id": 1 + } +] +``` + +### Selecting a field + +Given a response body of the form + +``` +{ + "data": [{"id": 0}, {"id": 1}], + "metadata": {"api-version": "1.0.0"} +} +``` + +and a selector + +```yaml +selector: + extractor: + field_pointer: [ "data" ] +``` + +The selected records will be + +```json +[ + { + "id": 0 + }, + { + "id": 1 + } +] +``` + +### Selecting an inner field + +Given a response body of the form + +```json +{ + "data": { + "records": [ + { + "id": 1 + }, + { + "id": 2 + } + ] + } +} +``` + +and a selector + +```yaml +selector: + extractor: + field_pointer: + - "data" + - "records" +``` + +The selected records will be + +```json +[ + { + "id": 1 + }, + { + "id": 2 + } +] +``` + +## Filtering records + +Records can be filtered by adding a record_filter to the selector. +The expression in the filter will be evaluated to a boolean returning true if the record should be included. + +In this example, all records with a `created_at` field at or after the stream slice's `start_time` will be filtered out: + +```yaml +selector: + extractor: + field_pointer: [ ] + record_filter: + condition: "{{ record['created_at'] < stream_slice['start_time'] }}" +``` + +## Transformations + +Fields can be added or removed from records by adding `Transformation`s to a stream's definition. + +### Adding fields + +Fields can be added with the `AddFields` transformation. +This example adds a top-level field "field1" with a value "static_value" + +```yaml +stream: + <...> + transformations: + - type: AddFields + fields: + - path: [ "field1" ] + value: "static_value" +``` + +This example adds a top-level field "start_date", whose value is evaluated from the stream slice: + +```yaml +stream: + <...> + transformations: + - type: AddFields + fields: + - path: [ "start_date" ] + value: "{{ stream_slice['start_date'] }}" +``` + +Fields can also be added in a nested object by writing the fields' path as a list. + +Given a record of the following shape: + +``` +{ + "id": 0, + "data": + { + "field0": "some_data" + } +} +``` + +this definition will add a field in the "data" nested object: + +```yaml +stream: + <...> + transformations: + - type: AddFields + fields: + - path: [ "data", "field1" ] + value: "static_value" +``` + +resulting in the following record: + +``` +{ + "id": 0, + "data": + { + "field0": "some_data", + "field1": "static_value" + } +} +``` + +### Removing fields + +Fields can be removed from records with the `RemoveFields` transformation.
+ +Given a record of the following shape: + +``` +{ + "path": + { + "to": + { + "field1": "data_to_remove", + "field2": "data_to_keep" + } + }, + "path2": "data_to_remove", + "path3": "data_to_keep" +} +``` + +this definition will remove the 2 instances of "data_to_remove" which are found in "path2" and "path.to.field1": + +```yaml +the_stream: + <...> + transformations: + - type: RemoveFields + field_pointers: + - [ "path", "to", "field1" ] + - [ "path2" ] +``` + +resulting in the following record: + +``` +{ + "path": + { + "to": + { + "field2": "data_to_keep" + } + }, + "path3": "data_to_keep" +} +``` \ No newline at end of file diff --git a/docs/connector-development/config-based/request-options.md b/docs/connector-development/config-based/request-options.md new file mode 100644 index 000000000000..daf6a5069a80 --- /dev/null +++ b/docs/connector-development/config-based/request-options.md @@ -0,0 +1,88 @@ +# Request Options + +There are a few ways to set request parameters, headers, and body on outgoing HTTP requests. + +## Request Options Provider + +The primary way to set request options is through the `Requester`'s `RequestOptionsProvider`. +The options can be configured as key-value pairs: + +```yaml +requester: + type: HttpRequester + name: "{{ options['name'] }}" + url_base: "https://api.exchangeratesapi.io/v1/" + http_method: "GET" + request_options_provider: + request_parameters: + k1: v1 + k2: v2 + request_headers: + header_key1: header_value1 + header_key2: header_value2 +``` + +It is also possible to add a JSON-encoded body to outgoing requests. + +```yaml +requester: + type: HttpRequester + name: "{{ options['name'] }}" + url_base: "https://api.exchangeratesapi.io/v1/" + http_method: "GET" + request_options_provider: + request_body_json: + key: value +``` + +## Authenticators + +It is also possible for authenticators to set request parameters or headers as needed. +For instance, the `BearerAuthenticator` will always set the `Authorization` header. + +More details on the various authenticators can be found in the [authentication section](authentication.md) + +## Paginators + +The `LimitPaginator` can optionally set request options through the `limit_option` and the `page_token_option`. +The respective values can be set on the outgoing HTTP requests by specifying where they should be injected. + +The following example will set the "page" request parameter value to the page to fetch, and the "page_size" request parameter to 5: + +```yaml +paginator: + type: "LimitPaginator" + page_size: 5 + limit_option: + option_type: request_parameter + field_name: page_size + pagination_strategy: + type: "PageIncrement" + page_token: + option_type: "request_parameter" + field_name: "page" +``` + +More details on paginators can be found in the [pagination section](pagination.md) + +## Stream slicers + +The `DatetimeStreamSlicer` can optionally set request options through the `start_time_option` and `end_time_option` fields. +The respective values can be set on the outgoing HTTP requests by specifying where they should be injected. + +The following example will set the "created[gte]" request parameter value to the start of the time window, and "created[lte]" to the end of the time window:
+ +```yaml +stream_slicer: + start_datetime: "2021-02-01T00:00:00.000000+0000" + end_datetime: "2021-03-01T00:00:00.000000+0000" + step: "1d" + start_time_option: + field_name: "created[gte]" + inject_into: "request_parameter" + end_time_option: + field_name: "created[lte]" + inject_into: "request_parameter" +``` + +More details on the stream slicers can be found in the [stream-slicers section](stream-slicers.md) diff --git a/docs/connector-development/config-based/stream-slicers.md b/docs/connector-development/config-based/stream-slicers.md new file mode 100644 index 000000000000..96f0a5bd0a7e --- /dev/null +++ b/docs/connector-development/config-based/stream-slicers.md @@ -0,0 +1,172 @@ +# Stream Slicers + +`StreamSlicer`s define how to partition a stream into subsets of records. + +A stream slicer can be thought of as an iterator over the stream's data, where a `StreamSlice` is the retriever's unit of work. + +When a stream is read incrementally, a state message will be output by the connector after reading every slice, which allows for [checkpointing](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#state--checkpointing). + +At the beginning of a `read` operation, the `StreamSlicer` will compute the slices to sync given the connection config and the stream's current state. +As the `Retriever` reads data from the `Source`, the `StreamSlicer` keeps track of the `Stream`'s state, which will be emitted after reading each stream slice. + +More information on stream slicing can be found in the [stream-slices section](../cdk-python/stream-slices.md) + +## Implementations + +This section gives an overview of the stream slicers currently implemented. + +### Datetime + +The `DatetimeStreamSlicer` iterates over a datetime range by partitioning it into time windows. +This is done by slicing the stream on the records' cursor value, defined by the Stream's `cursor_field`. + +Given a start time, an end time, and a step function, it will partition the interval [start, end] into small windows of the size described by the step. +For instance, + +```yaml +stream_slicer: + start_datetime: "2021-02-01T00:00:00.000000+0000" + end_datetime: "2021-03-01T00:00:00.000000+0000" + step: "1d" +``` + +will create one slice per day for the interval `2021-02-01` - `2021-03-01`. + +The `DatetimeStreamSlicer` also supports an optional lookback window, specifying how many days before the start_datetime to read data for. + +```yaml +stream_slicer: + start_datetime: "2021-02-01T00:00:00.000000+0000" + end_datetime: "2021-03-01T00:00:00.000000+0000" + lookback_window: "31d" + step: "1d" +``` + +will read data from `2021-01-01` to `2021-03-01`. + +The stream slices will be of the form `{"start_date": "2021-02-01T00:00:00.000000+0000", "end_date": "2021-02-01T00:00:00.000000+0000"}` +The stream slices' field names can be customized through the `stream_state_field_start` and `stream_state_field_end` parameters. + +The `datetime_format` can be used to specify the format of the start and end time. It is [RFC3339](https://datatracker.ietf.org/doc/html/rfc3339#section-5.6) by default. + +The Stream's state will be derived by reading the record's `cursor_field`. +If the `cursor_field` is `created`, and the record is `{"id": 1234, "created": "2021-02-02T00:00:00.000000+0000"}`, then the state after reading that record is `"created": "2021-02-02T00:00:00.000000+0000"`.
[^1] + +#### Cursor update + +When reading data from the source, the cursor value will be updated to the maximum datetime among + +- the last record's cursor field +- the start of the stream slice +- the current cursor value + This ensures that the cursor will be updated even if a stream slice does not contain any data. + +#### Stream slicer on dates + +If an API supports filtering data based on the cursor field, the `start_time_option` and `end_time_option` parameters can be used to configure this filtering. +For instance, if the API supports filtering using the request parameters `created[gte]` and `created[lte]`, then the stream slicer can specify the request parameters as + +```yaml +stream_slicer: + type: "DatetimeStreamSlicer" + <...> + start_time_option: + field_name: "created[gte]" + inject_into: "request_parameter" + end_time_option: + field_name: "created[lte]" + inject_into: "request_parameter" +``` + +### List stream slicer + +`ListStreamSlicer` iterates over values from a given list. +It is defined by + +- The slice values, which are the valid values for the cursor field +- The cursor field on a record +- request_option: optional request option to set on outgoing request parameters + +As an example, this stream slicer will iterate over the 2 repositories ("airbyte" and "airbyte-secret") and will set a request_parameter on outgoing HTTP requests. + +```yaml +stream_slicer: + type: "ListStreamSlicer" + slice_values: + - "airbyte" + - "airbyte-secret" + cursor_field: "repository" + request_option: + field_name: "repository" + inject_into: "request_parameter" +``` + +### Cartesian Product stream slicer + +`CartesianProductStreamSlicer` iterates over the Cartesian product of its underlying stream slicers. + +Given 2 stream slicers with the following slices: +A: `[{"start_date": "2021-01-01", "end_date": "2021-01-01"}, {"start_date": "2021-01-02", "end_date": "2021-01-02"}]` +B: `[{"s": "hello"}, {"s": "world"}]` +the resulting stream slices are + +``` +[ + {"start_date": "2021-01-01", "end_date": "2021-01-01", "s": "hello"}, + {"start_date": "2021-01-01", "end_date": "2021-01-01", "s": "world"}, + {"start_date": "2021-01-02", "end_date": "2021-01-02", "s": "hello"}, + {"start_date": "2021-01-02", "end_date": "2021-01-02", "s": "world"}, +] +``` + +### Substream slicer + +`SubstreamSlicer` iterates over the parent's stream slices. +This is useful for defining sub-resources. + +We might for instance want to read all the commits for a given repository (parent resource). + +For each parent stream, the slicer needs to know + +- what the parent stream is +- the key of the records in the parent stream +- the field defining the stream slice representing the parent record +- how to specify that information on an outgoing HTTP request + +Assuming the commits for a given repository can be read by specifying the repository as a request_parameter, this could be defined as + +```yaml +stream_slicer: + type: "SubstreamSlicer" + parent_streams_configs: + - stream: "*ref(repositories_stream)" + parent_key: "id" + stream_slice_field: "repository" + request_option: + field_name: "repository" + inject_into: "request_parameter" +``` + +REST APIs often nest sub-resources in the URL path.
+If the URL to fetch commits was "/repositories/:id/commits", then the `Requester`'s path would need to refer to the stream slice's value and no `request_option` would be set: + +```yaml +retriever: + <...> + requester: + <...> + path: "/repositories/{{ stream_slice.repository }}/commits" + stream_slicer: + type: "SubstreamSlicer" + parent_streams_configs: + - stream: "*ref(repositories_stream)" + parent_key: "id" + stream_slice_field: "repository" +``` + +[^1]: This is a slight oversimplification. See the Cursor update section above for more details on how the cursor is updated. + +## More readings + +- [Incremental streams](../cdk-python/incremental-stream.md) +- [Stream slices](../cdk-python/stream-slices.md) \ No newline at end of file diff --git a/docs/connector-development/config-based/tutorial/0-getting-started.md b/docs/connector-development/config-based/tutorial/0-getting-started.md new file mode 100644 index 000000000000..5170b6ad5d26 --- /dev/null +++ b/docs/connector-development/config-based/tutorial/0-getting-started.md @@ -0,0 +1,50 @@ +# Getting Started + +:warning: This framework is in alpha stage. It is not yet supported in production and is available only to select users. :warning: + +## Summary + +Throughout this tutorial, we'll walk you through the creation of an Airbyte source to read and extract data from an HTTP API. + +We'll build a connector reading data from the Exchange Rates API, but the steps will apply to other HTTP APIs you might be interested in integrating with. + +The API documentation can be found [here](https://exchangeratesapi.io/documentation/). +In this tutorial, we will read data from the following endpoints: + +- `Latest Rates Endpoint` +- `Historical Rates Endpoint` + +Our end goal is to implement a `Source` with a single `Stream` containing exchange rates going from a base currency to many other currencies. +The output schema of our stream will look like the following: + +```json +{ + "base": "USD", + "date": "2022-07-15", + "rates": { + "CAD": 1.28, + "EUR": 0.98 + } +} +``` + +## Exchange Rates API Setup + +Before we can get started, you'll need to generate an API access key for the Exchange Rates API. +This can be done by signing up for the Free tier plan on [Exchange Rates API](https://exchangeratesapi.io/): + +1. Visit https://exchangeratesapi.io and click "Get free API key" on the top right +2. You'll be taken to https://apilayer.com -- complete the sign-up process, choosing the free tier +3. Once you're signed in, visit https://apilayer.com/marketplace/exchangerates_data-api#documentation-tab and click "Live Demo" +4. Inside that editor, you'll find your API key. + +## Requirements + +- An Exchange Rates API key +- Python >= 3.9 +- Docker must be running +- NodeJS + +## Next Steps + +Next, we'll [create a Source using the connector generator.](1-create-source.md) \ No newline at end of file diff --git a/docs/connector-development/config-based/tutorial/1-create-source.md b/docs/connector-development/config-based/tutorial/1-create-source.md new file mode 100644 index 000000000000..c2eebb5ace80 --- /dev/null +++ b/docs/connector-development/config-based/tutorial/1-create-source.md @@ -0,0 +1,32 @@ +# Step 1: Generate the source connector project locally + +Let's start by cloning the Airbyte repository: + +```bash +$ git clone git@github.com:airbytehq/airbyte.git +$ cd airbyte +``` + +Airbyte provides a code generator which bootstraps the scaffolding for our connector.
+ +```bash +$ cd airbyte-integrations/connector-templates/generator +$ ./generate.sh +``` + +This will bring up an interactive helper application. Use the arrow keys to pick a template from the list. Select the `Configuration Based Source` template and then input the name of your connector. The application will create a new directory in `airbyte/airbyte-integrations/connectors/` with the name of your new connector. + +``` +Configuration Based Source +Source name: exchange-rates-tutorial +``` + +For this walkthrough, we'll refer to our source as `exchange-rates-tutorial`. + +## Next steps + +Next, [we'll install dependencies required to run the connector](2-install-dependencies.md) + +## More readings + +- [Connector generator](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connector-templates/generator/README.md) \ No newline at end of file diff --git a/docs/connector-development/config-based/tutorial/2-install-dependencies.md b/docs/connector-development/config-based/tutorial/2-install-dependencies.md new file mode 100644 index 000000000000..b165557c62d2 --- /dev/null +++ b/docs/connector-development/config-based/tutorial/2-install-dependencies.md @@ -0,0 +1,46 @@ +# Step 2: Install dependencies + +Let's create a Python virtual environment for our source. +You can do this by executing the following commands from the root of the Airbyte repository. + +The commands below assume that `python` points to a version of Python >= 3.9.0. On some systems, `python` points to a Python2 installation and `python3` points to Python3. +If this is the case on your machine, substitute the `python` commands with `python3`. +The subsequent `python` invocations will use the virtual environment created for the connector. + +```bash +$ cd ../../connectors/source-exchange-rates-tutorial +$ python -m venv .venv +$ source .venv/bin/activate +$ pip install -r requirements.txt +``` + +These steps create an initial Python environment and install the dependencies required to run an API Source connector. + +Let's verify everything works as expected by running the Airbyte `spec` operation: + +```bash +$ python main.py spec +``` + +You should see an output similar to the one below: + +``` +{"type": "SPEC", "spec": {"documentationUrl": "https://docsurl.com", "connectionSpecification": {"$schema": "http://json-schema.org/draft-07/schema#", "title": "Python Http Tutorial Spec", "type": "object", "required": ["TODO"], "additionalProperties": false, "properties": {"TODO: This schema defines the configuration required for the source. This usually involves metadata such as database and/or authentication information.": {"type": "string", "description": "describe me"}}}}} +``` + +This is a simple sanity check to make sure everything is wired up correctly. +More details on the `spec` operation can be found in [Basic Concepts](https://docs.airbyte.com/connector-development/cdk-python/basic-concepts) and [Defining Stream Schemas](https://docs.airbyte.com/connector-development/cdk-python/schemas). + +For now, note that the `main.py` file is a convenience wrapper to help run the connector. +Its invocation format is `python main.py [args]`. +The module's generated `README.md` contains more details on the supported commands.
+ +## Next steps + +Next, we'll [connect to the API source](3-connecting-to-the-API-source.md) + +## More readings + +- [Basic Concepts](https://docs.airbyte.com/connector-development/cdk-python/basic-concepts) +- [Defining Stream Schemas](https://docs.airbyte.com/connector-development/cdk-python/schemas) +- The module's generated `README.md` contains more details on the supported commands. \ No newline at end of file diff --git a/docs/connector-development/config-based/tutorial/3-connecting-to-the-API-source.md b/docs/connector-development/config-based/tutorial/3-connecting-to-the-API-source.md new file mode 100644 index 000000000000..ca6abec250be --- /dev/null +++ b/docs/connector-development/config-based/tutorial/3-connecting-to-the-API-source.md @@ -0,0 +1,222 @@ +# Step 3: Connecting to the API + +We're now ready to start implementing the connector. + +Over the course of this tutorial, we'll be editing a few files that were generated by the code generator: + +- `source-exchange-rates-tutorial/source_exchange_rates_tutorial/spec.yaml`: This is the [spec file](../../connector-specification-reference.md). It describes the inputs used to configure the connector. +- `source-exchange-rates-tutorial/source_exchange_rates_tutorial/exchange_rates_tutorial.yaml`: This is the connector definition. It describes how the data should be read from the API source. +- `source-exchange-rates-tutorial/integration_tests/configured_catalog.json`: This is the connector's [catalog](../../../understanding-airbyte/beginners-guide-to-catalog.md). It describes what data is available in a source. +- `source-exchange-rates-tutorial/integration_tests/sample_state.json`: This is a sample state object to be used to test [incremental syncs](../../cdk-python/incremental-stream.md). + +We'll also be creating the following files: + +- `source-exchange-rates-tutorial/secrets/config.json`: This is the configuration file we'll be using to test the connector. Its schema should match the schema defined in the spec file. +- `source-exchange-rates-tutorial/secrets/invalid_config.json`: This is an invalid configuration file we'll be using to test the connector. Its schema should match the schema defined in the spec file. +- `source_exchange_rates_tutorial/schemas/rates.json`: This is the [schema definition](../../cdk-python/schemas.md) for the stream we'll implement. + +## Updating the connector spec and config + +Let's populate the specification (`spec.yaml`) and the configuration (`secrets/config.json`) so the connector can access the access key and base currency. + +1. We'll add these properties to the connector spec in `source-exchange-rates-tutorial/source_exchange_rates_tutorial/spec.yaml` + +```yaml +documentationUrl: https://docs.airbyte.io/integrations/sources/exchangeratesapi +connectionSpecification: + $schema: http://json-schema.org/draft-07/schema# + title: exchangeratesapi.io Source Spec + type: object + required: + - access_key + - base + additionalProperties: true + properties: + access_key: + type: string + description: >- + Your API Access Key. See here. The key is + case sensitive. + airbyte_secret: true + base: + type: string + description: >- + ISO reference currency. See here. + examples: + - EUR + - USD +``` + +2. We also need to fill in the connection config in `secrets/config.json`. + Because of the sensitive nature of the access key, we recommend storing this config in the `secrets` directory because it is ignored by git.
```bash +$ echo '{"access_key": "", "base": "USD"}' > secrets/config.json +``` + +## Updating the connector definition + +Next, we'll update the connector definition (`source-exchange-rates-tutorial/source_exchange_rates_tutorial/exchange_rates_tutorial.yaml`). It was generated by the code generation script. +More details on the connector definition file can be found in the [overview](../overview.md) and [connection definition](../yaml-structure.md) sections. + +Let's fill out these TODOs with the information found in the [Exchange Rates API docs](https://exchangeratesapi.io/documentation/) + +1. First, let's rename the stream from `customers` to `rates`, and update the primary key to `date` + +```yaml +streams: + - type: DeclarativeStream + $options: + name: "rates" + primary_key: "date" +``` + +and update the references in the `check` block + +```yaml +check: + type: CheckStream + stream_names: [ "rates" ] +``` + +Adding the reference in the `check` tells the `check` operation to use that stream to test the connection. + +2. Next we'll set the base url. + Since we'll be reading data through the APILayer-hosted version of the API, the base url is `"https://api.apilayer.com"`. + +```yaml +definitions: + <...> + retriever: + type: SimpleRetriever + $options: + url_base: "https://api.apilayer.com" +``` + +3. We can fetch the latest data by submitting a request to the `/exchangerates_data/latest` API endpoint. This path is specific to the stream, so we'll set it within the `rates` stream definition, at the `retriever` level. + +```yaml +streams: + - type: DeclarativeStream + $options: + name: "rates" + primary_key: "date" + schema_loader: + $ref: "*ref(definitions.schema_loader)" + retriever: + $ref: "*ref(definitions.retriever)" + requester: + $ref: "*ref(definitions.requester)" + path: "/exchangerates_data/latest" +``` + +4. Next, we'll set up the authentication. + The Exchange Rates API requires an access key to be passed as a header named "apikey". + This can be done using an `ApiKeyAuthenticator`, which we'll configure to point to the config's `access_key` field. + +```yaml +definitions: + <...> + requester: + type: HttpRequester + name: "{{ options['name'] }}" + http_method: "GET" + authenticator: + type: ApiKeyAuthenticator + header: "apikey" + api_token: "{{ config['access_key'] }}" +``` + +5. According to the ExchangeRatesApi documentation, we can specify the base currency of interest in a request parameter.
Let's assume the user will configure this via the connector configuration in a parameter called `base`; we'll pass the value input by the user as a request parameter: + +```yaml +definitions: + <...> + requester: + <...> + request_options_provider: + request_parameters: + base: "{{ config['base'] }}" +``` + +The full connection definition should now look like + +```yaml +version: "0.1.0" + +definitions: + schema_loader: + type: JsonSchema + file_path: "./source_exchange_rates_tutorial/schemas/{{ options['name'] }}.json" + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_pointer: [ ] + requester: + type: HttpRequester + name: "{{ options['name'] }}" + http_method: "GET" + authenticator: + type: ApiKeyAuthenticator + header: "apikey" + api_token: "{{ config['access_key'] }}" + request_options_provider: + request_parameters: + base: "{{ config['base'] }}" + retriever: + type: SimpleRetriever + $options: + url_base: "https://api.apilayer.com" + name: "{{ options['name'] }}" + primary_key: "{{ options['primary_key'] }}" + record_selector: + $ref: "*ref(definitions.selector)" + paginator: + type: NoPagination + +streams: + - type: DeclarativeStream + $options: + name: "rates" + primary_key: "date" + schema_loader: + $ref: "*ref(definitions.schema_loader)" + retriever: + $ref: "*ref(definitions.retriever)" + requester: + $ref: "*ref(definitions.requester)" + path: "/exchangerates_data/latest" +check: + type: CheckStream + stream_names: [ "rates" ] +``` + +We can now run the `check` operation, which verifies the connector can connect to the API source. + +```bash +$ python main.py check --config secrets/config.json +``` + +which should now succeed with logs similar to: + +``` +{"type": "LOG", "log": {"level": "INFO", "message": "Check succeeded"}} +{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "SUCCEEDED"}} +``` + +## Next steps + +Next, we'll [extract the records from the response](4-reading-data.md) + +## More readings + +- [Connector definition YAML file](../yaml-structure.md) +- [Config-based connectors overview](../overview.md) +- [Authentication](../authentication.md) +- [Request options providers](../request-options.md) +- [Schema definition](../../cdk-python/schemas.md) +- [Connector specification reference](../../connector-specification-reference.md) +- [Beginner's guide to catalog](../../../understanding-airbyte/beginners-guide-to-catalog.md) \ No newline at end of file diff --git a/docs/connector-development/config-based/tutorial/4-reading-data.md b/docs/connector-development/config-based/tutorial/4-reading-data.md new file mode 100644 index 000000000000..a59c7d997b1f --- /dev/null +++ b/docs/connector-development/config-based/tutorial/4-reading-data.md @@ -0,0 +1,70 @@ +# Step 4: Reading data + +Now that we're able to authenticate to the source API, we'll want to select data from the HTTP responses. +Let's first add the stream to the configured catalog in `source-exchange-rates-tutorial/integration_tests/configured_catalog.json` + +```json +{ + "streams": [ + { + "stream": { + "name": "rates", + "json_schema": {}, + "supported_sync_modes": [ + "full_refresh" + ] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} +``` + +The configured catalog declares the sync modes supported by the stream (full refresh or incremental). +See the [catalog guide](https://docs.airbyte.io/understanding-airbyte/beginners-guide-to-catalog) for more information.
+
+Let's define the stream schema in `source-exchange-rates-tutorial/source_exchange_rates_tutorial/schemas/rates.json`
+
+For convenience, you can download the JSON file describing the output schema with all currencies [here](https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-cdk/python/docs/tutorials/http_api_source_assets/exchange_rates.json) and place it in `schemas/`.
+
+```bash
+$ curl https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-cdk/python/docs/tutorials/http_api_source_assets/exchange_rates.json > source_exchange_rates_tutorial/schemas/rates.json
+```
+
+We can also delete the boilerplate schema files
+
+```bash
+$ rm source_exchange_rates_tutorial/schemas/customers.json
+$ rm source_exchange_rates_tutorial/schemas/employees.json
+```
+
+Reading from the source can be done by running the `read` operation
+
+```bash
+$ python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
+```
+
+The logs should show that 1 record was read from the stream.
+
+```
+{"type": "LOG", "log": {"level": "INFO", "message": "Read 1 records from rates stream"}}
+{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing rates"}}
+```
+
+The `--debug` flag can be set to print out debug information, including the outgoing request and its associated response
+
+```bash
+$ python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --debug
+```
+
+## Next steps
+
+We now have a working implementation of a connector reading the latest exchange rates for a given currency.
+However, we're limited to reading only the latest exchange rate value.
+Next, we'll [enhance the connector to read data for a given date, which will enable us to backfill the stream with historical data](5-incremental-reads.md).
+
+## More readings
+
+- [Record selector](../record-selector.md)
+- [Catalog guide](https://docs.airbyte.io/understanding-airbyte/beginners-guide-to-catalog)
\ No newline at end of file
diff --git a/docs/connector-development/config-based/tutorial/5-incremental-reads.md b/docs/connector-development/config-based/tutorial/5-incremental-reads.md
new file mode 100644
index 000000000000..70164c4dab94
--- /dev/null
+++ b/docs/connector-development/config-based/tutorial/5-incremental-reads.md
@@ -0,0 +1,301 @@
+# Step 5: Incremental Reads
+
+We now have a working implementation of a connector reading the latest exchange rates for a given currency.
+In this section, we'll update the source to read historical data instead of only reading the latest exchange rates.
+
+According to the API documentation, we can read the exchange rate for a specific date by querying the `"/exchangerates_data/{date}"` endpoint instead of `"/exchangerates_data/latest"`.
+
+We'll now add a `start_date` property to the connector.
+
+First, we'll update the spec `source_exchange_rates_tutorial/spec.yaml`
+
+```yaml
+documentationUrl: https://docs.airbyte.io/integrations/sources/exchangeratesapi
+connectionSpecification:
+  $schema: http://json-schema.org/draft-07/schema#
+  title: exchangeratesapi.io Source Spec
+  type: object
+  required:
+    - start_date
+    - access_key
+    - base
+  additionalProperties: true
+  properties:
+    start_date:
+      type: string
+      description: Start getting data from that date.
+      pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
+      examples:
+        - YYYY-MM-DD
+    access_key:
+      type: string
+      description: >-
+        Your API Access Key. See here. The key is
+        case sensitive.
+      airbyte_secret: true
+    base:
+      type: string
+      description: >-
+        ISO reference currency. See here.
+      examples:
+        - EUR
+        - USD
+```
+
+Then we'll add a `start_date` field to our connection config in `secrets/config.json`, set to roughly 7 days in the past.
+The file should look like this:
+
+```json
+{
+  "access_key": "",
+  "start_date": "2022-07-26",
+  "base": "USD"
+}
+```
+
+Next, we'll update the `path` in the connector definition to point to `/{{ config['start_date'] }}`.
+Note that we are setting a default value because the `check` operation does not know the `start_date`. We'll default to hitting `/exchangerates_data/latest`:
+
+```yaml
+streams:
+  - type: DeclarativeStream
+    $options:
+      name: "rates"
+      primary_key: "date"
+    schema_loader:
+      $ref: "*ref(definitions.schema_loader)"
+    retriever:
+      $ref: "*ref(definitions.retriever)"
+      requester:
+        $ref: "*ref(definitions.requester)"
+        path: "/exchangerates_data/{{config['start_date'] or 'latest'}}"
+```
+
+You can test these changes by executing the `read` operation:
+
+```bash
+$ python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
+```
+
+By reading the output record, you should see that we read historical data instead of the latest exchange rate.
+For example:
+
+> "historical": true, "base": "USD", "date": "2022-07-18"
+
+The connector will now always read data for the start date, which is not exactly what we want.
+Instead, we would like to iterate over all the dates between the `start_date` and today, and read data for each day.
+
+We can do this by adding a `DatetimeStreamSlicer` to the connector definition, and updating the `path` to point to the stream slice's `start_time`.
+More details on stream slicers can be found [here](../stream-slicers.md).
+
+Let's first define a stream slicer at the top level of the connector definition:
+
+```yaml
+definitions:
+  requester:
+    <...>
+  stream_slicer:
+    type: "DatetimeStreamSlicer"
+    start_datetime:
+      datetime: "{{ config['start_date'] }}"
+      datetime_format: "%Y-%m-%d"
+    end_datetime:
+      datetime: "{{ now_local() }}"
+      datetime_format: "%Y-%m-%d %H:%M:%S.%f"
+    step: "1d"
+    datetime_format: "%Y-%m-%d"
+    cursor_field: "{{ options['stream_cursor_field'] }}"
+  retriever:
+    <...>
+```
+
+and refer to it in the stream's retriever.
+This will generate slices from the start time until the end time, where each slice is exactly one day.
+The start time is defined in the config file, while the end time is defined by the `now_local()` macro, which will evaluate to the current date in the current timezone at runtime. See the section on [string interpolation](../yaml-structure.md#string-interpolation) for more details.
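+
+To build intuition for what the slicer will produce, here is a rough Python sketch of the slicing logic described above. It is not the CDK's implementation; the `start_time`/`end_time` keys simply mirror the slice fields the `path` template below relies on:
+
+```python
+from datetime import date, datetime, timedelta
+
+def daily_slices(start_date: str):
+    """Yield one slice per day from start_date through today."""
+    current = datetime.strptime(start_date, "%Y-%m-%d").date()
+    end = date.today()  # stands in for the now_local() macro
+    while current <= end:
+        day = current.strftime("%Y-%m-%d")
+        yield {"start_time": day, "end_time": day}
+        current += timedelta(days=1)
+
+# e.g. [{'start_time': '2022-07-26', 'end_time': '2022-07-26'}, ...]
+print(list(daily_slices("2022-07-26"))[:2])
+```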
+
+Note that we're also setting the `stream_cursor_field` in the stream's `$options` so it can be accessed by the `StreamSlicer`:
+
+```yaml
+streams:
+  - type: DeclarativeStream
+    $options:
+      name: "rates"
+      stream_cursor_field: "date"
+      primary_key: "date"
+    <...>
+```
+
+We'll also update the retriever to use the stream slicer:
+
+```yaml
+definitions:
+  <...>
+  retriever:
+    type: SimpleRetriever
+    <...>
+    stream_slicer:
+      $ref: "*ref(definitions.stream_slicer)"
+```
+
+Finally, we'll update the path to point to the stream slice's `start_time`:
+
+```yaml
+streams:
+  - type: DeclarativeStream
+    <...>
+    retriever:
+      $ref: "*ref(definitions.retriever)"
+      requester:
+        $ref: "*ref(definitions.requester)"
+        path: "/exchangerates_data/{{stream_slice['start_time'] or 'latest'}}"
+```
+
+The full connector definition (`./source_exchange_rates_tutorial/exchange_rates_tutorial.yaml`) should now look like this:
+
+```yaml
+version: "0.1.0"
+
+definitions:
+  schema_loader:
+    type: JsonSchema
+    file_path: "./source_exchange_rates_tutorial/schemas/{{ options['name'] }}.json"
+  selector:
+    type: RecordSelector
+    extractor:
+      type: DpathExtractor
+      field_pointer: [ ]
+  requester:
+    type: HttpRequester
+    name: "{{ options['name'] }}"
+    http_method: "GET"
+    authenticator:
+      type: ApiKeyAuthenticator
+      header: "apikey"
+      api_token: "{{ config['access_key'] }}"
+    request_options_provider:
+      request_parameters:
+        base: "{{ config['base'] }}"
+  stream_slicer:
+    type: "DatetimeStreamSlicer"
+    start_datetime:
+      datetime: "{{ config['start_date'] }}"
+      datetime_format: "%Y-%m-%d"
+    end_datetime:
+      datetime: "{{ now_local() }}"
+      datetime_format: "%Y-%m-%d %H:%M:%S.%f"
+    step: "1d"
+    datetime_format: "%Y-%m-%d"
+    cursor_field: "{{ options['stream_cursor_field'] }}"
+  retriever:
+    type: SimpleRetriever
+    $options:
+      url_base: "https://api.apilayer.com"
+    name: "{{ options['name'] }}"
+    primary_key: "{{ options['primary_key'] }}"
+    record_selector:
+      $ref: "*ref(definitions.selector)"
+    paginator:
+      type: NoPagination
+    stream_slicer:
+      $ref: "*ref(definitions.stream_slicer)"
+
+streams:
+  - type: DeclarativeStream
+    $options:
+      name: "rates"
+      stream_cursor_field: "date"
+      primary_key: "date"
+    schema_loader:
+      $ref: "*ref(definitions.schema_loader)"
+    retriever:
+      $ref: "*ref(definitions.retriever)"
+      requester:
+        $ref: "*ref(definitions.requester)"
+        path: "/exchangerates_data/{{stream_slice['start_time'] or 'latest'}}"
+check:
+  type: CheckStream
+  stream_names: [ "rates" ]
+```
+
+Running the `read` operation will now read all data for all days between `start_date` and now:
+
+```bash
+$ python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
+```
+
+The operation should now output more than one record:
+
+```
+{"type": "LOG", "log": {"level": "INFO", "message": "Read 8 records from rates stream"}}
+```
+
+## Supporting incremental syncs
+
+Instead of always reading data for all dates, we would like the connector to only read data for dates we haven't read yet.
+This can be achieved by updating the catalog to run in incremental mode (`integration_tests/configured_catalog.json`):
+
+```json
+{
+  "streams": [
+    {
+      "stream": {
+        "name": "rates",
+        "json_schema": {},
+        "supported_sync_modes": [
+          "full_refresh",
+          "incremental"
+        ]
+      },
+      "sync_mode": "incremental",
+      "destination_sync_mode": "overwrite"
+    }
+  ]
+}
+```
+
+In addition to records, the `read` operation now also outputs state messages:
+
+```
+{"type": "STATE", "state": {"data": {"rates": {"date": "2022-07-15"}}}}
+```
+
+where the date ("2022-07-15") will be replaced by today's date when you run the command.
+
+We can simulate incremental syncs by creating a state file containing the last state produced by the `read` operation, for example `source-exchange-rates-tutorial/integration_tests/sample_state.json`:
+
+```json
+{
+  "rates": {
+    "date": "2022-07-15"
+  }
+}
+```
+
+Running the `read` operation will now only read data for dates later than the given state:
+
+```bash
+$ python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --state integration_tests/sample_state.json
+```
+
+No records should be read if the state date is today's date:
+
+```
+{"type": "LOG", "log": {"level": "INFO", "message": "Setting state of rates stream to {'date': '2022-07-15'}"}}
+{"type": "LOG", "log": {"level": "INFO", "message": "Read 0 records from rates stream"}}
+```
+
+## Next steps
+
+Next, we'll run the [Source Acceptance Tests suite to ensure the connector invariants are respected](6-testing.md).
+
+## More readings
+
+- [Incremental reads](../../cdk-python/incremental-stream.md)
+- [Stream slicers](../stream-slicers.md)
+- [Stream slices](../../cdk-python/stream-slices.md)
\ No newline at end of file
diff --git a/docs/connector-development/config-based/tutorial/6-testing.md b/docs/connector-development/config-based/tutorial/6-testing.md
new file mode 100644
index 000000000000..90129993c5ac
--- /dev/null
+++ b/docs/connector-development/config-based/tutorial/6-testing.md
@@ -0,0 +1,52 @@
+# Step 6: Testing
+
+We should make sure the connector respects the Airbyte specifications before we start using it in production.
+This can be done by executing the Source Acceptance Tests (SAT).
+
+These tests assert that the most basic functionalities work as expected; they are configured in `acceptance-test-config.yml`.
+
+Before running the tests, we'll create an invalid config to make sure the `check` operation fails if the credentials are wrong, and an abnormal state to verify the connector's behavior when running with an abnormal state.
+
+Update `integration_tests/invalid_config.json` with this content:
+
+```json
+{
+  "access_key": "",
+  "start_date": "2022-07-21",
+  "base": "USD"
+}
+```
+
+and `integration_tests/abnormal_state.json` with:
+
+```json
+{
+  "rates": {
+    "date": "2999-12-31"
+  }
+}
+```
+
+You can run the acceptance tests with the following commands:
+
+```bash
+$ docker build . -t airbyte/source-exchange-rates-tutorial:dev
+$ python -m pytest integration_tests -p integration_tests.acceptance
+```
+
+## Next steps
+
+Next, we'll add the connector to the [Airbyte platform](https://docs.airbyte.com/connector-development/tutorials/cdk-tutorial-python-http/use-connector-in-airbyte).
+
+See the [Contributing guide](../../../contributing-to-airbyte/README.md) on how to get started releasing your connector.
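+
+If you want to spot-check the invalid-config behavior outside of SAT, a minimal pytest-style sketch could look like the following. The `SourceExchangeRatesTutorial` import is an assumption based on the generator's naming convention; adjust it to whatever your generated package exports:
+
+```python
+import json
+import logging
+
+from airbyte_cdk.models import Status
+from source_exchange_rates_tutorial import SourceExchangeRatesTutorial  # assumed generated class
+
+def test_check_fails_with_invalid_config():
+    with open("integration_tests/invalid_config.json") as f:
+        invalid_config = json.load(f)
+    # check() returns an AirbyteConnectionStatus; bad credentials should yield FAILED.
+    status = SourceExchangeRatesTutorial().check(logging.getLogger("airbyte"), invalid_config)
+    assert status.status == Status.FAILED
+```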
+
+## Read more
+
+- [Error handling](../error-handling.md)
+- [Pagination](../pagination.md)
+- [Testing connectors](../../testing-connectors/README.md)
+- [Contribution guide](../../../contributing-to-airbyte/README.md)
+- [Greenhouse source](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-greenhouse)
+- [Sendgrid source](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sendgrid)
+- [Sentry source](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-sentry)
\ No newline at end of file
diff --git a/docs/connector-development/config-based/yaml-structure.md b/docs/connector-development/config-based/yaml-structure.md
new file mode 100644
index 000000000000..d81da3a3c0d9
--- /dev/null
+++ b/docs/connector-development/config-based/yaml-structure.md
@@ -0,0 +1,273 @@
+# Connector definition
+
+Connectors are defined as a YAML configuration describing the connector's Source.
+
+Three top-level fields are required:
+
+1. `streams`: List of streams that are part of the source
+2. `check`: Component describing how to check the connection
+3. `version`: The framework version
+
+The configuration will be validated against a JSON Schema that defines the set of valid properties.
+
+The general structure of the YAML is as follows:
+
+```yaml
+version: "0.1.0"
+definitions:
+
+streams:
+
+check:
+
+```
+
+We recommend using the `Configuration Based Source` template from the template generator in `airbyte-integrations/connector-templates/generator` to generate the basic file structure.
+
+See the [tutorial for a complete connector definition](tutorial/6-testing.md)
+
+## Object instantiation
+
+This section describes the objects that are instantiated from the YAML definition.
+
+If the component is a literal, then it is returned as is:
+
+```
+3
+```
+
+will result in
+
+```
+3
+```
+
+If the component is a mapping with a "class_name" field,
+an object of type "class_name" will be instantiated by passing the mapping's other fields to the constructor:
+
+```yaml
+my_component:
+  class_name: "fully_qualified.class_name"
+  a_parameter: 3
+  another_parameter: "hello"
+```
+
+will result in
+
+```
+fully_qualified.class_name(a_parameter=3, another_parameter="hello")
+```
+
+If the component definition is a mapping with a "type" field,
+the factory will look up the [CLASS_TYPES_REGISTRY](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py), replace the "type" field with "class_name" -> CLASS_TYPES_REGISTRY[type],
+and instantiate the object from the resulting mapping.
+
+If the component definition is a mapping with neither a "class_name" nor a "type" field,
+the factory will do a best-effort attempt at inferring the component type by looking up the parent object's constructor type hints.
+If the type hint is an interface present in the [DEFAULT_IMPLEMENTATIONS_REGISTRY](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py),
+then the factory will create an object of its default implementation.
+
+If the component definition is a list, then the factory will iterate over the elements of the list,
+instantiate its subcomponents, and return a list of instantiated objects.
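+
+The "type" lookup described above can be illustrated with a toy Python sketch. This is not the CDK's actual factory, just the registry pattern it uses:
+
+```python
+from dataclasses import dataclass
+
+@dataclass
+class BearerAuthenticator:
+    token: str
+
+# Maps "type" names to classes, analogous to CLASS_TYPES_REGISTRY.
+TYPES = {"BearerAuthenticator": BearerAuthenticator}
+
+def build(definition: dict):
+    # Drop the "type" field and pass the remaining fields as constructor arguments.
+    kwargs = {k: v for k, v in definition.items() if k != "type"}
+    return TYPES[definition["type"]](**kwargs)
+
+print(build({"type": "BearerAuthenticator", "token": "hello"}))
+# BearerAuthenticator(token='hello')
+```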
+
+If the component has subcomponents, the factory will create the subcomponents before instantiating the top-level object:
+
+```
+{
+  "type": "TopLevel",
+  "param": {
+    "type": "ParamType",
+    "k": "v"
+  }
+}
+```
+
+will result in
+
+```
+TopLevel(param=ParamType(k="v"))
+```
+
+More details on object instantiation can be found [here](https://airbyte-cdk.readthedocs.io/en/latest/api/airbyte_cdk.sources.declarative.parsers.html?highlight=factory#airbyte_cdk.sources.declarative.parsers.factory.DeclarativeComponentFactory).
+
+### $options
+
+Parameters can be passed down from a parent component to its subcomponents using the $options key.
+This can be used to avoid repetition.
+
+```yaml
+outer:
+  $options:
+    MyKey: MyValue
+  inner:
+    k2: v2
+```
+
+In the example above, if both `outer` and `inner` are types with a "MyKey" field, both of them will evaluate to "MyValue".
+
+These parameters can be overwritten by subcomponents as a form of specialization:
+
+```yaml
+outer:
+  $options:
+    MyKey: MyValue
+  inner:
+    $options:
+      MyKey: YourValue
+    k2: v2
+```
+
+In this example, "outer.MyKey" will evaluate to "MyValue", and "inner.MyKey" will evaluate to "YourValue".
+
+The value can also be used for string interpolation:
+
+```yaml
+outer:
+  $options:
+    MyKey: MyValue
+  inner:
+    k2: "MyKey is {{ options['MyKey'] }}"
+```
+
+In this example, `outer.inner.k2` will evaluate to "MyKey is MyValue".
+
+## References
+
+Strings can contain references to previously defined values.
+The parser will dereference these values to produce a complete ConnectionDefinition.
+
+References can be defined using a *ref() string.
+
+```yaml
+key: 1234
+reference: "*ref(key)"
+```
+
+will produce the following definition:
+
+```yaml
+key: 1234
+reference: 1234
+```
+
+This also works with objects:
+
+```yaml
+key_value_pairs:
+  k1: v1
+  k2: v2
+same_key_value_pairs: "*ref(key_value_pairs)"
+```
+
+will produce the following definition:
+
+```yaml
+key_value_pairs:
+  k1: v1
+  k2: v2
+same_key_value_pairs:
+  k1: v1
+  k2: v2
+```
+
+The $ref keyword can be used to refer to an object and enhance it with additional key-value pairs:
+
+```yaml
+key_value_pairs:
+  k1: v1
+  k2: v2
+same_key_value_pairs:
+  $ref: "*ref(key_value_pairs)"
+  k3: v3
+```
+
+will produce the following definition:
+
+```yaml
+key_value_pairs:
+  k1: v1
+  k2: v2
+same_key_value_pairs:
+  k1: v1
+  k2: v2
+  k3: v3
+```
+
+References can also point to nested values.
+Nested references are ambiguous because one could define a key containing a `.`.
+In this example, we want to refer to the `limit` key in the `dict` object:
+
+```yaml
+dict:
+  limit: 50
+limit_ref: "*ref(dict.limit)"
+```
+
+will produce the following definition:
+
+```yaml
+dict:
+  limit: 50
+limit_ref: 50
+```
+
+whereas here we want to access the `nested.path` value:
+
+```yaml
+nested:
+  path: "first one"
+nested.path: "uh oh"
+value: "*ref(nested.path)"
+```
+
+will produce the following definition:
+
+```yaml
+nested:
+  path: "first one"
+nested.path: "uh oh"
+value: "uh oh"
+```
+
+To resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward
+until we find a key with the given path, or until there is nothing to traverse.
+
+More details on referencing values can be found [here](https://airbyte-cdk.readthedocs.io/en/latest/api/airbyte_cdk.sources.declarative.parsers.html?highlight=yamlparser#airbyte_cdk.sources.declarative.parsers.yaml_parser.YamlParser).
+
+## String interpolation
+
+String values can be evaluated as Jinja2 templates.
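+
+As a quick illustration, evaluation works roughly as in the sketch below, which uses the Jinja2 library directly; the CDK's real interpolation layers more context and macros on top:
+
+```python
+from jinja2 import Template
+
+options = {"name": "airbyte"}
+# Both dot and bracket notation resolve against the rendering context.
+print(Template("Hello {{ options.name }}").render(options=options))     # Hello airbyte
+print(Template("Hello {{ options['name'] }}").render(options=options))  # Hello airbyte
+print(Template("hello world").render())  # raw strings pass through unchanged
+```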
+
+If the input string is a raw string, the interpolated string will be the same:
+`"hello world" -> "hello world"`
+
+The engine will evaluate the content passed within `{{...}}`, interpolating the keys from context-specific arguments.
+The "options" keyword ([see $options](yaml-structure.md#object-instantiation)) can be referenced.
+
+For example, `some_object.inner_object.key` will evaluate to "Hello airbyte" at runtime:
+
+```yaml
+some_object:
+  $options:
+    name: "airbyte"
+  inner_object:
+    key: "Hello {{ options.name }}"
+```
+
+Some components also pass in additional arguments to the context.
+This is the case for the [record selector](record-selector.md), which passes in an additional `response` argument.
+
+Both dot notation and bracket notation (with single quotes (`'`)) are interchangeable.
+This means that both of these string templates will evaluate to the same string:
+
+1. `"{{ options.name }}"`
+2. `"{{ options['name'] }}"`
+
+In addition to the values passed in through the kwargs argument, macros can be called from within the string interpolation.
+For example,
+`"{{ max(2, 3) }}" -> 3`
+
+The available macros can be found [here](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/sources/declarative/interpolation/macros.py).
+
+Additional information on Jinja templating can be found at https://jinja.palletsprojects.com/en/3.1.x/templates/#
\ No newline at end of file
diff --git a/docusaurus/docusaurus.config.js b/docusaurus/docusaurus.config.js
index 7b0a247057e1..38619eb9d7aa 100644
--- a/docusaurus/docusaurus.config.js
+++ b/docusaurus/docusaurus.config.js
@@ -61,7 +61,8 @@ const config = {
         sidebarCollapsible: true,
         sidebarPath: require.resolve('./sidebars.js'),
         editUrl: 'https://github.com/airbytehq/airbyte/blob/master/docs',
-        path: '../docs'
+        path: '../docs',
+        exclude: ['**/connector-development/config-based/**']
       },
       blog: false,
       theme: {