Skip to content

Commit

Permalink
source-github: move known error handling to GithubAvailabilityStrategy (
Browse files Browse the repository at this point in the history
#19978)

* Rough first implememtation of AvailabilityStrategy s

* Basic unit tests for AvailabilityStrategy and ScopedAvailabilityStrategy

* Make availability_strategy a property, separate out tests

* Remove from DeclarativeSource, remove Source parameter from methods, make default no AvailabilityStrategy

* Add skip stream if not available to read()

* Changes to CDK to get source-github working using AvailabilityStrategy, flakecheck

* reorganize cdk class, add HTTPAvailabilityStrategy test

* cleanup, docstrings

* source-github working with availabilitystrategy

* reorganize source-github

* source-github: resolve deprecation warning

* use correct import for HTTPAvailabilitystrategy

* pull out error handling into separate method

* use handle_http_error method

* Pass source and logger to check_connection method

* Add documentation links, handle 403 specifically

* Fix circular import

* Add AvailabilityStrategy to Stream and HTTPStream classes

* Remove AS from abstract_source, add to Stream, HTTPStream, AvailabilityStrategy unit tests passing for per-stream strategies

* Modify MockHttpStream to set no AvailabilityStrategy since source test mocking doesn't support this

* Move AvailabilityStrategy class to sources.streams

* Move HTTPAvailabilityStrategy to http module

* Use pascal case for HttpAvailabilityStrategy

* Remove docs message method :( and default to True availability on unhandled HTTPErrors

* add check_availability method to stream class

* Add optional source parameter

* Add test for connector-specific documentation, small tests refactor

* Add test that performs the read() function for stream with default availability strategy

* Add test for read function behavior when stream is unavailable

* Add 403 info in logger message

* Don't return error for other HTTPErrors

* Split up error handling into methods 'unavailable_error_codes' and 'get_reason_for_error'

* rework overrideable list of status codes to be a dict with reasons, to enforce that users provide reasons for all listed errors

* Fix incorrect typing

* Move HttpAvailability to its own module, fix flake errors

* Fix ScopedAvailabilityStrategy, docstrings and types for streams/availability_strategy.py

* Docstrings and types for core.py and http/availability_strategy.py

* Move _get_stream_slices to a StreamHelper class

* Docstrings + types for stream_helpers.py, cleanup test_availability.py

* Clean up test_source.py

* Move logic of getting the initial record from a stream to StreamHelper class

* Add changelog and bump minor version

* change 'is True' and 'is False' behavior

* use mocker.MagicMock

* Remove ScopedAvailabilityStrategy

* Don't except non-403 errors, check_stream uses availability_strategy if possible

* Move AvailabilityStrategy to stream level, fix tests

* make get_stream_slice public

* Attempt to refactor error code handling into repository-based and organization-based

* split into repository-based availabilitystrategy and organization-based availabilitystrategy

* refactor organization-based availabilitystrategy

* refactor repository-based availabilitystrategy, create separate ones for workflow_runs and projects

* Fix workflow runs availability strategy

* move availability strategies to a different module

* CDK: pass error to reasons_for_error_codes

* make get_stream_slice public

* Revert "make get_stream_slice public"

This reverts commit 9170fe5.

* Add tests for raising unhandled errors and retries are handled

* Add tests for CheckStream via AvailabilityStrategy

* Remove moved file

* bump CDK dependency

* Cleanup: Address review comments

* One more fix

* Update changelog and dockerfile version
  • Loading branch information
erohmensing authored Dec 14, 2022
1 parent 82df676 commit f97db17
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@
- name: GitHub
sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerRepository: airbyte/source-github
dockerImageTag: 0.3.8
dockerImageTag: 0.3.9
documentationUrl: https://docs.airbyte.com/integrations/sources/github
icon: github.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4548,7 +4548,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-github:0.3.8"
- dockerImage: "airbyte/source-github:0.3.9"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/github"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.3.8
LABEL io.airbyte.version=0.3.9
LABEL io.airbyte.name=airbyte/source-github
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "pendulum~=2.1.2", "sgqlc"]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.13", "pendulum~=2.1.2", "sgqlc"]

TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "responses~=0.19.0"]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Dict, Optional

import requests
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy
from airbyte_cdk.sources.utils.stream_helpers import StreamHelper
from requests import HTTPError


class OrganizationBasedAvailabilityStrategy(HttpAvailabilityStrategy):
"""
Availability Strategy for organization-based streams.
"""

def reasons_for_unavailable_status_codes(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Dict[int, str]:
stream_slice = StreamHelper().get_stream_slice(stream)
organization = stream_slice["organization"]
response_error_msg = str(error.response.json().get("message"))

reasons_for_codes = {
requests.codes.NOT_FOUND: f"`{stream.__class__.__name__}` stream isn't available for organization `{organization}`.",
# When `403` for the stream, that has no access to the organization's teams, based on OAuth Apps Restrictions:
# https://docs.github.com/en/organizations/restricting-access-to-your-organizations-data/enabling-oauth-app-access-restrictions-for-your-organization
requests.codes.FORBIDDEN: f"`{stream.name}` stream isn't available for organization `{organization}`. Full error message: {response_error_msg}",
}
return reasons_for_codes


class RepositoryBasedAvailabilityStrategy(HttpAvailabilityStrategy):
"""
Availability Strategy for repository-based streams.
"""

def reasons_for_unavailable_status_codes(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Dict[int, str]:
stream_slice = StreamHelper().get_stream_slice(stream)
repository = stream_slice["repository"]
error_msg = str(error.response.json().get("message"))

reasons_for_codes = {
requests.codes.NOT_FOUND: f"`{stream.name}` stream isn't available for repository `{repository}`.",
requests.codes.FORBIDDEN: f"`{stream.name}` stream isn't available for repository `{repository}`. Full error message: {error_msg}",
requests.codes.CONFLICT: f"`{stream.name}` stream isn't available for repository `{repository}`, it seems like this repository is empty.",
}
return reasons_for_codes


class WorkflowRunsAvailabilityStrategy(RepositoryBasedAvailabilityStrategy):
"""
AvailabilityStrategy for the 'WorkflowRuns' stream.
"""

def reasons_for_unavailable_status_codes(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Dict[int, str]:
stream_slice = StreamHelper().get_stream_slice(stream)
repository = stream_slice["repository"]
reasons_for_codes = super().reasons_for_unavailable_status_codes(stream, logger, source, error).copy()
server_error_msg = f"Syncing `{stream.name}` stream isn't available for repository `{repository}`."
reasons_for_codes[requests.codes.SERVER_ERROR] = server_error_msg
return reasons_for_codes


class ProjectsAvailabilityStrategy(RepositoryBasedAvailabilityStrategy):
"""
AvailabilityStrategy for the 'Projects' stream.
"""

def reasons_for_unavailable_status_codes(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"], error: HTTPError
) -> Dict[int, str]:
stream_slice = StreamHelper().get_stream_slice(stream)
repository = stream_slice["repository"]
reasons_for_codes = super().reasons_for_unavailable_status_codes(stream, logger, source, error).copy()

# Some repos don't have projects enabled and we we get "410 Client Error: Gone for
# url: https://api.github.com/repos/xyz/projects?per_page=100" error.
gone_error_msg = f"`Projects` stream isn't available for repository `{repository}`."
reasons_for_codes[requests.codes.GONE] = gone_error_msg

return reasons_for_codes
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import MultipleTokenAuthenticator
from airbyte_cdk.sources.streams.http.requests_native_auth.token import MultipleTokenAuthenticator

from .streams import (
Assignees,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@
import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from requests.exceptions import HTTPError

from .availability_strategies import (
OrganizationBasedAvailabilityStrategy,
ProjectsAvailabilityStrategy,
RepositoryBasedAvailabilityStrategy,
WorkflowRunsAvailabilityStrategy,
)
from .graphql import CursorStorage, QueryReactions, get_query_pull_requests, get_query_reviews
from .utils import getter

Expand Down Expand Up @@ -124,60 +130,6 @@ def get_error_display_message(self, exception: BaseException) -> Optional[str]:
return f'Please try to decrease the "Page size for large streams" below {self.page_size}. The stream "{self.name}" is a large stream, such streams can fail with 502 for high "page_size" values.'
return super().get_error_display_message(exception)

def read_records(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
# get out the stream_slice parts for later use.
organisation = stream_slice.get("organization", "")
repository = stream_slice.get("repository", "")
# Reading records while handling the errors
try:
yield from super().read_records(stream_slice=stream_slice, **kwargs)
except HTTPError as e:
# This whole try/except situation in `read_records()` isn't good but right now in `self._send_request()`
# function we have `response.raise_for_status()` so we don't have much choice on how to handle errors.
# Bocked on https://github.com/airbytehq/airbyte/issues/3514.
if e.response.status_code == requests.codes.NOT_FOUND:
# A lot of streams are not available for repositories owned by a user instead of an organization.
if isinstance(self, Organizations):
error_msg = (
f"Syncing `{self.__class__.__name__}` stream isn't available for organization `{stream_slice['organization']}`."
)
else:
error_msg = f"Syncing `{self.__class__.__name__}` stream isn't available for repository `{stream_slice['repository']}`."
elif e.response.status_code == requests.codes.FORBIDDEN:
error_msg = str(e.response.json().get("message"))
# When using the `check_connection` method, we should raise an error if we do not have access to the repository.
if isinstance(self, Repositories):
raise e
# When `403` for the stream, that has no access to the organization's teams, based on OAuth Apps Restrictions:
# https://docs.github.com/en/organizations/restricting-access-to-your-organizations-data/enabling-oauth-app-access-restrictions-for-your-organization
# For all `Organisation` based streams
elif isinstance(self, Organizations) or isinstance(self, Teams) or isinstance(self, Users):
error_msg = (
f"Syncing `{self.name}` stream isn't available for organization `{organisation}`. Full error message: {error_msg}"
)
# For all other `Repository` base streams
else:
error_msg = (
f"Syncing `{self.name}` stream isn't available for repository `{repository}`. Full error message: {error_msg}"
)
elif e.response.status_code == requests.codes.GONE and isinstance(self, Projects):
# Some repos don't have projects enabled and we we get "410 Client Error: Gone for
# url: https://api.github.com/repos/xyz/projects?per_page=100" error.
error_msg = f"Syncing `Projects` stream isn't available for repository `{stream_slice['repository']}`."
elif e.response.status_code == requests.codes.CONFLICT:
error_msg = (
f"Syncing `{self.name}` stream isn't available for repository "
f"`{stream_slice['repository']}`, it seems like this repository is empty."
)
elif e.response.status_code == requests.codes.SERVER_ERROR and isinstance(self, WorkflowRuns):
error_msg = f"Syncing `{self.name}` stream isn't available for repository `{stream_slice['repository']}`."
else:
# most probably here we're facing a 500 server error and a risk to get a non-json response, so lets output response.text
self.logger.error(f"Undefined error while reading records: {e.response.text}")
raise e

self.logger.warn(error_msg)

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
Expand Down Expand Up @@ -211,6 +163,10 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,
record["repository"] = stream_slice["repository"]
return record

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return RepositoryBasedAvailabilityStrategy()


class SemiIncrementalMixin:
"""
Expand Down Expand Up @@ -384,6 +340,10 @@ def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str,
record["organization"] = stream_slice["organization"]
return record

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return OrganizationBasedAvailabilityStrategy()


class Repositories(SemiIncrementalMixin, Organizations):
"""
Expand Down Expand Up @@ -592,6 +552,10 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]:

return {**base_headers, **headers}

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return ProjectsAvailabilityStrategy()


class IssueEvents(SemiIncrementalMixin, GithubStream):
"""
Expand Down Expand Up @@ -1357,6 +1321,10 @@ def read_records(
if created_at < break_point:
break

@property
def availability_strategy(self) -> Optional[AvailabilityStrategy]:
return WorkflowRunsAvailabilityStrategy()


class WorkflowJobs(SemiIncrementalMixin, GithubStream):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import json
import logging
from http import HTTPStatus
from pathlib import Path
from unittest.mock import MagicMock, patch
Expand Down Expand Up @@ -51,6 +52,8 @@

DEFAULT_BACKOFF_DELAYS = [5, 10, 20, 40, 80]

logger = logging.getLogger("source-github")


@responses.activate
@patch("time.sleep")
Expand Down Expand Up @@ -184,7 +187,9 @@ def test_stream_teams_404():
json={"message": "Not Found", "documentation_url": "https://docs.github.com/rest/reference/teams#list-teams"},
)

assert list(read_full_refresh(stream)) == []
stream_is_available, reason = stream.check_availability(logger)
assert not stream_is_available
assert "`Teams` stream isn't available for organization `org_name`." in reason
assert len(responses.calls) == 1
assert responses.calls[0].request.url == "https://api.github.com/orgs/org_name/teams?per_page=100"

Expand Down Expand Up @@ -237,7 +242,9 @@ def test_stream_repositories_404():
json={"message": "Not Found", "documentation_url": "https://docs.github.com/rest/reference/repos#list-organization-repositories"},
)

assert list(read_full_refresh(stream)) == []
stream_is_available, reason = stream.check_availability(logger)
assert not stream_is_available
assert "`Repositories` stream isn't available for organization `org_name`." in reason
assert len(responses.calls) == 1
assert responses.calls[0].request.url == "https://api.github.com/orgs/org_name/repos?per_page=100&sort=updated&direction=desc"

Expand Down Expand Up @@ -275,7 +282,9 @@ def test_stream_projects_disabled():
json={"message": "Projects are disabled for this repository", "documentation_url": "https://docs.github.com/v3/projects"},
)

assert list(read_full_refresh(stream)) == []
stream_is_available, reason = stream.check_availability(logger)
assert not stream_is_available
assert "`Projects` stream isn't available for repository `test_repo`." in reason
assert len(responses.calls) == 1
assert responses.calls[0].request.url == "https://api.github.com/repos/test_repo/projects?per_page=100&state=all"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.streams.http.auth import MultipleTokenAuthenticator
from airbyte_cdk.sources.streams.http.requests_native_auth.token import MultipleTokenAuthenticator
from source_github import SourceGithub


Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/github.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ The GitHub connector should not run into GitHub API limitations under normal usa

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :---------------------------------------------------------------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| 0.3.9 | 2022-12-14 | [19978](https://github.com/airbytehq/airbyte/pull/19978) | Update CDK dependency; move custom HTTPError handling into `AvailabilityStrategy` classes |
| 0.3.8 | 2022-11-10 | [19299](https://github.com/airbytehq/airbyte/pull/19299) | Fix events and workflow_runs datetimes |
| 0.3.7 | 2022-10-20 | [18213](https://github.com/airbytehq/airbyte/pull/18213) | Skip retry on HTTP 200 |
| 0.3.6 | 2022-10-11 | [17852](https://github.com/airbytehq/airbyte/pull/17852) | Use default behaviour, retry on 429 and all 5XX errors |
Expand Down

0 comments on commit f97db17

Please sign in to comment.