From 62d219fb9ab56ab2b87a9937bbb199bdf76580a7 Mon Sep 17 00:00:00 2001 From: Denys Davydov Date: Thu, 5 Jan 2023 16:07:34 +0200 Subject: [PATCH] Source google ads: retry transient errors (#20855) Co-authored-by: Baz --- .../connectors/source-google-ads/Dockerfile | 2 +- .../acceptance-test-config.yml | 22 ++++++++++------ .../integration_tests/configured_catalog.json | 12 +++++++++ .../source_google_ads/google_ads.py | 14 ++++++++++- .../unit_tests/test_streams.py | 25 +++++++++++++++++++ docs/integrations/sources/google-ads.md | 1 + 6 files changed, 66 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/source-google-ads/Dockerfile b/airbyte-integrations/connectors/source-google-ads/Dockerfile index d6b0a2a75460..01850ae0900a 100644 --- a/airbyte-integrations/connectors/source-google-ads/Dockerfile +++ b/airbyte-integrations/connectors/source-google-ads/Dockerfile @@ -13,5 +13,5 @@ COPY main.py ./ ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.5 +LABEL io.airbyte.version=0.2.6 LABEL io.airbyte.name=airbyte/source-google-ads diff --git a/airbyte-integrations/connectors/source-google-ads/acceptance-test-config.yml b/airbyte-integrations/connectors/source-google-ads/acceptance-test-config.yml index 5d89c3502494..f1840af554dc 100644 --- a/airbyte-integrations/connectors/source-google-ads/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-google-ads/acceptance-test-config.yml @@ -1,12 +1,11 @@ # See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference) # for more information about how to configure these tests connector_image: airbyte/source-google-ads:dev +test_strictness_level: high acceptance_tests: spec: tests: - spec_path: "source_google_ads/spec.json" - backward_compatibility_tests_config: - disable_for_version: "0.1.45" connection: tests: - config_path: "secrets/config.json" @@ -16,24 +15,31 @@ 
acceptance_tests: discovery: tests: - config_path: "secrets/config.json" - backward_compatibility_tests_config: - disable_for_version: "0.2.4" basic_read: tests: - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog.json" + expect_records: + bypass_reason: "Bypassed because SAT run on active account. Dedicated Sandbox is needed." empty_streams: - name: "geographic_report" + bypass_reason: "The stream could not be filled with data manually." - name: "keyword_report" + bypass_reason: "The stream could not be filled with data manually." - name: "display_keyword_performance_report" + bypass_reason: "The stream could not be filled with data manually." - name: "display_topics_performance_report" + bypass_reason: "The stream could not be filled with data manually." - name: "shopping_performance_report" + bypass_reason: "The stream could not be filled with data manually." - name: "unhappytable" + bypass_reason: "The stream could not be filled with data manually." - name: "click_view" - timeout_seconds: 600 - - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog_protobuf_msg.json" + bypass_reason: "The stream could not be filled with data manually." 
+ timeout_seconds: 1200 full_refresh: tests: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" + timeout_seconds: 1200 + incremental: + bypass_reason: "Incremental tests are implemented using custom test, available by integration_tests/test_incremental.py" diff --git a/airbyte-integrations/connectors/source-google-ads/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-google-ads/integration_tests/configured_catalog.json index 0f1b89da1c0e..c338cfc5096b 100644 --- a/airbyte-integrations/connectors/source-google-ads/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-google-ads/integration_tests/configured_catalog.json @@ -12,6 +12,18 @@ "destination_sync_mode": "overwrite", "cursor_field": ["segments.date"] }, + { + "stream": { + "name": "ad_group_custom", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["segments.date"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite", + "cursor_field": ["segments.date"] + }, { "stream": { "name": "account_performance_report", diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py index 74417089069b..e09f2fd28f7f 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py @@ -3,12 +3,15 @@ # +import logging from enum import Enum from typing import Any, Iterator, List, Mapping, MutableMapping +import backoff import pendulum from google.ads.googleads.client import GoogleAdsClient from google.ads.googleads.v11.services.types.google_ads_service import GoogleAdsRow, SearchGoogleAdsResponse +from google.api_core.exceptions import ServerError, TooManyRequests 
from proto.marshal.collections import Repeated, RepeatedComposite REPORT_MAPPING = { @@ -31,6 +34,7 @@ "keyword_report": "keyword_view", } API_VERSION = "v11" +logger = logging.getLogger("airbyte") class GoogleAds: @@ -43,13 +47,21 @@ def __init__(self, credentials: MutableMapping[str, Any]): self.client = GoogleAdsClient.load_from_dict(credentials, version=API_VERSION) self.ga_service = self.client.get_service("GoogleAdsService") + @backoff.on_exception( + backoff.expo, + (ServerError, TooManyRequests), + on_backoff=lambda details: logger.info( + f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..." + ), + max_tries=5, + ) def send_request(self, query: str, customer_id: str) -> Iterator[SearchGoogleAdsResponse]: client = self.client search_request = client.get_type("SearchGoogleAdsRequest") search_request.query = query search_request.page_size = self.DEFAULT_PAGE_SIZE search_request.customer_id = customer_id - yield self.ga_service.search(search_request) + return [self.ga_service.search(search_request)] def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]: """ diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py index d20ac79a62a2..7e600bef4991 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_streams.py @@ -9,6 +9,7 @@ from google.ads.googleads.errors import GoogleAdsException from google.ads.googleads.v11.errors.types.errors import ErrorCode, GoogleAdsError, GoogleAdsFailure from google.ads.googleads.v11.errors.types.request_error import RequestErrorEnum +from google.api_core.exceptions import DataLoss, InternalServerError, ResourceExhausted, TooManyRequests from grpc import RpcError from source_google_ads.google_ads import GoogleAds from source_google_ads.streams 
import ClickView @@ -193,3 +194,27 @@ def test_page_token_expired_it_should_fail_date_range_1_day(mock_ads_client, con stream.get_query.assert_called_with({"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04"}) assert stream.get_query.call_count == 1 + + +@pytest.mark.parametrize("error_cls", (ResourceExhausted, TooManyRequests, InternalServerError, DataLoss)) +def test_retry_transient_errors(mocker, config, customers, error_cls): + mocker.patch("time.sleep") + credentials = config["credentials"] + credentials.update(use_proto_plus=True) + api = GoogleAds(credentials=credentials) + mocked_search = mocker.patch.object(api.ga_service, "search", side_effect=error_cls("Error message")) + incremental_stream_config = dict( + api=api, + conversion_window_days=config["conversion_window_days"], + start_date=config["start_date"], + end_date="2021-04-04", + customers=customers, + ) + stream = ClickView(**incremental_stream_config) + customer_id = next(iter(customers)).id + stream_slice = {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04"} + records = [] + with pytest.raises(error_cls): + records = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice)) + assert mocked_search.call_count == 5 + assert records == [] diff --git a/docs/integrations/sources/google-ads.md b/docs/integrations/sources/google-ads.md index 4877e27269e6..38a727f8c9a8 100644 --- a/docs/integrations/sources/google-ads.md +++ b/docs/integrations/sources/google-ads.md @@ -134,6 +134,7 @@ Due to a limitation in the Google Ads API which does not allow getting performan | Version | Date | Pull Request | Subject | |:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------| +| `0.2.6` | 2022-12-22 | 
[20855](https://github.com/airbytehq/airbyte/pull/20855) | Retry 429 and 5xx errors | | `0.2.5` | 2022-11-22 | [19700](https://github.com/airbytehq/airbyte/pull/19700) | Fix schema for `campaigns` stream | | `0.2.4` | 2022-11-09 | [19208](https://github.com/airbytehq/airbyte/pull/19208) | Add TypeTransofrmer to Campaings stream to force proper type casting | | `0.2.3` | 2022-10-17 | [18069](https://github.com/airbytehq/airbyte/pull/18069) | Add `segments.hour`, `metrics.ctr`, `metrics.conversions` and `metrics.conversions_values` fields to `campaigns` report stream |