Commit

CDK: AbstractSource.read() skips syncing stream if it's unavailable (add `AvailabilityStrategy` concept) (#19977)

* Rough first implementation of AvailabilityStrategies

* Basic unit tests for AvailabilityStrategy and ScopedAvailabilityStrategy

* Make availability_strategy a property, separate out tests

* Remove from DeclarativeSource, remove Source parameter from methods, make default no AvailabilityStrategy

* Add skip stream if not available to read()

* Changes to CDK to get source-github working using AvailabilityStrategy, flakecheck

* reorganize cdk class, add HTTPAvailabilityStrategy test

* cleanup, docstrings

* pull out error handling into separate method

* Pass source and logger to check_connection method

* Add documentation links, handle 403 specifically

* Fix circular import

* Add AvailabilityStrategy to Stream and HTTPStream classes

* Remove AS from abstract_source, add to Stream, HTTPStream, AvailabilityStrategy unit tests passing for per-stream strategies

* Modify MockHttpStream to set no AvailabilityStrategy since source test mocking doesn't support this

* Move AvailabilityStrategy class to sources.streams

* Move HTTPAvailabilityStrategy to http module

* Use pascal case for HttpAvailabilityStrategy

* Remove docs message method :( and default to True availability on unhandled HTTPErrors

* add check_availability method to stream class

* Add optional source parameter

* Add test for connector-specific documentation, small tests refactor

* Add test that performs the read() function for stream with default availability strategy

* Add test for read function behavior when stream is unavailable

* Add 403 info in logger message

* Don't return error for other HTTPErrors

* Split up error handling into methods 'unavailable_error_codes' and 'get_reason_for_error'

* rework overrideable list of status codes to be a dict with reasons, to enforce that users provide reasons for all listed errors

* Fix incorrect typing

* Move HttpAvailability to its own module, fix flake errors

* Fix ScopedAvailabilityStrategy, docstrings and types for streams/availability_strategy.py

* Docstrings and types for core.py and http/availability_strategy.py

* Move _get_stream_slices to a StreamHelper class

* Docstrings + types for stream_helpers.py, cleanup test_availability.py

* Clean up test_source.py

* Move logic of getting the initial record from a stream to StreamHelper class

* Add changelog and bump minor version

* change 'is True' and 'is False' behavior

* use mocker.MagicMock

* Remove ScopedAvailabilityStrategy

* Don't except non-403 errors, check_stream uses availability_strategy if possible

* CDK: pass error to reasons_for_error_codes

* make get_stream_slice public

* Add tests for raising unhandled errors and retries are handled

* Add tests for CheckStream via AvailabilityStrategy

* Add documentation for stream availability of http streams

* Move availability unit tests to correct modules, report error message if possible

* Add test for reporting specific error if available
erohmensing authored Dec 12, 2022
1 parent 3c8bb42 commit 55a3288
Showing 16 changed files with 681 additions and 50 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog

+## 0.13.0
+Add `Stream.check_availability` and `Stream.availability_strategy`. Make `HttpAvailabilityStrategy` the default `HttpStream.availability_strategy`.
+
 ## 0.12.4
 Lookback window should be applied when a state is supplied as well

6 changes: 5 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -106,6 +106,10 @@ def read(
                         f"The requested stream {configured_stream.stream.name} was not found in the source."
                         f" Available streams: {stream_instances.keys()}"
                     )
+                stream_is_available, error = stream_instance.check_availability(logger, self)
+                if not stream_is_available:
+                    logger.warning(f"Skipped syncing stream '{stream_instance.name}' because it was unavailable. Error: {error}")
+                    continue
                 try:
                     timer.start_event(f"Syncing stream {configured_stream.stream.name}")
                     yield from self._read_stream(
@@ -187,7 +191,7 @@ def _read_stream(
     @staticmethod
     def _limit_reached(internal_config: InternalConfig, records_counter: int) -> bool:
         """
-        Check if record count reached liimt set by internal config.
+        Check if record count reached limit set by internal config.
         :param internal_config - internal CDK configuration separated from user defined config
         :records_counter - number of records already read
         :return True if limit reached, False otherwise
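
The first hunk above changes `read()` so that a stream failing its availability check is logged and skipped rather than aborting the whole sync. A minimal, self-contained sketch of that control flow is below. `FakeStream` and `read_all` are hypothetical stand-ins written for illustration; they are not CDK classes, and the real `check_availability` also accepts the source.

```python
import logging
from typing import Iterator, List, Optional, Tuple


class FakeStream:
    """Hypothetical stand-in for a CDK stream (not the real `Stream` class)."""

    def __init__(self, name: str, records: List[dict], unavailable_reason: Optional[str] = None):
        self.name = name
        self._records = records
        self._unavailable_reason = unavailable_reason

    def check_availability(self, logger: logging.Logger) -> Tuple[bool, Optional[str]]:
        # Same (bool, Optional[str]) return contract as in the diff above.
        if self._unavailable_reason is not None:
            return False, self._unavailable_reason
        return True, None

    def read_records(self) -> Iterator[dict]:
        yield from self._records


def read_all(streams: List[FakeStream], logger: logging.Logger) -> Iterator[dict]:
    """Yield records from every available stream, skipping unavailable ones."""
    for stream in streams:
        is_available, error = stream.check_availability(logger)
        if not is_available:
            logger.warning(f"Skipped syncing stream '{stream.name}' because it was unavailable. Error: {error}")
            continue
        yield from stream.read_records()


logger = logging.getLogger("availability-sketch")
streams = [
    FakeStream("users", [{"id": 1}]),
    FakeStream("admin_logs", [{"id": 99}], unavailable_reason="403: Forbidden"),
    FakeStream("repos", [{"id": 2}, {"id": 3}]),
]
records = list(read_all(streams, logger))
```

In this sketch the `admin_logs` stream only produces a warning, and the remaining streams are still read in order.
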
@@ -6,9 +6,9 @@
 from dataclasses import InitVar, dataclass
 from typing import Any, List, Mapping, Tuple

-from airbyte_cdk.models.airbyte_protocol import SyncMode
 from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 from airbyte_cdk.sources.source import Source
+from airbyte_cdk.sources.utils.stream_helpers import StreamHelper
 from dataclasses_jsonschema import JsonSchemaMixin


@@ -33,29 +33,19 @@ def check_connection(self, source: Source, logger: logging.Logger, config: Mappi
         if len(streams) == 0:
             return False, f"No streams to connect to from source {source}"
         for stream_name in self.stream_names:
-            if stream_name in stream_name_to_stream.keys():
-                stream = stream_name_to_stream[stream_name]
-                try:
-                    # Some streams need a stream slice to read records (eg if they have a SubstreamSlicer)
-                    stream_slice = self._get_stream_slice(stream)
-                    records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
-                    next(records)
-                except Exception as error:
-                    return False, f"Unable to connect to stream {stream_name} - {error}"
-            else:
+            if stream_name not in stream_name_to_stream.keys():
                 raise ValueError(f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}")
-        return True, None

-    def _get_stream_slice(self, stream):
-        # We wrap the return output of stream_slices() because some implementations return types that are iterable,
-        # but not iterators such as lists or tuples
-        slices = iter(
-            stream.stream_slices(
-                cursor_field=stream.cursor_field,
-                sync_mode=SyncMode.full_refresh,
-            )
-        )
-        try:
-            return next(slices)
-        except StopIteration:
-            return {}
+            stream = stream_name_to_stream[stream_name]
+            try:
+                if stream.availability_strategy is not None:
+                    stream_is_available, reason = stream.check_availability(logger, source)
+                    if not stream_is_available:
+                        return False, reason
+                else:
+                    stream_helper = StreamHelper()
+                    stream_helper.get_first_record(stream)
+            except Exception as error:
+                return False, f"Unable to connect to stream {stream_name} - {error}"
+
+        return True, None
@@ -17,7 +17,7 @@ class DeclarativeSource(AbstractSource):
     @property
     @abstractmethod
     def connection_checker(self) -> ConnectionChecker:
-        """Returns the ConnectioChecker to use for the `check` operation"""
+        """Returns the ConnectionChecker to use for the `check` operation"""

     def check_connection(self, logger, config) -> Tuple[bool, any]:
         """
@@ -0,0 +1,33 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
import typing
from abc import ABC, abstractmethod
from typing import Optional, Tuple

from airbyte_cdk.sources.streams import Stream

if typing.TYPE_CHECKING:
    from airbyte_cdk.sources import Source


class AvailabilityStrategy(ABC):
    """
    Abstract base class for checking stream availability.
    """

    @abstractmethod
    def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]:
        """
        Checks stream availability.
        :param stream: stream
        :param logger: source logger
        :param source: (optional) source
        :return: A tuple of (boolean, str). If boolean is true, then the stream
          is available, and no str is required. Otherwise, the stream is unavailable
          for some reason and the str should describe what went wrong and how to
          resolve the unavailability, if possible.
        """
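
The abstract class above only fixes a contract: `check_availability` returns `(True, None)` when the stream is usable and `(False, reason)` otherwise. As a hedged illustration of that contract, the sketch below defines a local stand-in for the base class (so it runs without the CDK installed) and a hypothetical `EnabledStreamsStrategy`. The strategy, the `NamedStream` helper, and the "enabled names" idea are assumptions made up for this example, not part of the commit.

```python
import logging
from abc import ABC, abstractmethod
from typing import Any, Optional, Set, Tuple


class AvailabilityStrategy(ABC):
    """Local stand-in mirroring the abstract class in the diff above."""

    @abstractmethod
    def check_availability(self, stream: Any, logger: logging.Logger, source: Optional[Any]) -> Tuple[bool, Optional[str]]:
        ...


class EnabledStreamsStrategy(AvailabilityStrategy):
    """Hypothetical strategy: a stream is available only if its name is enabled."""

    def __init__(self, enabled_names: Set[str]):
        self._enabled_names = enabled_names

    def check_availability(self, stream: Any, logger: logging.Logger, source: Optional[Any] = None) -> Tuple[bool, Optional[str]]:
        if stream.name in self._enabled_names:
            return True, None
        # The reason should tell the user what went wrong and, ideally, how to fix it.
        return False, f"Stream '{stream.name}' is not enabled for this account."


class NamedStream:
    """Hypothetical minimal stream that exposes only a name."""

    def __init__(self, name: str):
        self.name = name


strategy = EnabledStreamsStrategy({"users"})
logger = logging.getLogger("availability-sketch")
ok = strategy.check_availability(NamedStream("users"), logger, None)
blocked = strategy.check_availability(NamedStream("billing"), logger, None)
```
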
29 changes: 28 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
@@ -5,9 +5,10 @@

 import inspect
 import logging
+import typing
 from abc import ABC, abstractmethod
 from functools import lru_cache
-from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
+from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

 import airbyte_cdk.sources.utils.casing as casing
 from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
@@ -17,6 +18,10 @@
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
 from deprecated.classic import deprecated

+if typing.TYPE_CHECKING:
+    from airbyte_cdk.sources import Source
+    from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
+
 # A stream's read method can return one of the following types:
 # Mapping[str, Any]: The content of an AirbyteRecordMessage
 # AirbyteRecordMessage: An AirbyteRecordMessage
@@ -170,6 +175,28 @@ def source_defined_cursor(self) -> bool:
         """
         return True

+    def check_availability(self, logger: logging.Logger, source: Optional["Source"] = None) -> Tuple[bool, Optional[str]]:
+        """
+        Checks whether this stream is available.
+        :param logger: source logger
+        :param source: (optional) source
+        :return: A tuple of (boolean, str). If boolean is true, then this stream
+          is available, and no str is required. Otherwise, this stream is unavailable
+          for some reason and the str should describe what went wrong and how to
+          resolve the unavailability, if possible.
+        """
+        if self.availability_strategy:
+            return self.availability_strategy.check_availability(self, logger, source)
+        return True, None
+
+    @property
+    def availability_strategy(self) -> Optional["AvailabilityStrategy"]:
+        """
+        :return: The AvailabilityStrategy used to check whether this stream is available.
+        """
+        return None
+
     @property
     @abstractmethod
     def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
@@ -0,0 +1,120 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
import typing
from typing import Dict, Optional, Tuple

import requests
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.utils.stream_helpers import StreamHelper
from requests import HTTPError

if typing.TYPE_CHECKING:
    from airbyte_cdk.sources import Source


class HttpAvailabilityStrategy(AvailabilityStrategy):
    def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional["Source"]) -> Tuple[bool, Optional[str]]:
        """
        Check stream availability by attempting to read the first record of the
        stream.
        :param stream: stream
        :param logger: source logger
        :param source: (optional) source
        :return: A tuple of (boolean, str). If boolean is true, then the stream
          is available, and no str is required. Otherwise, the stream is unavailable
          for some reason and the str should describe what went wrong and how to
          resolve the unavailability, if possible.
        """
        try:
            stream_helper = StreamHelper()
            stream_helper.get_first_record(stream)
        except HTTPError as error:
            return self.handle_http_error(stream, logger, source, error)
        return True, None

    def handle_http_error(
        self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
    ) -> Tuple[bool, Optional[str]]:
        """
        Override this method to define error handling for various `HTTPError`s
        that are raised while attempting to check a stream's availability.
        Checks whether an error's status_code is in a list of unavailable_error_codes,
        and gets the associated reason for that error.
        :param stream: stream
        :param logger: source logger
        :param source: optional (source)
        :param error: HTTPError raised while checking stream's availability.
        :return: A tuple of (boolean, str). If boolean is true, then the stream
          is available, and no str is required. Otherwise, the stream is unavailable
          for some reason and the str should describe what went wrong and how to
          resolve the unavailability, if possible.
        """
        try:
            status_code = error.response.status_code
            reason = self.reasons_for_unavailable_status_codes(stream, logger, source, error)[status_code]
            response_error_message = stream.parse_response_error_message(error.response)
            if response_error_message:
                reason += response_error_message
            return False, reason
        except KeyError:
            # If the HTTPError is not in the dictionary of errors we know how to handle, don't except it
            raise error

    def reasons_for_unavailable_status_codes(
        self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
    ) -> Dict[int, str]:
        """
        Returns a dictionary of HTTP status codes that indicate stream
        unavailability and reasons explaining why a given status code may
        have occurred and how the user can resolve that error, if applicable.
        :param stream: stream
        :param logger: source logger
        :param source: optional (source)
        :return: A dictionary of (status code, reason) where the 'reason' explains
          why 'status code' may have occurred and how the user can resolve that
          error, if applicable.
        """
        forbidden_error_message = f"The endpoint to access stream '{stream.name}' returned 403: Forbidden. "
        forbidden_error_message += "This is most likely due to insufficient permissions on the credentials in use. "
        forbidden_error_message += self._visit_docs_message(logger, source)

        reasons_for_codes: Dict[int, str] = {requests.codes.FORBIDDEN: forbidden_error_message}
        return reasons_for_codes

    @staticmethod
    def _visit_docs_message(logger: logging.Logger, source: Optional["Source"]) -> str:
        """
        Creates a message indicating where to look in the documentation for
        more information on a given source by checking the spec of that source
        (if provided) for a 'documentationUrl'.
        :param logger: source logger
        :param source: optional (source)
        :return: A message telling the user where to go to learn more about the source.
        """
        if not source:
            return "Please visit the connector's documentation to learn more. "

        try:
            connector_spec = source.spec(logger)
            docs_url = connector_spec.documentationUrl
            if docs_url:
                return f"Please visit {docs_url} to learn more. "
            else:
                return "Please visit the connector's documentation to learn more. "

        except FileNotFoundError:  # If we are unit testing without implementing spec() method in source
            if source:
                docs_url = f"https://docs.airbyte.com/integrations/sources/{source.name}"
            else:
                docs_url = "https://docs.airbyte.com/integrations/sources/test"

            return f"Please visit {docs_url} to learn more."
6 changes: 6 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
@@ -13,7 +13,9 @@
 import requests
 import requests_cache
 from airbyte_cdk.models import SyncMode
+from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
 from airbyte_cdk.sources.streams.core import Stream, StreamData
+from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
 from requests.auth import AuthBase
 from requests_cache.session import CachedSession

@@ -113,6 +115,10 @@ def retry_factor(self) -> float:
     def authenticator(self) -> HttpAuthenticator:
         return self._authenticator

+    @property
+    def availability_strategy(self) -> Optional[AvailabilityStrategy]:
+        return HttpAvailabilityStrategy()
+
     @abstractmethod
     def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
         """
44 changes: 44 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/stream_helpers.py
@@ -0,0 +1,44 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import StreamData


class StreamHelper:
    def get_first_record(self, stream: Stream) -> StreamData:
        """
        Gets the first record for a stream.
        :param stream: stream
        :return: StreamData containing the first record in the stream
        """
        # Some streams need a stream slice to read records (e.g. if they have a SubstreamSlicer)
        stream_slice = self.get_stream_slice(stream)
        records = stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
        # Return the record so the result matches the declared StreamData return type
        return next(records)

    @staticmethod
    def get_stream_slice(stream: Stream) -> Optional[Mapping[str, Any]]:
        """
        Gets the first stream_slice from a given stream's stream_slices.
        :param stream: stream
        :return: First stream slice from 'stream_slices' generator
        """
        # We wrap the return output of stream_slices() because some implementations return types that are iterable,
        # but not iterators such as lists or tuples
        slices = iter(
            stream.stream_slices(
                cursor_field=stream.cursor_field,
                sync_mode=SyncMode.full_refresh,
            )
        )
        try:
            return next(slices)
        except StopIteration:
            return {}
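
The comment in `get_stream_slice` explains why the result of `stream_slices()` is wrapped in `iter()`: `next()` needs an iterator, while some implementations return a plain list or tuple. The standalone sketch below isolates that pattern. `first_slice` is a hypothetical helper name used only here.

```python
from typing import Any, Iterable, Mapping, Optional


def first_slice(slices: Iterable[Optional[Mapping[str, Any]]]) -> Optional[Mapping[str, Any]]:
    """Return the first element of `slices`, or an empty dict if there are none."""
    # iter() makes this work for lists and tuples as well as generators;
    # calling next() directly on a list would raise TypeError.
    iterator = iter(slices)
    try:
        return next(iterator)
    except StopIteration:
        return {}


from_list = first_slice([{"parent_id": 1}, {"parent_id": 2}])
from_generator = first_slice(s for s in ({"parent_id": 7},))
from_empty = first_slice([])
```
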
11 changes: 11 additions & 0 deletions airbyte-cdk/python/docs/concepts/http-streams.md
@@ -81,3 +81,14 @@ When we are dealing with streams that depend on the results of another stream, w
 If you need to set any network-adapter keyword args on the outgoing HTTP requests such as `allow_redirects`, `stream`, `verify`, `cert`, etc.,
 override the `request_kwargs` method. Any option listed in [BaseAdapter.send](https://docs.python-requests.org/en/latest/api/#requests.adapters.BaseAdapter.send) can
 be returned as a keyword argument.
+
+## Stream Availability
+
+The CDK defines an `AvailabilityStrategy` for a stream, which implements the stream's `check_availability` method. This method checks whether
+the stream is available before `read_records` is called.
+
+For HTTP streams, the default `HttpAvailabilityStrategy` attempts to read the first record of the stream and catches any `HTTPError` whose status code
+appears in a dictionary of known error codes and associated reasons, `reasons_for_unavailable_status_codes`. By default, this dictionary contains only
+`requests.codes.FORBIDDEN` (403), with an associated error message that tells the user that they are likely missing permissions associated with that stream.
+
+You can override these known errors to handle more error codes and tell the user how to resolve them.
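
As a rough illustration of the override described in the documentation above, the sketch below extends the known status codes with 404. It deliberately avoids importing the CDK or `requests`: `StatusError`, `DefaultStrategy`, and `NotFoundAwareStrategy` are hypothetical stand-ins with simplified signatures that only mirror the lookup-then-reraise logic of `handle_http_error`.

```python
from http import HTTPStatus
from typing import Dict, Optional, Tuple


class StatusError(Exception):
    """Hypothetical stand-in for an HTTP error that carries a status code."""

    def __init__(self, status_code: int):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


class DefaultStrategy:
    """Stand-in mirroring the default lookup: only 403 is a known status code."""

    def reasons_for_unavailable_status_codes(self, stream_name: str) -> Dict[int, str]:
        return {HTTPStatus.FORBIDDEN.value: f"Stream '{stream_name}' returned 403: Forbidden. Check the credentials' permissions."}

    def handle_http_error(self, stream_name: str, error: StatusError) -> Tuple[bool, Optional[str]]:
        reasons = self.reasons_for_unavailable_status_codes(stream_name)
        if error.status_code not in reasons:
            # Unknown status codes are not swallowed; they propagate to the caller.
            raise error
        return False, reasons[error.status_code]


class NotFoundAwareStrategy(DefaultStrategy):
    """Hypothetical override that also treats 404 as 'stream unavailable'."""

    def reasons_for_unavailable_status_codes(self, stream_name: str) -> Dict[int, str]:
        reasons = super().reasons_for_unavailable_status_codes(stream_name)
        reasons[HTTPStatus.NOT_FOUND.value] = f"Stream '{stream_name}' returned 404: Not Found. The resource may not exist for this account."
        return reasons


strategy = NotFoundAwareStrategy()
forbidden = strategy.handle_http_error("issues", StatusError(403))
not_found = strategy.handle_http_error("issues", StatusError(404))
```

Returning a dictionary rather than a bare list of codes means every code listed as "unavailable" must come with a reason, which matches the intent stated in the commit message.
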
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
@@ -15,7 +15,7 @@

 setup(
     name="airbyte-cdk",
-    version="0.12.4",
+    version="0.13.0",
     description="A framework for writing Airbyte Connectors.",
     long_description=README,
     long_description_content_type="text/markdown",