diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 973f9fdee9f1..0e38de5907a6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -217,7 +217,7 @@ - name: GitHub sourceDefinitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e dockerRepository: airbyte/source-github - dockerImageTag: 0.2.9 + dockerImageTag: 0.2.10 documentationUrl: https://docs.airbyte.io/integrations/sources/github icon: github.svg sourceType: api diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java index a08751b25164..63e5478c17cf 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java @@ -191,7 +191,7 @@ private JsonNode getObjectNode(final FieldList fields, final JsonNode root) { @Override public Schema getBigQuerySchema(final JsonNode jsonSchema) { - final List fieldList = getSchemaFields(namingResolver, jsonSchema); + final List fieldList = getSchemaFields(namingResolver, jsonSchema); if (fieldList.stream().noneMatch(f -> f.getName().equals(JavaBaseConstants.COLUMN_NAME_AB_ID))) { fieldList.add(Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING)); } diff --git a/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py b/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py index bcf2569a6779..2d8c880cf3e3 100644 --- a/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py +++ b/airbyte-integrations/connectors/source-amazon-seller-partner/source_amazon_seller_partner/streams.py @@ -644,4 +644,4 @@ def request_params( return params def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: - yield from response.json().get(self.data_field, {}).get("shippingLabels", []) \ No newline at end of file + yield from response.json().get(self.data_field, {}).get("shippingLabels", []) diff --git a/airbyte-integrations/connectors/source-github/Dockerfile b/airbyte-integrations/connectors/source-github/Dockerfile index d9274c32c19a..3a7e115d6489 100644 --- a/airbyte-integrations/connectors/source-github/Dockerfile +++ b/airbyte-integrations/connectors/source-github/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . 
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.9 +LABEL io.airbyte.version=0.2.10 LABEL io.airbyte.name=airbyte/source-github diff --git a/airbyte-integrations/connectors/source-github/acceptance-test-config.yml b/airbyte-integrations/connectors/source-github/acceptance-test-config.yml index bb0bdb411995..caf55dcbb65f 100644 --- a/airbyte-integrations/connectors/source-github/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-github/acceptance-test-config.yml @@ -25,9 +25,11 @@ tests: issue_milestones: ["airbytehq/integration-test", "updated_at"] issues: ["airbytehq/integration-test", "updated_at"] projects: ["airbytehq/integration-test", "updated_at"] + pull_request_stats: ["airbytehq/integration-test", "updated_at"] pull_requests: ["airbytehq/integration-test", "updated_at"] releases: ["airbytehq/integration-test", "created_at"] review_comments: ["airbytehq/integration-test", "updated_at"] + reviews: ["airbytehq/integration-test", "submitted_at"] stargazers: ["airbytehq/integration-test", "starred_at"] full_refresh: - config_path: "secrets/config.json" diff --git a/airbyte-integrations/connectors/source-github/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-github/integration_tests/abnormal_state.json index dfae03d57420..48d22b3e7e83 100644 --- a/airbyte-integrations/connectors/source-github/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-github/integration_tests/abnormal_state.json @@ -39,6 +39,11 @@ "updated_at": "2121-06-28T17:24:51Z" } }, + "pull_request_stats": { + "airbytehq/integration-test": { + "updated_at": "2121-06-29T02:04:57Z" + } + }, "pull_requests": { "airbytehq/integration-test": { "updated_at": "2121-06-28T23:36:35Z" @@ -54,6 +59,11 @@ "updated_at": "2121-06-23T23:57:07Z" } }, + "reviews": { + "airbytehq/integration-test": { + "submitted_at": "2121-06-29T02:04:57Z" + } + }, "stargazers": { "airbytehq/integration-test": { "starred_at": "2121-06-29T02:04:57Z" diff --git a/airbyte-integrations/connectors/source-github/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-github/integration_tests/configured_catalog.json index 61065af87411..dfe46cf08336 100644 --- a/airbyte-integrations/connectors/source-github/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-github/integration_tests/configured_catalog.json @@ -198,11 +198,14 @@ "stream": { "name": "pull_request_stats", "json_schema": {}, - "supported_sync_modes": ["full_refresh"], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["updated_at"], "source_defined_primary_key": [["id"]] }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" + "sync_mode": "incremental", + "destination_sync_mode": "append", + "cursor_field": ["updated_at"] }, { "stream": { @@ -257,11 +260,14 @@ "stream": { "name": "reviews", "json_schema": {}, - "supported_sync_modes": ["full_refresh"], + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["submitted_at"], "source_defined_primary_key": [["id"]] }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" + "sync_mode": "incremental", + "destination_sync_mode": "append", + "cursor_field": ["submitted_at"] }, { "stream": { diff --git 
a/airbyte-integrations/connectors/source-github/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-github/integration_tests/sample_state.json index 068a99d1f717..86698788537c 100644 --- a/airbyte-integrations/connectors/source-github/integration_tests/sample_state.json +++ b/airbyte-integrations/connectors/source-github/integration_tests/sample_state.json @@ -29,6 +29,11 @@ "created_at": "2021-06-23T23:57:07Z" } }, + "pull_request_stats": { + "airbytehq/integration-test": { + "updated_at": "2021-08-30T12:01:15Z" + } + }, "pull_requests": { "airbytehq/integration-test": { "updated_at": "2021-06-28T23:36:35Z" @@ -53,5 +58,10 @@ "airbytehq/integration-test": { "created_at": "2021-06-30T10:04:41Z" } + }, + "reviews": { + "airbytehq/integration-test": { + "submitted_at": "2021-08-30T12:01:15Z" + } } } diff --git a/airbyte-integrations/connectors/source-github/source_github/schemas/pull_request_stats.json b/airbyte-integrations/connectors/source-github/source_github/schemas/pull_request_stats.json index 3b221876cfae..90ebf80f14a2 100644 --- a/airbyte-integrations/connectors/source-github/source_github/schemas/pull_request_stats.json +++ b/airbyte-integrations/connectors/source-github/source_github/schemas/pull_request_stats.json @@ -49,6 +49,10 @@ }, "changed_files": { "type": ["null", "integer"] + }, + "updated_at": { + "type": ["null", "string"], + "format": "date-time" } } } diff --git a/airbyte-integrations/connectors/source-github/source_github/source.py b/airbyte-integrations/connectors/source-github/source_github/source.py index cf1b6f4d607d..56970d252c1e 100644 --- a/airbyte-integrations/connectors/source-github/source_github/source.py +++ b/airbyte-integrations/connectors/source-github/source_github/source.py @@ -179,12 +179,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: Organizations(**organization_args), Projects(**repository_args_with_start_date), PullRequestCommentReactions(**repository_args_with_start_date), - PullRequestStats(parent=pull_requests_stream, **repository_args), + PullRequestStats(parent=pull_requests_stream, **repository_args_with_start_date), PullRequests(**repository_args_with_start_date), Releases(**repository_args_with_start_date), Repositories(**organization_args), ReviewComments(**repository_args_with_start_date), - Reviews(parent=pull_requests_stream, **repository_args), + Reviews(parent=pull_requests_stream, **repository_args_with_start_date), Stargazers(**repository_args_with_start_date), Tags(**repository_args), Teams(**organization_args), diff --git a/airbyte-integrations/connectors/source-github/source_github/streams.py b/airbyte-integrations/connectors/source-github/source_github/streams.py index 8da8e84b11c8..62c65a695688 100644 --- a/airbyte-integrations/connectors/source-github/source_github/streams.py +++ b/airbyte-integrations/connectors/source-github/source_github/streams.py @@ -2,7 +2,6 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # -import os import time from abc import ABC, abstractmethod from copy import deepcopy @@ -10,43 +9,16 @@ from urllib import parse import requests -import vcr from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream from requests.exceptions import HTTPError -from vcr.cassette import Cassette - - -def request_cache() -> Cassette: - """ - Builds VCR instance. - It deletes file everytime we create it, normally should be called only once. 
- We can't use NamedTemporaryFile here because yaml serializer doesn't work well with empty files. - """ - filename = "request_cache.yml" - try: - os.remove(filename) - except FileNotFoundError: - pass - - return vcr.use_cassette(str(filename), record_mode="new_episodes", serializer="yaml") class GithubStream(HttpStream, ABC): - cache = request_cache() url_base = "https://api.github.com/" - # To prevent dangerous behavior, the `vcr` library prohibits the use of nested caching. - # Here's an example of dangerous behavior: - # cache = Cassette.use('whatever') - # with cache: - # with cache: - # pass - # - # Therefore, we will only use `cache` for the top-level stream, so as not to cause possible difficulties. - top_level_stream = True - primary_key = "id" + use_cache = True # GitHub pagination could be from 1 to 100. page_size = 100 @@ -100,11 +72,7 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]: def read_records(self, stream_slice: Mapping[str, any] = None, **kwargs) -> Iterable[Mapping[str, Any]]: try: - if self.top_level_stream: - with self.cache: - yield from super().read_records(stream_slice=stream_slice, **kwargs) - else: - yield from super().read_records(stream_slice=stream_slice, **kwargs) + yield from super().read_records(stream_slice=stream_slice, **kwargs) except HTTPError as e: error_msg = str(e) @@ -422,6 +390,7 @@ class PullRequests(SemiIncrementalGithubStream): """ page_size = 50 + first_read_override_key = "first_read_override" def __init__(self, **kwargs): super().__init__(**kwargs) @@ -431,7 +400,7 @@ def read_records(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iter """ Decide if this a first read or not by the presence of the state object """ - self._first_read = not bool(stream_state) + self._first_read = not bool(stream_state) or stream_state.get(self.first_read_override_key, False) yield from super().read_records(stream_state=stream_state, **kwargs) def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: @@ -459,7 +428,7 @@ def is_sorted_descending(self) -> bool: """ Depending if there any state we read stream in ascending or descending order. 
""" - return self._first_read + return not self._first_read class CommitComments(SemiIncrementalGithubStream): @@ -686,8 +655,8 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: # Pull request substreams -class PullRequestSubstream(HttpSubStream, GithubStream, ABC): - top_level_stream = False +class PullRequestSubstream(HttpSubStream, SemiIncrementalGithubStream, ABC): + use_cache = False def __init__(self, parent: PullRequests, **kwargs): super().__init__(parent=parent, **kwargs) @@ -695,14 +664,33 @@ def __init__(self, parent: PullRequests, **kwargs): def stream_slices( self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]: - parent_stream_slices = super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state) - + """ + Override the parent PullRequests stream configuration to always fetch records in ascending order + """ + parent_state = deepcopy(stream_state) or {} + parent_state[PullRequests.first_read_override_key] = True + parent_stream_slices = super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=parent_state) for parent_stream_slice in parent_stream_slices: yield { "pull_request_number": parent_stream_slice["parent"]["number"], "repository": parent_stream_slice["parent"]["repository"], } + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + """ + We've already determined the list of pull requests to run the stream against. + Skip the start_point_map and cursor_field logic in SemiIncrementalGithubStream.read_records. + """ + yield from super(SemiIncrementalGithubStream, self).read_records( + sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state + ) + class PullRequestStats(PullRequestSubstream): """ @@ -731,11 +719,21 @@ class Reviews(PullRequestSubstream): API docs: https://docs.github.com/en/rest/reference/pulls#list-reviews-for-a-pull-request """ + cursor_field = "submitted_at" + def path( self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None ) -> str: return f"repos/{stream_slice['repository']}/pulls/{stream_slice['pull_request_number']}/reviews" + # Set the parent stream state's cursor field before fetching its records + def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: + parent_state = deepcopy(stream_state) or {} + for repository in self.repositories: + if repository in parent_state and self.cursor_field in parent_state[repository]: + parent_state[repository][self.parent.cursor_field] = parent_state[repository][self.cursor_field] + yield from super().stream_slices(stream_state=parent_state, **kwargs) + # Reactions streams @@ -743,7 +741,7 @@ def path( class ReactionStream(GithubStream, ABC): parent_key = "id" - top_level_stream = False + use_cache = False def __init__(self, **kwargs): self._stream_kwargs = deepcopy(kwargs) diff --git a/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py b/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py index 6f1cbf1d6548..1f06233fd45a 100644 --- a/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py +++ 
b/airbyte-integrations/connectors/source-github/unit_tests/test_stream.py @@ -1,6 +1,7 @@ # # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # + from http import HTTPStatus from unittest.mock import patch diff --git a/airbyte-integrations/connectors/source-jira/source_jira/source.py b/airbyte-integrations/connectors/source-jira/source_jira/source.py index 3689d253b801..538a9ff94ed3 100644 --- a/airbyte-integrations/connectors/source-jira/source_jira/source.py +++ b/airbyte-integrations/connectors/source-jira/source_jira/source.py @@ -107,7 +107,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: **incremental_args, additional_fields=config.get("additional_fields", []), expand_changelog=config.get("expand_issue_changelog", False), - render_fields=render_fields + render_fields=render_fields, ) issue_fields_stream = IssueFields(**args) experimental_streams = [] diff --git a/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/invoices.json b/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/invoices.json index a7bacc0d38b0..fecf63de82eb 100644 --- a/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/invoices.json +++ b/airbyte-integrations/connectors/source-recurly/source_recurly/schemas/invoices.json @@ -16,11 +16,7 @@ "title": "Invoice type", "description": "Invoices are either charge, credit, or legacy invoices.", "type": ["null", "string"], - "enum": [ - "charge", - "credit", - "legacy" - ] + "enum": ["charge", "credit", "legacy"] }, "origin": { "title": "Origin", @@ -143,10 +139,7 @@ "description": "An automatic invoice means a corresponding transaction is run using the account's billing information at the same time the invoice is created. Manual invoices are created without a corresponding transaction. The merchant must enter a manual payment transaction or have the customer pay the invoice with an automatic method, like credit card, PayPal, Amazon, or ACH bank payment.", "default": "automatic", "type": ["null", "string"], - "enum": [ - "automatic", - "manual" - ] + "enum": ["automatic", "manual"] }, "po_number": { "type": ["null", "string"], @@ -500,10 +493,7 @@ "title": "Line item type", "description": "Charges are positive line items that debit the account. Credits are negative line items that credit the account.", "type": ["null", "string"], - "enum": [ - "charge", - "credit" - ] + "enum": ["charge", "credit"] }, "item_code": { "type": ["null", "string"], @@ -532,10 +522,7 @@ "title": "Current state of the line item", "description": "Pending line items are charges or credits on an account that have not been applied to an invoice yet. 
Invoiced line items will always have an `invoice_id` value.", "type": ["null", "string"], - "enum": [ - "invoiced", - "pending" - ] + "enum": ["invoiced", "pending"] }, "legacy_category": { "title": "Legacy category", @@ -1064,11 +1051,7 @@ "type": { "title": "Invoice type", "type": ["null", "string"], - "enum": [ - "charge", - "credit", - "legacy" - ] + "enum": ["charge", "credit", "legacy"] }, "state": { "title": "Invoice state", @@ -1106,11 +1089,7 @@ "type": { "title": "Invoice type", "type": ["null", "string"], - "enum": [ - "charge", - "credit", - "legacy" - ] + "enum": ["charge", "credit", "legacy"] }, "state": { "title": "Invoice state", @@ -1142,13 +1121,7 @@ "title": "Transaction type", "description": "- `authorization` \u2013 verifies billing information and places a hold on money in the customer's account.\n- `capture` \u2013 captures funds held by an authorization and completes a purchase.\n- `purchase` \u2013 combines the authorization and capture in one transaction.\n- `refund` \u2013 returns all or a portion of the money collected in a previous transaction to the customer.\n- `verify` \u2013 a $0 or $1 transaction used to verify billing information which is immediately voided.\n", "type": ["null", "string"], - "enum": [ - "authorization", - "capture", - "purchase", - "refund", - "verify" - ] + "enum": ["authorization", "capture", "purchase", "refund", "verify"] }, "origin": { "title": "Origin of transaction", @@ -1263,10 +1236,7 @@ "collection_method": { "description": "The method by which the payment was collected.", "type": ["null", "string"], - "enum": [ - "automatic", - "manual" - ] + "enum": ["automatic", "manual"] }, "payment_method": { "properties": { @@ -1362,10 +1332,7 @@ "account_type": { "description": "The bank account type. 
Only present for ACH payment methods.", "type": ["null", "string"], - "enum": [ - "checking", - "savings" - ] + "enum": ["checking", "savings"] }, "routing_number": { "type": ["null", "string"], @@ -1457,16 +1424,7 @@ "title": "CVV check", "description": "When processed, result from checking the CVV/CVC value on the transaction.", "type": ["null", "string"], - "enum": [ - "D", - "I", - "M", - "N", - "P", - "S", - "U", - "X" - ] + "enum": ["D", "I", "M", "N", "P", "S", "U", "X"] }, "avs_check": { "title": "AVS check", @@ -1548,12 +1506,7 @@ "title": "Action", "description": "The action for which the credit was created.", "type": ["null", "string"], - "enum": [ - "payment", - "reduction", - "refund", - "write_off" - ] + "enum": ["payment", "reduction", "refund", "write_off"] }, "account": { "type": "object", @@ -1626,11 +1579,7 @@ "type": { "title": "Invoice type", "type": ["null", "string"], - "enum": [ - "charge", - "credit", - "legacy" - ] + "enum": ["charge", "credit", "legacy"] }, "state": { "title": "Invoice state", @@ -1668,11 +1617,7 @@ "type": { "title": "Invoice type", "type": ["null", "string"], - "enum": [ - "charge", - "credit", - "legacy" - ] + "enum": ["charge", "credit", "legacy"] }, "state": { "title": "Invoice state", @@ -1803,11 +1748,7 @@ "type": { "title": "Invoice type", "type": ["null", "string"], - "enum": [ - "charge", - "credit", - "legacy" - ] + "enum": ["charge", "credit", "legacy"] }, "state": { "title": "Invoice state", @@ -1845,11 +1786,7 @@ "type": { "title": "Invoice type", "type": ["null", "string"], - "enum": [ - "charge", - "credit", - "legacy" - ] + "enum": ["charge", "credit", "legacy"] }, "state": { "title": "Invoice state", @@ -2002,10 +1939,7 @@ "collection_method": { "description": "The method by which the payment was collected.", "type": ["null", "string"], - "enum": [ - "automatic", - "manual" - ] + "enum": ["automatic", "manual"] }, "payment_method": { "type": "object", @@ -2102,10 +2036,7 @@ "account_type": { "description": "The bank account type. Only present for ACH payment methods.", "type": ["null", "string"], - "enum": [ - "checking", - "savings" - ] + "enum": ["checking", "savings"] }, "routing_number": { "type": ["null", "string"], @@ -2116,7 +2047,6 @@ "description": "The bank name of this routing number." } } - }, "ip_address_v4": { "type": ["null", "string"], @@ -2197,16 +2127,7 @@ "title": "CVV check", "description": "When processed, result from checking the CVV/CVC value on the transaction.", "type": ["null", "string"], - "enum": [ - "D", - "I", - "M", - "N", - "P", - "S", - "U", - "X" - ] + "enum": ["D", "I", "M", "N", "P", "S", "U", "X"] }, "avs_check": { "title": "AVS check", diff --git a/docs/integrations/sources/github.md b/docs/integrations/sources/github.md index 57bf544788f0..7e6b0972a08e 100644 --- a/docs/integrations/sources/github.md +++ b/docs/integrations/sources/github.md @@ -92,6 +92,7 @@ Your token should have at least the `repo` scope. 
Depending on which streams you | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.2.10 | 2022-01-03 | [7250](https://github.com/airbytehq/airbyte/pull/7250) | Use CDK caching and convert PR-related streams to incremental | | 0.2.9 | 2021-12-29 | [9179](https://github.com/airbytehq/airbyte/pull/9179) | Use default retry delays on server error responses | | 0.2.8 | 2021-12-07 | [8524](https://github.com/airbytehq/airbyte/pull/8524) | Update connector fields title/description | | 0.2.7 | 2021-12-06 | [8518](https://github.com/airbytehq/airbyte/pull/8518) | Add connection retry with Github |
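The caching half of this patch removes the hand-rolled vcr cassette (`request_cache()` and the `top_level_stream` flag) from `GithubStream` and relies on the CDK's own response caching by setting `use_cache = True`, with substreams such as `PullRequestSubstream` and `ReactionStream` opting out via `use_cache = False`. A minimal sketch of what that flag looks like on a standalone stream, assuming the pinned airbyte-cdk exposes `use_cache` on `HttpStream` as the added line implies; the class name and path below are illustrative only:

```python
from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpStream


class PullsStream(HttpStream):
    """Illustrative stream: the CDK persists raw responses when use_cache is enabled."""

    url_base = "https://api.github.com/"
    primary_key = "id"
    use_cache = True  # replaces the removed vcr cassette / top_level_stream machinery

    def path(self, **kwargs) -> str:
        # Hypothetical endpoint used only for this sketch.
        return "repos/airbytehq/integration-test/pulls"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None  # pagination omitted in this sketch

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield from response.json()
```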
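The incremental half hinges on how the PR-related substreams reuse `PullRequests` as a parent without inheriting its descending read order: `PullRequests` now treats a state object carrying `first_read_override` as a first read (so `is_sorted_descending` returns `False`), and `Reviews` additionally copies its own `submitted_at` cursor onto the parent's `updated_at` cursor before slicing. A self-contained sketch of that state handoff, with the two `stream_slices` overrides collapsed into one helper for brevity; `FIRST_READ_OVERRIDE_KEY` and the function names are illustrative, not the connector's API:

```python
from copy import deepcopy
from typing import Any, Iterable, Mapping, Optional

FIRST_READ_OVERRIDE_KEY = "first_read_override"  # stands in for PullRequests.first_read_override_key


def parent_reads_descending(stream_state: Mapping[str, Any]) -> bool:
    """Mirror of PullRequests.is_sorted_descending: ascending on a (possibly forced) first read."""
    first_read = not bool(stream_state) or stream_state.get(FIRST_READ_OVERRIDE_KEY, False)
    return not first_read


def build_parent_state(
    stream_state: Optional[Mapping[str, Any]],
    repositories: Iterable[str],
    child_cursor: str,
    parent_cursor: str = "updated_at",
) -> Mapping[str, Any]:
    """Combine the two overrides: remap the child's cursor onto the parent's cursor field
    (Reviews.stream_slices) and force ascending parent reads (PullRequestSubstream.stream_slices)."""
    parent_state = deepcopy(stream_state) or {}
    for repository in repositories:
        if repository in parent_state and child_cursor in parent_state[repository]:
            parent_state[repository][parent_cursor] = parent_state[repository][child_cursor]
    parent_state[FIRST_READ_OVERRIDE_KEY] = True
    return parent_state


# A Reviews-style substream resuming from a saved submitted_at cursor.
state = {"airbytehq/integration-test": {"submitted_at": "2021-08-30T12:01:15Z"}}
parent_state = build_parent_state(state, ["airbytehq/integration-test"], child_cursor="submitted_at")
assert parent_state["airbytehq/integration-test"]["updated_at"] == "2021-08-30T12:01:15Z"
assert parent_reads_descending(parent_state) is False  # parent slices now come back oldest-first
```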
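On the acceptance-test side, `pull_request_stats` and `reviews` are now declared as incremental streams with source-defined cursors (`updated_at` and `submitted_at` respectively). A sketch of the programmatic equivalent of the new `configured_catalog.json` entry for `reviews`, using the protocol models from `airbyte_cdk.models`; the connector itself ships the JSON file, not this code:

```python
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode

reviews = ConfiguredAirbyteStream(
    stream=AirbyteStream(
        name="reviews",
        json_schema={},
        supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
        source_defined_cursor=True,
        default_cursor_field=["submitted_at"],
        source_defined_primary_key=[["id"]],
    ),
    sync_mode=SyncMode.incremental,
    destination_sync_mode=DestinationSyncMode.append,
    cursor_field=["submitted_at"],
)
```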