From f69a78c9a4847188685c6f3341c08d8c215bbb99 Mon Sep 17 00:00:00 2001
From: Serhii Chvaliuk
Date: Wed, 22 Jun 2022 11:49:04 +0300
Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Source=20Github:=20break=20point?=
 =?UTF-8?q?=20added=20for=20workflows=5Fruns=20stream=20(#13926)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Sergey Chvalyuk
---
 .../resources/seed/source_definitions.yaml    |   2 +-
 .../src/main/resources/seed/source_specs.yaml |   2 +-
 .../connectors/source-github/Dockerfile       |   2 +-
 .../source-github/source_github/streams.py    |  33 ++++-
 .../source-github/unit_tests/test_stream.py   | 121 ++++++++++++++++++
 docs/integrations/sources/github.md           |  13 +-
 6 files changed, 165 insertions(+), 8 deletions(-)

diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index 26db7e596f58..efa01185a5b4 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -303,7 +303,7 @@
 - name: GitHub
   sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
   dockerRepository: airbyte/source-github
-  dockerImageTag: 0.2.35
+  dockerImageTag: 0.2.36
   documentationUrl: https://docs.airbyte.io/integrations/sources/github
   icon: github.svg
   sourceType: api
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index d5b9183689b4..81382543dfa8 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -2620,7 +2620,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-github:0.2.35"
+- dockerImage: "airbyte/source-github:0.2.36"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/github"
     connectionSpecification:
diff --git a/airbyte-integrations/connectors/source-github/Dockerfile b/airbyte-integrations/connectors/source-github/Dockerfile
index 71fbf3ee2ede..b0b59e1e2c4f 100644
--- a/airbyte-integrations/connectors/source-github/Dockerfile
+++ b/airbyte-integrations/connectors/source-github/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.2.35
+LABEL io.airbyte.version=0.2.36
 LABEL io.airbyte.name=airbyte/source-github
diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py
index 06c348e50271..a663b67ff1a5 100644
--- a/airbyte-integrations/connectors/source-github/source_github/streams.py
+++ b/airbyte-integrations/connectors/source-github/source_github/streams.py
@@ -1106,13 +1106,16 @@ def convert_cursor_value(self, value):
 
 class WorkflowRuns(SemiIncrementalMixin, GithubStream):
     """
-    Get all workflows of a GitHub repository
+    Get all workflow runs for a GitHub repository
     API documentation: https://docs.github.com/en/rest/actions/workflow-runs#list-workflow-runs-for-a-repository
     """
 
     # key for accessing slice value from record
     record_slice_key = ["repository", "full_name"]
 
+    # https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs
+    re_run_period = 32  # days
+
     def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
         return f"repos/{stream_slice['repository']}/actions/runs"
 
@@ -1121,6 +1124,34 @@ def parse_response(self, response: requests.Response, stream_slice: Mapping[str,
         for record in response:
             yield record
 
+    def read_records(
+        self,
+        sync_mode: SyncMode,
+        cursor_field: List[str] = None,
+        stream_slice: Mapping[str, Any] = None,
+        stream_state: Mapping[str, Any] = None,
+    ) -> Iterable[Mapping[str, Any]]:
+        # Records in the workflow_runs stream are naturally sorted in descending order by the `created_at` field.
+        # At first sight this does not help much, because the cursor_field is `updated_at`.
+        # However, we can still use `created_at` as a break point, because after a 30-day period
+        # https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs
+        # workflow runs can no longer be re-run, so their records can no longer be updated. This means that once the
+        # stream has been fully synced, each subsequent incremental sync only needs to look back 30 days for updated records.
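+        # For example, with start_point = "2022-02-05T00:00:00Z" and re_run_period = 32,
+        # break_point falls on 2022-01-04: records updated after start_point are yielded,
+        # and the scan stops at the first record created before break_point.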
+        start_point = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
+        break_point = (pendulum.parse(start_point) - pendulum.duration(days=self.re_run_period)).to_iso8601_string()
+        for record in super(SemiIncrementalMixin, self).read_records(
+            sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
+        ):
+            cursor_value = record[self.cursor_field]
+            created_at = record["created_at"]
+            if cursor_value > start_point:
+                yield record
+            if created_at < break_point:
+                break
+
 
 class TeamMembers(GithubStream):
     """
diff --git a/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py b/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py
index a5d6f6282737..737bb7fe6ef9 100644
--- a/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py
+++ b/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py
@@ -10,6 +10,7 @@
 import responses
 from airbyte_cdk.sources.streams.http.exceptions import BaseBackoffException
 from responses import matchers
+from source_github import streams
 from source_github.streams import (
     Branches,
     Collaborators,
@@ -37,6 +38,7 @@
     TeamMemberships,
     Teams,
     Users,
+    WorkflowRuns,
 )
 
 from .utils import ProjectsResponsesAPI, read_full_refresh, read_incremental, urlbase
@@ -949,3 +951,122 @@ def test_stream_commit_comment_reactions_incremental_read():
         {"id": 154935432, "comment_id": 55538826, "created_at": "2022-02-01T16:00:00Z", "repository": "airbytehq/integration-test"},
         {"id": 154935433, "comment_id": 55538827, "created_at": "2022-02-01T17:00:00Z", "repository": "airbytehq/integration-test"},
     ]
+
+
+@responses.activate
+def test_stream_workflow_runs_read_incremental(monkeypatch):
+
+    repository_args_with_start_date = {
+        "repositories": ["org/repos"],
+        "page_size_for_large_streams": 30,
+        "start_date": "2022-01-01T00:00:00Z",
+    }
+
+    monkeypatch.setattr(streams, "DEFAULT_PAGE_SIZE", 1)
+    stream = WorkflowRuns(**repository_args_with_start_date)
+
+    data = [
+        {"id": 4, "created_at": "2022-02-05T00:00:00Z", "updated_at": "2022-02-05T00:00:00Z", "repository": {"full_name": "org/repos"}},
+        {"id": 3, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-01-15T00:00:00Z", "repository": {"full_name": "org/repos"}},
+        {"id": 2, "created_at": "2022-01-03T00:00:00Z", "updated_at": "2022-01-03T00:00:00Z", "repository": {"full_name": "org/repos"}},
+        {"id": 1, "created_at": "2022-01-02T00:00:00Z", "updated_at": "2022-01-02T00:00:00Z", "repository": {"full_name": "org/repos"}},
+    ]
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[0:1]},
+        headers={"Link": '<https://api.github.com/repos/org/repos/actions/runs?per_page=1&page=2>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[1:2]},
+        headers={"Link": '<https://api.github.com/repos/org/repos/actions/runs?per_page=1&page=3>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "2"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[2:3]},
+        headers={"Link": '<https://api.github.com/repos/org/repos/actions/runs?per_page=1&page=4>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "3"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[3:4]},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "4"}, strict_match=True)],
+    )
+
+    state = {}
+    records = read_incremental(stream, state)
+    assert state == {"org/repos": {"updated_at": "2022-02-05T00:00:00Z"}}
+
+    assert records == [
+        {"id": 4, "repository": {"full_name": "org/repos"}, "created_at": "2022-02-05T00:00:00Z", "updated_at": "2022-02-05T00:00:00Z"},
+        {"id": 3, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-01-15T00:00:00Z"},
+        {"id": 2, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-03T00:00:00Z", "updated_at": "2022-01-03T00:00:00Z"},
+        {"id": 1, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-02T00:00:00Z", "updated_at": "2022-01-02T00:00:00Z"},
+    ]
+
+    assert len(responses.calls) == 4
+
+    data.insert(
+        0,
+        {
+            "id": 5,
+            "created_at": "2022-02-07T00:00:00Z",
+            "updated_at": "2022-02-07T00:00:00Z",
+            "repository": {"full_name": "org/repos"},
+        },
+    )
+
+    data[2]["updated_at"] = "2022-02-08T00:00:00Z"
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[0:1]},
+        headers={"Link": '<https://api.github.com/repos/org/repos/actions/runs?per_page=1&page=2>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[1:2]},
+        headers={"Link": '<https://api.github.com/repos/org/repos/actions/runs?per_page=1&page=3>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "2"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[2:3]},
+        headers={"Link": '<https://api.github.com/repos/org/repos/actions/runs?per_page=1&page=4>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "3"}, strict_match=True)],
+    )
+
+    responses.add(
+        "GET",
+        "https://api.github.com/repos/org/repos/actions/runs",
+        json={"total_count": len(data), "workflow_runs": data[3:4]},
+        headers={"Link": '<https://api.github.com/repos/org/repos/actions/runs?per_page=1&page=5>; rel="next"'},
+        match=[matchers.query_param_matcher({"per_page": "1", "page": "4"}, strict_match=True)],
+    )
+
+    responses.calls.reset()
+    records = read_incremental(stream, state)
+
+    assert state == {"org/repos": {"updated_at": "2022-02-08T00:00:00Z"}}
+    assert records == [
+        {"id": 5, "repository": {"full_name": "org/repos"}, "created_at": "2022-02-07T00:00:00Z", "updated_at": "2022-02-07T00:00:00Z"},
+        {"id": 3, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-02-08T00:00:00Z"},
+    ]
+
+    assert len(responses.calls) == 4
diff --git a/docs/integrations/sources/github.md b/docs/integrations/sources/github.md
index 73f8e00d7b1c..4837dc40ecab 100644
--- a/docs/integrations/sources/github.md
+++ b/docs/integrations/sources/github.md
@@ -90,7 +90,7 @@ This connector outputs the following incremental streams:
 * [Review comments](https://docs.github.com/en/rest/reference/pulls#list-review-comments-in-a-repository)
 * [Reviews](https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request)
 * [Stargazers](https://docs.github.com/en/rest/reference/activity#list-stargazers)
-* [WorkflowRuns](https://docs.github.com/en/rest/reference/actions#list-workflow-runs-for-a-repository)
+* [WorkflowRuns](https://docs.github.com/en/rest/actions/workflow-runs#list-workflow-runs-for-a-repository)
 * [Workflows](https://docs.github.com/en/rest/reference/actions#workflows)
 
 ### Notes
 
@@ -99,12 +99,16 @@
    * read only new records;
    * output only new records.
 
-2. Other 20 incremental streams are also incremental but with one difference, they:
+2. Stream `workflow_runs` is almost purely incremental:
+   * read new records and a portion of old records (from the past 30 days) [docs](https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs);
+   * output only new records.
+
+3. The other 19 incremental streams differ in one way; they:
    * read all records;
    * output only new records.
-   Please, consider this behaviour when using those 20 incremental streams because it may affect you API call limits.
+   Please consider this behaviour when using those 19 incremental streams, because it may affect your API call limits.
 
-3. We are passing few parameters \(`since`, `sort` and `direction`\) to GitHub in order to filter records and sometimes for large streams specifying very distant `start_date` in the past may result in keep on getting error from GitHub instead of records \(respective `WARN` log message will be outputted\). In this case Specifying more recent `start_date` may help.
+4. We pass a few parameters \(`since`, `sort` and `direction`\) to GitHub in order to filter records. For large streams, specifying a very distant `start_date` may result in repeated errors from GitHub instead of records \(a respective `WARN` log message will be output\); in this case, specifying a more recent `start_date` may help.
 
 **The "Start date" configuration option does not apply to the streams below, because the GitHub API does not include dates which can be used for filtering:**
 * `assignees`
@@ -137,6 +141,7 @@ The GitHub connector should not run into GitHub API limitations under normal usa
 
 | Version | Date       | Pull Request | Subject |
 |:--------|:-----------| :--- |:-------------------------------------------------------------------------------------------------------------|
+| 0.2.36  | 2022-06-20 | [13926](https://github.com/airbytehq/airbyte/pull/13926) | Break point added for `workflow_runs` stream |
 | 0.2.35  | 2022-06-16 | [13763](https://github.com/airbytehq/airbyte/pull/13763) | Use GraphQL for `pull_request_stats` stream |
 | 0.2.34  | 2022-06-14 | [13707](https://github.com/airbytehq/airbyte/pull/13707) | Fix API sorting, fix `get_starting_point` caching |
 | 0.2.33  | 2022-06-08 | [13558](https://github.com/airbytehq/airbyte/pull/13558) | Enable caching only for parent streams |
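
For readers skimming the diff, here is a minimal, self-contained sketch of the break-point
scan that the new `WorkflowRuns.read_records` performs. It is illustrative only and not part
of the patch: the function name `scan_with_break_point` and the standalone setup are
hypothetical, the cursor field is hardcoded to `updated_at`, and pagination is replaced by an
in-memory list. The comparison logic, the 32-day `re_run_period`, and the sample records are
taken directly from the patch and its unit test.

from typing import Any, Iterable, Mapping

import pendulum

RE_RUN_PERIOD = 32  # days, mirrors WorkflowRuns.re_run_period in the patch


def scan_with_break_point(records: Iterable[Mapping[str, Any]], start_point: str) -> Iterable[Mapping[str, Any]]:
    # Records arrive sorted descending by `created_at`. Yield those updated after
    # start_point; stop at the first record created before start_point minus
    # RE_RUN_PERIOD days, since such runs can no longer be re-run or updated.
    break_point = (pendulum.parse(start_point) - pendulum.duration(days=RE_RUN_PERIOD)).to_iso8601_string()
    for record in records:
        if record["updated_at"] > start_point:
            yield record
        if record["created_at"] < break_point:
            break


# The second sync of the unit test: the state cursor is 2022-02-05.
runs = [
    {"id": 5, "created_at": "2022-02-07T00:00:00Z", "updated_at": "2022-02-07T00:00:00Z"},
    {"id": 4, "created_at": "2022-02-05T00:00:00Z", "updated_at": "2022-02-05T00:00:00Z"},
    {"id": 3, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-02-08T00:00:00Z"},
    {"id": 2, "created_at": "2022-01-03T00:00:00Z", "updated_at": "2022-01-03T00:00:00Z"},  # before break_point: stops the scan
    {"id": 1, "created_at": "2022-01-02T00:00:00Z", "updated_at": "2022-01-02T00:00:00Z"},  # never reached
]

assert [r["id"] for r in scan_with_break_point(runs, "2022-02-05T00:00:00Z")] == [5, 3]

In the connector the early break also stops pagination, which is why the second sync in the
unit test issues only four HTTP requests even though the mocked page 4 still advertises a
"next" link.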