Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source GitHub: Use CDK caching and convert PR-related streams to incremental #7250

Merged
merged 18 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,14 @@
"stream": {
"name": "pull_request_stats",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_at"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["updated_at"]
},
{
"stream": {
Expand Down Expand Up @@ -257,11 +260,14 @@
"stream": {
"name": "reviews",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["updated_at"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["updated_at"]
},
{
"stream": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
},
"changed_files": {
"type": ["null", "integer"]
},
"updated_at": {
"type": ["null", "string"],
"format": "date-time"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
},
"author_association": {
"type": ["null", "string"]
},
"updated_at": {
"type": ["null", "string"],
"format": "date-time"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
organization_args = {"authenticator": authenticator, "organizations": organizations}
default_branches, branches_to_pull = self._get_branches_data(config.get("branch", ""), full_refresh_args)

pull_request_stream = PullRequests(**incremental_args)

return [
Assignees(**full_refresh_args),
Branches(**full_refresh_args),
Expand All @@ -153,12 +155,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
Organizations(**organization_args),
Projects(**incremental_args),
PullRequestCommentReactions(**incremental_args),
PullRequestStats(**full_refresh_args),
PullRequests(**incremental_args),
PullRequestStats(parent=pull_request_stream, **incremental_args),
pull_request_stream,
Releases(**incremental_args),
Repositories(**organization_args),
ReviewComments(**incremental_args),
Reviews(**full_refresh_args),
Reviews(parent=pull_request_stream, **incremental_args),
Stargazers(**incremental_args),
Tags(**full_refresh_args),
Teams(**organization_args),
Expand Down
162 changes: 78 additions & 84 deletions airbyte-integrations/connectors/source-github/source_github/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,16 @@
from urllib import parse

import requests
import vcr
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream
from requests.exceptions import HTTPError
from vcr.cassette import Cassette


def request_cache() -> Cassette:
"""
Builds VCR instance.
It deletes file everytime we create it, normally should be called only once.
We can't use NamedTemporaryFile here because yaml serializer doesn't work well with empty files.
"""
filename = "request_cache.yml"
try:
os.remove(filename)
except FileNotFoundError:
pass

return vcr.use_cassette(str(filename), record_mode="new_episodes", serializer="yaml")


class GithubStream(HttpStream, ABC):
cache = request_cache()
url_base = "https://api.github.com/"

# To prevent dangerous behavior, the `vcr` library prohibits the use of nested caching.
# Here's an example of dangerous behavior:
# cache = Cassette.use('whatever')
# with cache:
# with cache:
# pass
#
# Therefore, we will only use `cache` for the top-level stream, so as not to cause possible difficulties.
top_level_stream = True

primary_key = "id"
use_cache = True

# GitHub pagination could be from 1 to 100.
page_size = 100
Expand Down Expand Up @@ -95,11 +68,7 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]:

def read_records(self, stream_slice: Mapping[str, any] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
try:
if self.top_level_stream:
with self.cache:
yield from super().read_records(stream_slice=stream_slice, **kwargs)
else:
yield from super().read_records(stream_slice=stream_slice, **kwargs)
yield from super().read_records(stream_slice=stream_slice, **kwargs)
except HTTPError as e:
error_msg = str(e)

Expand Down Expand Up @@ -286,55 +255,6 @@ class Assignees(GithubStream):
"""


class PullRequestStats(GithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/pulls#get-a-pull-request
"""

top_level_stream = False

@property
def record_keys(self) -> List[str]:
return list(self.get_json_schema()["properties"].keys())

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"repos/{stream_slice['repository']}/pulls/{stream_slice['pull_request_number']}"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for stream_slice in super().stream_slices(**kwargs):
pull_requests_stream = PullRequests(authenticator=self.authenticator, repositories=[stream_slice["repository"]], start_date="")
for pull_request in pull_requests_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
yield {"pull_request_number": pull_request["number"], "repository": stream_slice["repository"]}

def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
yield self.transform(response.json(), repository=stream_slice["repository"])

def transform(self, record: MutableMapping[str, Any], repository: str = None) -> MutableMapping[str, Any]:
record = super().transform(record=record, repository=repository)
return {key: value for key, value in record.items() if key in self.record_keys}


class Reviews(GithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request
"""

top_level_stream = False

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"repos/{stream_slice['repository']}/pulls/{stream_slice['pull_request_number']}/reviews"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for stream_slice in super().stream_slices(**kwargs):
pull_requests_stream = PullRequests(authenticator=self.authenticator, repositories=[stream_slice["repository"]], start_date="")
for pull_request in pull_requests_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice):
yield {"pull_request_number": pull_request["number"], "repository": stream_slice["repository"]}


class Branches(GithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/repos#list-branches
Expand Down Expand Up @@ -503,7 +423,7 @@ def is_sorted_descending(self) -> bool:
"""
Depending if there any state we read stream in ascending or descending order.
"""
return self._first_read
return not self._first_read
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why? Did we have an error connected to this? Did we send the wrong direction parameter or what?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it was sending the wrong direction. https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-github/source_github/streams.py#L495-L496 states we want to sort in ascending order for the first run, then descending order for subsequent runs to allow the incremental behavior in https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-github/source_github/streams.py#L701-L702. However, the current stream version is setting is_sorted_descending to true if self._first_read is true, which is the opposite behavior.



class CommitComments(SemiIncrementalGithubStream):
Expand Down Expand Up @@ -716,6 +636,80 @@ class Issues(IncrementalGithubStream):
}


class PullRequestSubStream(SemiIncrementalGithubStream, ABC):
def __init__(self, parent: PullRequests, **kwargs):
self._parent_stream = parent
super().__init__(**kwargs)

use_cache = False

@property
def state_checkpoint_interval(self) -> Optional[int]:
return None
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for stream_slice in super().stream_slices(**kwargs):
pull_requests = list(self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, stream_state=stream_state))
if self._parent_stream.is_sorted_descending:
pull_requests.reverse()
for pull_request in pull_requests:
yield {"pull_request_number": pull_request["number"], "repository": stream_slice["repository"], self.cursor_field: pull_request[self.cursor_field]}

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
yield from super(SemiIncrementalGithubStream, self).read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
)
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved


class PullRequestStats(PullRequestSubStream):
"""
API docs: https://docs.github.com/en/rest/reference/pulls#get-a-pull-request
"""

@property
def record_keys(self) -> List[str]:
return list(self.get_json_schema()["properties"].keys())

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"repos/{stream_slice['repository']}/pulls/{stream_slice['pull_request_number']}"

def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
yield self.transform(response.json(), stream_slice=stream_slice)

def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
record = {key: value for key, value in super().transform(record=record, repository=stream_slice["repository"]).items() if key in self.record_keys}
record[self.cursor_field] = stream_slice[self.cursor_field]
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved
return record


class Reviews(PullRequestSubStream):
"""
API docs: https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request
"""

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
return f"repos/{stream_slice['repository']}/pulls/{stream_slice['pull_request_number']}/reviews"

def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
for record in response.json(): # GitHub puts records in an array.
yield self.transform(record=record, stream_slice=stream_slice)

def transform(self, record: MutableMapping[str, Any], stream_slice: Mapping[str, Any]) -> MutableMapping[str, Any]:
record = super().transform(record=record, repository=stream_slice["repository"])
record[self.cursor_field] = stream_slice[self.cursor_field]
Zirochkaa marked this conversation as resolved.
Show resolved Hide resolved
return record


class ReviewComments(IncrementalGithubStream):
"""
API docs: https://docs.github.com/en/rest/reference/pulls#list-review-comments-in-a-repository
Expand All @@ -733,7 +727,7 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
class ReactionStream(GithubStream, ABC):

parent_key = "id"
top_level_stream = False
use_cache = False

def __init__(self, **kwargs):
self._stream_kwargs = deepcopy(kwargs)
Expand Down