🎉 Source Github: break point added for workflows_runs stream #13926

Merged: 10 commits, Jun 22, 2022
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-github/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.35
LABEL io.airbyte.version=0.2.36
LABEL io.airbyte.name=airbyte/source-github
airbyte-integrations/connectors/source-github/source_github/streams.py
@@ -1113,6 +1113,9 @@ class WorkflowRuns(SemiIncrementalMixin, GithubStream):
    # key for accessing slice value from record
    record_slice_key = ["repository", "full_name"]

    # https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs
    re_run_period = 32  # days
Review comment (Contributor): add comment that this is a day


    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        return f"repos/{stream_slice['repository']}/actions/runs"

@@ -1121,6 +1124,25 @@ def parse_response(self, response: requests.Response, stream_slice: Mapping[str,
        for record in response:
            yield record

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        start_point = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
        break_point = (pendulum.parse(start_point) - pendulum.duration(days=self.re_run_period)).to_iso8601_string()
        for record in super(SemiIncrementalMixin, self).read_records(
            sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
        ):
            cursor_value = record[self.cursor_field]
            created_at = record["created_at"]
            if cursor_value > start_point:
                yield record
            if created_at < break_point:
                break
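
The early exit above works because the GitHub API returns workflow runs sorted by `created_at` in descending order, and a run can only be re-run (which bumps its `updated_at`) for up to 30 days after it was created; `re_run_period` is set to 32 days to cover that window with a small margin. A minimal sketch of the invariant, using a hypothetical helper rather than connector code:

```python
import pendulum

RE_RUN_PERIOD = 32  # days, mirroring WorkflowRuns.re_run_period

def is_past_break_point(created_at: str, start_point: str) -> bool:
    """True once a run is too old for any re-run to push its updated_at past start_point."""
    break_point = pendulum.parse(start_point) - pendulum.duration(days=RE_RUN_PERIOD)
    return pendulum.parse(created_at) < break_point

# A run created on 2022-01-01 can no longer be re-run once the cursor reaches
# 2022-02-05, so pagination can stop as soon as such a record appears.
assert is_past_break_point("2022-01-01T00:00:00Z", "2022-02-05T00:00:00Z")
```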


class TeamMembers(GithubStream):
"""
airbyte-integrations/connectors/source-github/unit_tests/test_stream.py
@@ -10,6 +10,7 @@
import responses
from airbyte_cdk.sources.streams.http.exceptions import BaseBackoffException
from responses import matchers
from source_github import streams
from source_github.streams import (
    Branches,
    Collaborators,
@@ -37,6 +38,7 @@
    TeamMemberships,
    Teams,
    Users,
    WorkflowRuns,
)

from .utils import ProjectsResponsesAPI, read_full_refresh, read_incremental, urlbase
@@ -949,3 +951,122 @@ def test_stream_commit_comment_reactions_incremental_read():
{"id": 154935432, "comment_id": 55538826, "created_at": "2022-02-01T16:00:00Z", "repository": "airbytehq/integration-test"},
{"id": 154935433, "comment_id": 55538827, "created_at": "2022-02-01T17:00:00Z", "repository": "airbytehq/integration-test"},
]


@responses.activate
def test_stream_workflow_runs_read_incremental(monkeypatch):

    repository_args_with_start_date = {
        "repositories": ["org/repos"],
        "page_size_for_large_streams": 30,
        "start_date": "2022-01-01T00:00:00Z",
    }

    monkeypatch.setattr(streams, "DEFAULT_PAGE_SIZE", 1)
    stream = WorkflowRuns(**repository_args_with_start_date)

    data = [
        {"id": 4, "created_at": "2022-02-05T00:00:00Z", "updated_at": "2022-02-05T00:00:00Z", "repository": {"full_name": "org/repos"}},
        {"id": 3, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-01-15T00:00:00Z", "repository": {"full_name": "org/repos"}},
        {"id": 2, "created_at": "2022-01-03T00:00:00Z", "updated_at": "2022-01-03T00:00:00Z", "repository": {"full_name": "org/repos"}},
        {"id": 1, "created_at": "2022-01-02T00:00:00Z", "updated_at": "2022-01-02T00:00:00Z", "repository": {"full_name": "org/repos"}},
    ]

    responses.add(
        "GET",
        "https://api.github.com/repos/org/repos/actions/runs",
        json={"total_count": len(data), "workflow_runs": data[0:1]},
        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=2>; rel="next"'},
        match=[matchers.query_param_matcher({"per_page": "1"}, strict_match=True)],
    )

    responses.add(
        "GET",
        "https://api.github.com/repos/org/repos/actions/runs",
        json={"total_count": len(data), "workflow_runs": data[1:2]},
        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=3>; rel="next"'},
        match=[matchers.query_param_matcher({"per_page": "1", "page": "2"}, strict_match=True)],
    )

    responses.add(
        "GET",
        "https://api.github.com/repos/org/repos/actions/runs",
        json={"total_count": len(data), "workflow_runs": data[2:3]},
        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=4>; rel="next"'},
        match=[matchers.query_param_matcher({"per_page": "1", "page": "3"}, strict_match=True)],
    )

    responses.add(
        "GET",
        "https://api.github.com/repos/org/repos/actions/runs",
        json={"total_count": len(data), "workflow_runs": data[3:4]},
        match=[matchers.query_param_matcher({"per_page": "1", "page": "4"}, strict_match=True)],
    )
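
The mocks above emulate GitHub's Link-header pagination with `per_page=1`, so each page yields a single run in `created_at`-descending order. A rough sketch of how a next page token can be derived from that header (the connector's `GithubStream` does something along these lines; the exact shape here is an assumption):

```python
from typing import Optional
from urllib.parse import parse_qsl, urlparse

import requests

def next_page_token(response: requests.Response) -> Optional[dict]:
    # requests exposes the parsed Link header as response.links
    next_link = response.links.get("next")
    if next_link:
        # e.g. {"per_page": "1", "page": "2"}
        return dict(parse_qsl(urlparse(next_link["url"]).query))
    return None
```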

    state = {}
    records = read_incremental(stream, state)
    assert state == {"org/repos": {"updated_at": "2022-02-05T00:00:00Z"}}

    assert records == [
        {"id": 4, "repository": {"full_name": "org/repos"}, "created_at": "2022-02-05T00:00:00Z", "updated_at": "2022-02-05T00:00:00Z"},
        {"id": 3, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-01-15T00:00:00Z"},
        {"id": 2, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-03T00:00:00Z", "updated_at": "2022-01-03T00:00:00Z"},
        {"id": 1, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-02T00:00:00Z", "updated_at": "2022-01-02T00:00:00Z"},
    ]

    assert len(responses.calls) == 4

    data.insert(
        0,
        {
            "id": 5,
            "created_at": "2022-02-07T00:00:00Z",
            "updated_at": "2022-02-07T00:00:00Z",
            "repository": {"full_name": "org/repos"},
        },
    )

    data[2]["updated_at"] = "2022-02-08T00:00:00Z"

    responses.add(
        "GET",
        "https://api.github.com/repos/org/repos/actions/runs",
        json={"total_count": len(data), "workflow_runs": data[0:1]},
        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=2>; rel="next"'},
        match=[matchers.query_param_matcher({"per_page": "1"}, strict_match=True)],
    )

    responses.add(
        "GET",
        "https://api.github.com/repos/org/repos/actions/runs",
        json={"total_count": len(data), "workflow_runs": data[1:2]},
        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=3>; rel="next"'},
        match=[matchers.query_param_matcher({"per_page": "1", "page": "2"}, strict_match=True)],
    )

    responses.add(
        "GET",
        "https://api.github.com/repos/org/repos/actions/runs",
        json={"total_count": len(data), "workflow_runs": data[2:3]},
        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=4>; rel="next"'},
        match=[matchers.query_param_matcher({"per_page": "1", "page": "3"}, strict_match=True)],
    )

    responses.add(
        "GET",
        "https://api.github.com/repos/org/repos/actions/runs",
        json={"total_count": len(data), "workflow_runs": data[3:4]},
        headers={"Link": '<https://api.github.com/repositories/283046497/actions/runs?per_page=1&page=5>; rel="next"'},
        match=[matchers.query_param_matcher({"per_page": "1", "page": "4"}, strict_match=True)],
    )

    responses.calls.reset()
    records = read_incremental(stream, state)

    assert state == {"org/repos": {"updated_at": "2022-02-08T00:00:00Z"}}
    assert records == [
        {"id": 5, "repository": {"full_name": "org/repos"}, "created_at": "2022-02-07T00:00:00Z", "updated_at": "2022-02-07T00:00:00Z"},
        {"id": 3, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-02-08T00:00:00Z"},
    ]

    assert len(responses.calls) == 4
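
For context, here is a minimal sketch of what the `read_incremental` helper imported from `.utils` might look like; the real implementation lives in the test suite, so the details below are assumptions:

```python
from airbyte_cdk.models import SyncMode

def read_incremental(stream_instance, stream_state):
    records = []
    slices = stream_instance.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state)
    for stream_slice in slices:
        for record in stream_instance.read_records(
            SyncMode.incremental, stream_slice=stream_slice, stream_state=stream_state
        ):
            records.append(record)
            # fold each record's cursor into the running state, as the asserts above expect
            stream_state.update(stream_instance.get_updated_state(stream_state, record))
    return records
```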
13 changes: 9 additions & 4 deletions docs/integrations/sources/github.md
@@ -90,7 +90,7 @@ This connector outputs the following incremental streams:
* [Review comments](https://docs.github.com/en/rest/reference/pulls#list-review-comments-in-a-repository)
* [Reviews](https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request)
* [Stargazers](https://docs.github.com/en/rest/reference/activity#list-stargazers)
* [WorkflowRuns](https://docs.github.com/en/rest/reference/actions#list-workflow-runs-for-a-repository)
* [WorkflowRuns](https://docs.github.com/en/rest/actions/workflow-runs#list-workflow-runs-for-a-repository)
* [Workflows](https://docs.github.com/en/rest/reference/actions#workflows)

### Notes
@@ -99,12 +99,16 @@ This connector outputs the following incremental streams:
* read only new records;
* output only new records.

2. Other 20 incremental streams are also incremental but with one difference, they:
2. The `workflow_runs` stream is almost purely incremental:
Review comment (Contributor): @Amruta-Ranade could you proofread these docs?
* read new records and a portion of old records (from the past 30 days) [docs](https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs);
* output only new records.

3. The other 19 incremental streams are also incremental but with one difference; they:
* read all records;
* output only new records.
Please, consider this behaviour when using those 20 incremental streams because it may affect you API call limits.
Please consider this behaviour when using those 19 incremental streams, because it may affect your API call limits.

3. We are passing few parameters \(`since`, `sort` and `direction`\) to GitHub in order to filter records and sometimes for large streams specifying very distant `start_date` in the past may result in keep on getting error from GitHub instead of records \(respective `WARN` log message will be outputted\). In this case Specifying more recent `start_date` may help.
4. We pass a few parameters \(`since`, `sort` and `direction`\) to GitHub in order to filter records. For large streams, specifying a very distant `start_date` in the past may sometimes result in repeated errors from GitHub instead of records \(a respective `WARN` log message will be output\). In this case, specifying a more recent `start_date` may help.
**The "Start date" configuration option does not apply to the streams below, because the GitHub API does not include dates which can be used for filtering:**

* `assignees`
@@ -137,6 +141,7 @@ The GitHub connector should not run into GitHub API limitations under normal usage

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--- |:-------------------------------------------------------------------------------------------------------------|
| 0.2.36 | 2022-06-20 | [13926](https://github.com/airbytehq/airbyte/pull/13926) | Break point added for `workflows_runs` stream |
| 0.2.35 | 2022-06-16 | [13763](https://github.com/airbytehq/airbyte/pull/13763) | Use GraphQL for `pull_request_stats` stream |
| 0.2.34 | 2022-06-14 | [13707](https://github.com/airbytehq/airbyte/pull/13707) | Fix API sorting, fix `get_starting_point` caching |
| 0.2.33 | 2022-06-08 | [13558](https://github.com/airbytehq/airbyte/pull/13558) | Enable caching only for parent streams |