Skip to content

Commit

Permalink
Source google ads: retry transient errors (#20855)
Browse files Browse the repository at this point in the history
Co-authored-by: Baz <oleksandr.bazarnov@globallogic.com>
  • Loading branch information
davydov-d and bazarnov authored Jan 5, 2023
1 parent 4897bbc commit 62d219f
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ COPY main.py ./

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.version=0.2.6
LABEL io.airbyte.name=airbyte/source-google-ads
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
# See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-google-ads:dev
test_strictness_level: high
acceptance_tests:
spec:
tests:
- spec_path: "source_google_ads/spec.json"
backward_compatibility_tests_config:
disable_for_version: "0.1.45"
connection:
tests:
- config_path: "secrets/config.json"
Expand All @@ -16,24 +15,31 @@ acceptance_tests:
discovery:
tests:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: "0.2.4"
basic_read:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
expect_records:
bypass_reason: "Bypassed because SAT run on active account. Dedicated Sandbox is needed."
empty_streams:
- name: "geographic_report"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "keyword_report"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "display_keyword_performance_report"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "display_topics_performance_report"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "shopping_performance_report"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "unhappytable"
bypass_reason: "The stream could not be fullfield with data manually."
- name: "click_view"
timeout_seconds: 600
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_protobuf_msg.json"
bypass_reason: "The stream could not be fullfield with data manually."
timeout_seconds: 1200
full_refresh:
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 1200
incremental:
bypass_reason: "Incremental tests are implemented using custom test, available by integration_tests/test_incremental.py"
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "ad_group_custom",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
},
{
"stream": {
"name": "account_performance_report",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
#


import logging
from enum import Enum
from typing import Any, Iterator, List, Mapping, MutableMapping

import backoff
import pendulum
from google.ads.googleads.client import GoogleAdsClient
from google.ads.googleads.v11.services.types.google_ads_service import GoogleAdsRow, SearchGoogleAdsResponse
from google.api_core.exceptions import ServerError, TooManyRequests
from proto.marshal.collections import Repeated, RepeatedComposite

REPORT_MAPPING = {
Expand All @@ -31,6 +34,7 @@
"keyword_report": "keyword_view",
}
API_VERSION = "v11"
logger = logging.getLogger("airbyte")


class GoogleAds:
Expand All @@ -43,13 +47,21 @@ def __init__(self, credentials: MutableMapping[str, Any]):
self.client = GoogleAdsClient.load_from_dict(credentials, version=API_VERSION)
self.ga_service = self.client.get_service("GoogleAdsService")

@backoff.on_exception(
backoff.expo,
(ServerError, TooManyRequests),
on_backoff=lambda details: logger.info(
f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
),
max_tries=5,
)
def send_request(self, query: str, customer_id: str) -> Iterator[SearchGoogleAdsResponse]:
client = self.client
search_request = client.get_type("SearchGoogleAdsRequest")
search_request.query = query
search_request.page_size = self.DEFAULT_PAGE_SIZE
search_request.customer_id = customer_id
yield self.ga_service.search(search_request)
return [self.ga_service.search(search_request)]

def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from google.ads.googleads.errors import GoogleAdsException
from google.ads.googleads.v11.errors.types.errors import ErrorCode, GoogleAdsError, GoogleAdsFailure
from google.ads.googleads.v11.errors.types.request_error import RequestErrorEnum
from google.api_core.exceptions import DataLoss, InternalServerError, ResourceExhausted, TooManyRequests
from grpc import RpcError
from source_google_ads.google_ads import GoogleAds
from source_google_ads.streams import ClickView
Expand Down Expand Up @@ -193,3 +194,27 @@ def test_page_token_expired_it_should_fail_date_range_1_day(mock_ads_client, con

stream.get_query.assert_called_with({"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04"})
assert stream.get_query.call_count == 1


@pytest.mark.parametrize("error_cls", (ResourceExhausted, TooManyRequests, InternalServerError, DataLoss))
def test_retry_transient_errors(mocker, config, customers, error_cls):
mocker.patch("time.sleep")
credentials = config["credentials"]
credentials.update(use_proto_plus=True)
api = GoogleAds(credentials=credentials)
mocked_search = mocker.patch.object(api.ga_service, "search", side_effect=error_cls("Error message"))
incremental_stream_config = dict(
api=api,
conversion_window_days=config["conversion_window_days"],
start_date=config["start_date"],
end_date="2021-04-04",
customers=customers,
)
stream = ClickView(**incremental_stream_config)
customer_id = next(iter(customers)).id
stream_slice = {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04"}
records = []
with pytest.raises(error_cls):
records = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice))
assert mocked_search.call_count == 5
assert records == []
1 change: 1 addition & 0 deletions docs/integrations/sources/google-ads.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ Due to a limitation in the Google Ads API which does not allow getting performan

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| `0.2.6` | 2022-12-22 | [20855](https://github.com/airbytehq/airbyte/pull/20855) | Retry 429 and 5xx errors |
| `0.2.5` | 2022-11-22 | [19700](https://github.com/airbytehq/airbyte/pull/19700) | Fix schema for `campaigns` stream |
| `0.2.4` | 2022-11-09 | [19208](https://github.com/airbytehq/airbyte/pull/19208) | Add TypeTransofrmer to Campaings stream to force proper type casting |
| `0.2.3` | 2022-10-17 | [18069](https://github.com/airbytehq/airbyte/pull/18069) | Add `segments.hour`, `metrics.ctr`, `metrics.conversions` and `metrics.conversions_values` fields to `campaigns` report stream |
Expand Down

0 comments on commit 62d219f

Please sign in to comment.