From a4e5b0c227ce132fc7f30579dfe19b1299cc7d3e Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Mon, 20 Jun 2022 08:15:04 +0300 Subject: [PATCH 1/8] break point added for workflows_runs stream Signed-off-by: Sergey Chvalyuk --- .../connectors/source-github/Dockerfile | 2 +- .../source-github/source_github/streams.py | 23 +++++++++++++++++++ docs/integrations/sources/github.md | 1 + 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-github/Dockerfile b/airbyte-integrations/connectors/source-github/Dockerfile index 71fbf3ee2ede..b0b59e1e2c4f 100644 --- a/airbyte-integrations/connectors/source-github/Dockerfile +++ b/airbyte-integrations/connectors/source-github/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.35 +LABEL io.airbyte.version=0.2.36 LABEL io.airbyte.name=airbyte/source-github diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 06c348e50271..c4e4b186a275 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -1113,6 +1113,9 @@ class WorkflowRuns(SemiIncrementalMixin, GithubStream): # key for accessing slice value from record record_slice_key = ["repository", "full_name"] + # https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs + re_run_period = 32 + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: return f"repos/{stream_slice['repository']}/actions/runs" @@ -1121,6 +1124,26 @@ def parse_response(self, response: requests.Response, stream_slice: Mapping[str, for record in response: yield record + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + start_point = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice) + break_point = pendulum.now("UTC").replace(microsecond=0) - pendulum.duration(days=self.re_run_period) + break_point = min(start_point, break_point.to_iso8601_string()) + for record in super(SemiIncrementalMixin, self).read_records( + sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state + ): + cursor_value = record[self.cursor_field] + created_at = record["created_at"] + if cursor_value > start_point: + yield record + if created_at < break_point: + break + class TeamMembers(GithubStream): """ diff --git a/docs/integrations/sources/github.md b/docs/integrations/sources/github.md index 73f8e00d7b1c..c684d25d169f 100644 --- a/docs/integrations/sources/github.md +++ b/docs/integrations/sources/github.md @@ -137,6 +137,7 @@ The GitHub connector should not run into GitHub API limitations under normal usa | Version | Date | Pull Request | Subject | |:--------|:-----------| :--- |:-------------------------------------------------------------------------------------------------------------| +| 0.2.36 | 2022-06-20 | []() | | | 0.2.35 | 2022-06-16 | [13763](https://github.com/airbytehq/airbyte/pull/13763) | Use GraphQL for `pull_request_stats` stream | | 0.2.34 | 2022-06-14 | [13707](https://github.com/airbytehq/airbyte/pull/13707) | Fix API sorting, fix `get_starting_point` caching | | 
0.2.33 | 2022-06-08 | [13558](https://github.com/airbytehq/airbyte/pull/13558) | Enable caching only for parent streams |

From 6cbdae5dcd188ec117ff4062455d6c0c2649a451 Mon Sep 17 00:00:00 2001
From: Sergey Chvalyuk
Date: Mon, 20 Jun 2022 08:18:57 +0300
Subject: [PATCH 2/8] github.md updated

Signed-off-by: Sergey Chvalyuk
---
 docs/integrations/sources/github.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/integrations/sources/github.md b/docs/integrations/sources/github.md
index c684d25d169f..f40eecc4fc8e 100644
--- a/docs/integrations/sources/github.md
+++ b/docs/integrations/sources/github.md
@@ -137,7 +137,7 @@ The GitHub connector should not run into GitHub API limitations under normal usa

 | Version | Date | Pull Request | Subject |
 |:--------|:-----------| :--- |:-------------------------------------------------------------------------------------------------------------|
-| 0.2.36 | 2022-06-20 | []() | |
+| 0.2.36 | 2022-06-20 | [13926](https://github.com/airbytehq/airbyte/pull/13926) | Break point added for `workflows_runs` stream |
 | 0.2.35 | 2022-06-16 | [13763](https://github.com/airbytehq/airbyte/pull/13763) | Use GraphQL for `pull_request_stats` stream |
 | 0.2.34 | 2022-06-14 | [13707](https://github.com/airbytehq/airbyte/pull/13707) | Fix API sorting, fix `get_starting_point` caching |
 | 0.2.33 | 2022-06-08 | [13558](https://github.com/airbytehq/airbyte/pull/13558) | Enable caching only for parent streams |

From 2d9924a4c9bde2ac108e90c7f652616376ef6a17 Mon Sep 17 00:00:00 2001
From: Sergey Chvalyuk
Date: Mon, 20 Jun 2022 09:28:35 +0300
Subject: [PATCH 3/8] github.md updated

Signed-off-by: Sergey Chvalyuk
---
 docs/integrations/sources/github.md | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/docs/integrations/sources/github.md b/docs/integrations/sources/github.md
index f40eecc4fc8e..4837dc40ecab 100644
--- a/docs/integrations/sources/github.md
+++ b/docs/integrations/sources/github.md
@@ -90,7 +90,7 @@ This connector outputs the following incremental streams:
 * [Review comments](https://docs.github.com/en/rest/reference/pulls#list-review-comments-in-a-repository)
 * [Reviews](https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request)
 * [Stargazers](https://docs.github.com/en/rest/reference/activity#list-stargazers)
-* [WorkflowRuns](https://docs.github.com/en/rest/reference/actions#list-workflow-runs-for-a-repository)
+* [WorkflowRuns](https://docs.github.com/en/rest/actions/workflow-runs#list-workflow-runs-for-a-repository)
 * [Workflows](https://docs.github.com/en/rest/reference/actions#workflows)

 ### Notes
@@ -99,12 +99,16 @@ This connector outputs the following incremental streams:
    * read only new records;
    * output only new records.

-2. Other 20 incremental streams are also incremental but with one difference, they:
+2. Stream `workflow_runs` is almost purely incremental:
+   * read new records and some portion of old records (from the past 30 days) [docs](https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs);
+   * output only new records.
+
+3. The other 19 incremental streams are also incremental, but with one difference, they:
    * read all records;
    * output only new records.
-   Please, consider this behaviour when using those 20 incremental streams because it may affect you API call limits.
+   Please consider this behaviour when using those 19 incremental streams, because it may affect your API call limits.

-3. 
We are passing few parameters \(`since`, `sort` and `direction`\) to GitHub in order to filter records and sometimes for large streams specifying very distant `start_date` in the past may result in keep on getting error from GitHub instead of records \(respective `WARN` log message will be outputted\). In this case Specifying more recent `start_date` may help.
+4. We pass a few parameters \(`since`, `sort` and `direction`\) to GitHub in order to filter records, and for large streams specifying a very distant `start_date` in the past may result in repeated errors from GitHub instead of records \(a respective `WARN` log message will be output\). In this case, specifying a more recent `start_date` may help.

 **The "Start date" configuration option does not apply to the streams below, because the GitHub API does not include dates which can be used for filtering:**
 * `assignees`

From 97319d390fac9e02f3974a0ecc85b4ed985e8f2a Mon Sep 17 00:00:00 2001
From: Sergey Chvalyuk
Date: Mon, 20 Jun 2022 10:41:49 +0300
Subject: [PATCH 4/8] break_point just before start_date on 32 days

Signed-off-by: Sergey Chvalyuk
---
 .../connectors/source-github/source_github/streams.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py
index c4e4b186a275..f46b9ec5d783 100644
--- a/airbyte-integrations/connectors/source-github/source_github/streams.py
+++ b/airbyte-integrations/connectors/source-github/source_github/streams.py
@@ -1132,8 +1132,7 @@ def read_records(
         stream_state: Mapping[str, Any] = None,
     ) -> Iterable[Mapping[str, Any]]:
         start_point = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
-        break_point = pendulum.now("UTC").replace(microsecond=0) - pendulum.duration(days=self.re_run_period)
-        break_point = min(start_point, break_point.to_iso8601_string())
+        break_point = (pendulum.parse(start_point) - pendulum.duration(days=self.re_run_period)).to_iso8601_string()
         for record in super(SemiIncrementalMixin, self).read_records(

From 66b9afa9136cd8cba3e4b1f089c07d24725f9350 Mon Sep 17 00:00:00 2001
From: Sergey Chvalyuk
Date: Mon, 20 Jun 2022 14:24:34 +0300
Subject: [PATCH 5/8] test_stream_workflow_runs_read_incremental added

Signed-off-by: Sergey Chvalyuk
---
 .../source-github/unit_tests/test_stream.py | 121 ++++++++++++++++++
 1 file changed, 121 insertions(+)

diff --git a/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py b/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py
index a5d6f6282737..737bb7fe6ef9 100644
--- a/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py
+++ b/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py
@@ -10,6 +10,7 @@
 import responses
 from airbyte_cdk.sources.streams.http.exceptions import BaseBackoffException
 from responses import matchers
+from source_github import streams
 from source_github.streams import (
     Branches,
     Collaborators,
@@ -37,6 +38,7 @@
     TeamMemberships,
     Teams,
     Users,
+    WorkflowRuns,
 )

 from .utils import ProjectsResponsesAPI, read_full_refresh, read_incremental, urlbase
@@ -949,3 +951,122 @@ def test_stream_commit_comment_reactions_incremental_read():
         {"id": 154935432, "comment_id": 55538826, "created_at": "2022-02-01T16:00:00Z", "repository": "airbytehq/integration-test"},
         {"id": 
154935433, "comment_id": 55538827, "created_at": "2022-02-01T17:00:00Z", "repository": "airbytehq/integration-test"}, ] + + +@responses.activate +def test_stream_workflow_runs_read_incremental(monkeypatch): + + repository_args_with_start_date = { + "repositories": ["org/repos"], + "page_size_for_large_streams": 30, + "start_date": "2022-01-01T00:00:00Z", + } + + monkeypatch.setattr(streams, "DEFAULT_PAGE_SIZE", 1) + stream = WorkflowRuns(**repository_args_with_start_date) + + data = [ + {"id": 4, "created_at": "2022-02-05T00:00:00Z", "updated_at": "2022-02-05T00:00:00Z", "repository": {"full_name": "org/repos"}}, + {"id": 3, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-01-15T00:00:00Z", "repository": {"full_name": "org/repos"}}, + {"id": 2, "created_at": "2022-01-03T00:00:00Z", "updated_at": "2022-01-03T00:00:00Z", "repository": {"full_name": "org/repos"}}, + {"id": 1, "created_at": "2022-01-02T00:00:00Z", "updated_at": "2022-01-02T00:00:00Z", "repository": {"full_name": "org/repos"}}, + ] + + responses.add( + "GET", + "https://api.github.com/repos/org/repos/actions/runs", + json={"total_count": len(data), "workflow_runs": data[0:1]}, + headers={"Link": '; rel="next"'}, + match=[matchers.query_param_matcher({"per_page": "1"}, strict_match=True)], + ) + + responses.add( + "GET", + "https://api.github.com/repos/org/repos/actions/runs", + json={"total_count": len(data), "workflow_runs": data[1:2]}, + headers={"Link": '; rel="next"'}, + match=[matchers.query_param_matcher({"per_page": "1", "page": "2"}, strict_match=True)], + ) + + responses.add( + "GET", + "https://api.github.com/repos/org/repos/actions/runs", + json={"total_count": len(data), "workflow_runs": data[2:3]}, + headers={"Link": '; rel="next"'}, + match=[matchers.query_param_matcher({"per_page": "1", "page": "3"}, strict_match=True)], + ) + + responses.add( + "GET", + "https://api.github.com/repos/org/repos/actions/runs", + json={"total_count": len(data), "workflow_runs": data[3:4]}, + match=[matchers.query_param_matcher({"per_page": "1", "page": "4"}, strict_match=True)], + ) + + state = {} + records = read_incremental(stream, state) + assert state == {"org/repos": {"updated_at": "2022-02-05T00:00:00Z"}} + + assert records == [ + {"id": 4, "repository": {"full_name": "org/repos"}, "created_at": "2022-02-05T00:00:00Z", "updated_at": "2022-02-05T00:00:00Z"}, + {"id": 3, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-01-15T00:00:00Z"}, + {"id": 2, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-03T00:00:00Z", "updated_at": "2022-01-03T00:00:00Z"}, + {"id": 1, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-02T00:00:00Z", "updated_at": "2022-01-02T00:00:00Z"}, + ] + + assert len(responses.calls) == 4 + + data.insert( + 0, + { + "id": 5, + "created_at": "2022-02-07T00:00:00Z", + "updated_at": "2022-02-07T00:00:00Z", + "repository": {"full_name": "org/repos"}, + }, + ) + + data[2]["updated_at"] = "2022-02-08T00:00:00Z" + + responses.add( + "GET", + "https://api.github.com/repos/org/repos/actions/runs", + json={"total_count": len(data), "workflow_runs": data[0:1]}, + headers={"Link": '; rel="next"'}, + match=[matchers.query_param_matcher({"per_page": "1"}, strict_match=True)], + ) + + responses.add( + "GET", + "https://api.github.com/repos/org/repos/actions/runs", + json={"total_count": len(data), "workflow_runs": data[1:2]}, + headers={"Link": '; rel="next"'}, + match=[matchers.query_param_matcher({"per_page": "1", "page": "2"}, 
strict_match=True)], + ) + + responses.add( + "GET", + "https://api.github.com/repos/org/repos/actions/runs", + json={"total_count": len(data), "workflow_runs": data[2:3]}, + headers={"Link": '; rel="next"'}, + match=[matchers.query_param_matcher({"per_page": "1", "page": "3"}, strict_match=True)], + ) + + responses.add( + "GET", + "https://api.github.com/repos/org/repos/actions/runs", + json={"total_count": len(data), "workflow_runs": data[3:4]}, + headers={"Link": '; rel="next"'}, + match=[matchers.query_param_matcher({"per_page": "1", "page": "4"}, strict_match=True)], + ) + + responses.calls.reset() + records = read_incremental(stream, state) + + assert state == {"org/repos": {"updated_at": "2022-02-08T00:00:00Z"}} + assert records == [ + {"id": 5, "repository": {"full_name": "org/repos"}, "created_at": "2022-02-07T00:00:00Z", "updated_at": "2022-02-07T00:00:00Z"}, + {"id": 3, "repository": {"full_name": "org/repos"}, "created_at": "2022-01-15T00:00:00Z", "updated_at": "2022-02-08T00:00:00Z"}, + ] + + assert len(responses.calls) == 4 From 801e5554bc716e20f566264ed217035a2f57f77c Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Tue, 21 Jun 2022 15:57:53 +0300 Subject: [PATCH 6/8] improve comments Signed-off-by: Sergey Chvalyuk --- .../connectors/source-github/source_github/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index f46b9ec5d783..69f9074f3f8a 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -1114,7 +1114,7 @@ class WorkflowRuns(SemiIncrementalMixin, GithubStream): record_slice_key = ["repository", "full_name"] # https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs - re_run_period = 32 + re_run_period = 32 # days def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: return f"repos/{stream_slice['repository']}/actions/runs" From edb3bbfa777d3100f79bcd03830a98b77dbc3900 Mon Sep 17 00:00:00 2001 From: Sergey Chvalyuk Date: Wed, 22 Jun 2022 10:57:26 +0300 Subject: [PATCH 7/8] add PR description Signed-off-by: Sergey Chvalyuk --- .../connectors/source-github/source_github/streams.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 69f9074f3f8a..d47a74c6a3b6 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -1106,7 +1106,7 @@ def convert_cursor_value(self, value): class WorkflowRuns(SemiIncrementalMixin, GithubStream): """ - Get all workflows of a GitHub repository + Get all workflow runs for a GitHub repository API documentation: https://docs.github.com/en/rest/actions/workflow-runs#list-workflow-runs-for-a-repository """ @@ -1131,6 +1131,12 @@ def read_records( stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, ) -> Iterable[Mapping[str, Any]]: + # Records in the workflows_runs stream are naturally descending sorted by `created_at` field. + # On first sight this is not big deal because cursor_field is `updated_at`. 
+        # But we can still use `created_at` as a break point: after the 30-day re-run period
+        # (https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs)
+        # a workflow run can no longer be re-run, so its record can no longer be updated. After an initial full sync,
+        # subsequent incremental syncs only need to look `re_run_period` days behind the state cursor to find every updated record.
         start_point = self.get_starting_point(stream_state=stream_state, stream_slice=stream_slice)
         break_point = (pendulum.parse(start_point) - pendulum.duration(days=self.re_run_period)).to_iso8601_string()
         for record in super(SemiIncrementalMixin, self).read_records(

From 587f4aeade5a3495221ad38b0b5d144dcf9b10a3 Mon Sep 17 00:00:00 2001
From: Octavia Squidington III
Date: Wed, 22 Jun 2022 08:46:43 +0000
Subject: [PATCH 8/8] auto-bump connector version

---
 .../init/src/main/resources/seed/source_definitions.yaml     | 2 +-
 airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +-
 .../connectors/source-github/source_github/streams.py         | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index 26db7e596f58..efa01185a5b4 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -303,7 +303,7 @@
 - name: GitHub
   sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
   dockerRepository: airbyte/source-github
-  dockerImageTag: 0.2.35
+  dockerImageTag: 0.2.36
   documentationUrl: https://docs.airbyte.io/integrations/sources/github
   icon: github.svg
   sourceType: api
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index d5b9183689b4..81382543dfa8 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -2620,7 +2620,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-github:0.2.35"
+- dockerImage: "airbyte/source-github:0.2.36"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/github"
     connectionSpecification:
diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py
index d47a74c6a3b6..a663b67ff1a5 100644
--- a/airbyte-integrations/connectors/source-github/source_github/streams.py
+++ b/airbyte-integrations/connectors/source-github/source_github/streams.py
@@ -1114,7 +1114,7 @@ class WorkflowRuns(SemiIncrementalMixin, GithubStream):
     record_slice_key = ["repository", "full_name"]
 
     # https://docs.github.com/en/actions/managing-workflow-runs/re-running-workflows-and-jobs
-    re_run_period = 32 # days
+    re_run_period = 32  # days
 
     def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
         return f"repos/{stream_slice['repository']}/actions/runs"
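For readers following the series, the look-behind rule that patches 1-7 introduce in WorkflowRuns.read_records boils down to the sketch below. It is an illustration only, not connector code: the function name filter_workflow_runs, the constant RE_RUN_PERIOD_DAYS and the sample data are invented for this example, and it assumes the pendulum dependency the connector already uses.

# Minimal sketch of the look-behind break-point rule (illustrative names, not connector code).
import pendulum

RE_RUN_PERIOD_DAYS = 32  # GitHub allows re-running a workflow run for up to 30 days; 32 adds a safety margin


def filter_workflow_runs(runs, start_point: str):
    """Yield runs updated after `start_point`.

    Assumes `runs` is sorted by `created_at` in descending order, as the GitHub API returns them.
    """
    break_point = pendulum.parse(start_point) - pendulum.duration(days=RE_RUN_PERIOD_DAYS)
    for run in runs:
        if pendulum.parse(run["updated_at"]) > pendulum.parse(start_point):
            yield run
        if pendulum.parse(run["created_at"]) < break_point:
            # A run created before the break point can no longer be re-run, so no older run
            # can carry an `updated_at` newer than the cursor: stop reading further pages.
            break


if __name__ == "__main__":
    runs = [
        {"id": 3, "created_at": "2022-06-01T00:00:00Z", "updated_at": "2022-06-10T00:00:00Z"},
        {"id": 2, "created_at": "2022-05-20T00:00:00Z", "updated_at": "2022-06-05T00:00:00Z"},  # re-run after the cursor
        {"id": 1, "created_at": "2022-03-01T00:00:00Z", "updated_at": "2022-03-01T00:00:00Z"},  # triggers the break
    ]
    print([run["id"] for run in filter_workflow_runs(runs, "2022-06-01T00:00:00Z")])  # [3, 2]

Because the API returns workflow runs sorted by `created_at` in descending order, the loop can stop as soon as it reaches a run created more than `re_run_period` days before the saved cursor; every older run is guaranteed not to have been re-run since the previous sync, which is what keeps the stream "almost purely incremental".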