From 6cd20e6879a42053692c18a16906eb410aad85db Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Thu, 24 Feb 2022 01:22:53 +0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20FB=20Marketing:=20fix?= =?UTF-8?q?=20`execute=5Fin=5Fbatch`=20when=20batch=20is=20bigger=20than?= =?UTF-8?q?=2050=20(#10588)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix execute_in_batch * add tests * fix pre-commit config Co-authored-by: Sherif A. Nada Co-authored-by: Eugene Kulak Co-authored-by: Sherif A. Nada --- .pre-commit-config.yaml | 6 +- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 289 +++++++++++++----- .../source-facebook-marketing/Dockerfile | 2 +- .../source_facebook_marketing/api.py | 6 + .../streams/base_streams.py | 6 +- .../unit_tests/test_base_streams.py | 130 ++++++++ .../sources/facebook-marketing.md | 1 + 8 files changed, 356 insertions(+), 86 deletions(-) create mode 100644 airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a9781b80ae41..671541f18bdb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,7 +13,7 @@ repos: - id: black args: ["--config", "pyproject.toml"] - repo: https://github.com/timothycrosley/isort - rev: 5.6.4 + rev: 5.10.1 hooks: - id: isort args: @@ -38,14 +38,14 @@ repos: ).?$ - repo: https://github.com/csachs/pyproject-flake8 - rev: 0.0.1a2 + rev: v0.0.1a2.post1 hooks: - id: pyproject-flake8 args: ["--config", "pyproject.toml"] additional_dependencies: ["mccabe"] alias: flake8 - repo: https://github.com/pre-commit/mirrors-mypy - rev: 0.930 + rev: v0.930 hooks: - id: mypy args: ["--config-file", "pyproject.toml"] diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index d91a8d423e9d..aff3a5e9b12f 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -203,7 +203,7 @@ - name: Facebook Marketing sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.35 + dockerImageTag: 0.2.36 documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing icon: facebook.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index aff539123401..1931a570dfe4 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -1648,7 +1648,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-facebook-marketing:0.2.35" +- dockerImage: "airbyte/source-facebook-marketing:0.2.36" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" @@ -1656,23 +1656,12 @@ title: "Source Facebook Marketing" type: "object" properties: - account_id: - title: "Account Id" - description: "The Facebook Ad account ID to use when pulling data from the\ - \ Facebook Marketing API." - type: "string" - access_token: - title: "Access Token" - description: "The value of the access token generated. See the docs\ - \ for more information" - airbyte_secret: true - type: "string" start_date: title: "Start Date" - description: "The date from which you'd like to replicate data for AdCreatives\ - \ and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated\ - \ after this date will be replicated." + description: "The date from which you'd like to replicate data for all incremental\ + \ streams, in the format YYYY-MM-DDT00:00:00Z. All data generated after\ + \ this date will be replicated." + order: 0 pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" examples: - "2017-01-25T00:00:00Z" @@ -1680,48 +1669,54 @@ format: "date-time" end_date: title: "End Date" - description: "The date until which you'd like to replicate data for AdCreatives\ - \ and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated\ + description: "The date until which you'd like to replicate data for all\ + \ incremental streams, in the format YYYY-MM-DDT00:00:00Z. All data generated\ \ between start_date and this date will be replicated. Not setting this\ \ option will result in always syncing the latest data." + order: 1 pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" examples: - "2017-01-26T00:00:00Z" type: "string" format: "date-time" + account_id: + title: "Account ID" + description: "The Facebook Ad account ID to use when pulling data from the\ + \ Facebook Marketing API." + order: 2 + examples: + - "111111111111111" + type: "string" + access_token: + title: "Access Token" + description: "The value of the access token generated. See the docs\ + \ for more information" + order: 3 + airbyte_secret: true + type: "string" + include_deleted: + title: "Include Deleted" + description: "Include data from deleted Campaigns, Ads, and AdSets" + default: false + order: 4 + type: "boolean" fetch_thumbnail_images: title: "Fetch Thumbnail Images" description: "In each Ad Creative, fetch the thumbnail_url and store the\ \ result in thumbnail_data_url" default: false + order: 5 type: "boolean" - include_deleted: - title: "Include Deleted" - description: "Include data from deleted campaigns, ads, and adsets" - default: false - type: "boolean" - insights_lookback_window: - title: "Insights Lookback Window" - description: "The attribution window for the actions" - default: 28 - minimum: 0 - maximum: 28 - type: "integer" - insights_days_per_job: - title: "Insights Days Per Job" - description: "Number of days to sync in one job (the more data you have,\ - \ the smaller this parameter should be)" - default: 7 - minimum: 1 - maximum: 30 - type: "integer" custom_insights: title: "Custom Insights" - description: "A list wich contains insights entries, each entry must have\ + description: "A list which contains insights entries, each entry must have\ \ a name and can contains fields, breakdowns or action_breakdowns)" + order: 6 type: "array" items: title: "InsightConfig" + description: "Config for custom insights" type: "object" properties: name: @@ -1729,64 +1724,202 @@ description: "The name value of insight" type: "string" fields: - title: "Fields" description: "A list of chosen fields for fields parameter" default: [] type: "array" items: - type: "string" + title: "ValidEnums" + description: "Generic enumeration.\n\nDerive from this class to\ + \ define new enumerations." + enum: + - "account_currency" + - "account_id" + - "account_name" + - "action_values" + - "actions" + - "ad_bid_value" + - "ad_click_actions" + - "ad_id" + - "ad_impression_actions" + - "ad_name" + - "adset_bid_value" + - "adset_end" + - "adset_id" + - "adset_name" + - "adset_start" + - "age_targeting" + - "attribution_setting" + - "auction_bid" + - "auction_competitiveness" + - "auction_max_competitor_bid" + - "buying_type" + - "campaign_id" + - "campaign_name" + - "canvas_avg_view_percent" + - "canvas_avg_view_time" + - "catalog_segment_actions" + - "catalog_segment_value" + - "catalog_segment_value_mobile_purchase_roas" + - "catalog_segment_value_omni_purchase_roas" + - "catalog_segment_value_website_purchase_roas" + - "clicks" + - "conversion_rate_ranking" + - "conversion_values" + - "conversions" + - "converted_product_quantity" + - "converted_product_value" + - "cost_per_15_sec_video_view" + - "cost_per_2_sec_continuous_video_view" + - "cost_per_action_type" + - "cost_per_ad_click" + - "cost_per_conversion" + - "cost_per_dda_countby_convs" + - "cost_per_estimated_ad_recallers" + - "cost_per_inline_link_click" + - "cost_per_inline_post_engagement" + - "cost_per_one_thousand_ad_impression" + - "cost_per_outbound_click" + - "cost_per_thruplay" + - "cost_per_unique_action_type" + - "cost_per_unique_click" + - "cost_per_unique_conversion" + - "cost_per_unique_inline_link_click" + - "cost_per_unique_outbound_click" + - "cpc" + - "cpm" + - "cpp" + - "created_time" + - "ctr" + - "date_start" + - "date_stop" + - "dda_countby_convs" + - "dda_results" + - "engagement_rate_ranking" + - "estimated_ad_recall_rate" + - "estimated_ad_recall_rate_lower_bound" + - "estimated_ad_recall_rate_upper_bound" + - "estimated_ad_recallers" + - "estimated_ad_recallers_lower_bound" + - "estimated_ad_recallers_upper_bound" + - "frequency" + - "full_view_impressions" + - "full_view_reach" + - "gender_targeting" + - "impressions" + - "inline_link_click_ctr" + - "inline_link_clicks" + - "inline_post_engagement" + - "instant_experience_clicks_to_open" + - "instant_experience_clicks_to_start" + - "instant_experience_outbound_clicks" + - "interactive_component_tap" + - "labels" + - "location" + - "mobile_app_purchase_roas" + - "objective" + - "optimization_goal" + - "outbound_clicks" + - "outbound_clicks_ctr" + - "place_page_name" + - "purchase_roas" + - "qualifying_question_qualify_answer_rate" + - "quality_ranking" + - "quality_score_ectr" + - "quality_score_ecvr" + - "quality_score_organic" + - "reach" + - "social_spend" + - "spend" + - "total_postbacks" + - "unique_actions" + - "unique_clicks" + - "unique_conversions" + - "unique_ctr" + - "unique_inline_link_click_ctr" + - "unique_inline_link_clicks" + - "unique_link_clicks_ctr" + - "unique_outbound_clicks" + - "unique_outbound_clicks_ctr" + - "unique_video_continuous_2_sec_watched_actions" + - "unique_video_view_15_sec" + - "updated_time" + - "video_15_sec_watched_actions" + - "video_30_sec_watched_actions" + - "video_avg_time_watched_actions" + - "video_continuous_2_sec_watched_actions" + - "video_p100_watched_actions" + - "video_p25_watched_actions" + - "video_p50_watched_actions" + - "video_p75_watched_actions" + - "video_p95_watched_actions" + - "video_play_actions" + - "video_play_curve_actions" + - "video_play_retention_0_to_15s_actions" + - "video_play_retention_20_to_60s_actions" + - "video_play_retention_graph_actions" + - "video_thruplay_watched_actions" + - "video_time_watched_actions" + - "website_ctr" + - "website_purchase_roas" + - "wish_bid" breakdowns: - title: "Breakdowns" description: "A list of chosen breakdowns for breakdowns" default: [] type: "array" items: - type: "string" + title: "ValidBreakdowns" + description: "Generic enumeration.\n\nDerive from this class to\ + \ define new enumerations." + enum: + - "ad_format_asset" + - "age" + - "app_id" + - "body_asset" + - "call_to_action_asset" + - "country" + - "description_asset" + - "device_platform" + - "dma" + - "frequency_value" + - "gender" + - "hourly_stats_aggregated_by_advertiser_time_zone" + - "hourly_stats_aggregated_by_audience_time_zone" + - "image_asset" + - "impression_device" + - "link_url_asset" + - "place_page_id" + - "platform_position" + - "product_id" + - "publisher_platform" + - "region" + - "skan_conversion_id" + - "title_asset" + - "video_asset" action_breakdowns: - title: "Action Breakdowns" description: "A list of chosen action_breakdowns for action_breakdowns" default: [] type: "array" items: - type: "string" + title: "ValidActionBreakdowns" + description: "Generic enumeration.\n\nDerive from this class to\ + \ define new enumerations." + enum: + - "action_canvas_component_name" + - "action_carousel_card_id" + - "action_carousel_card_name" + - "action_destination" + - "action_device" + - "action_reaction" + - "action_target_id" + - "action_type" + - "action_video_sound" + - "action_video_type" required: - "name" required: + - "start_date" - "account_id" - "access_token" - - "start_date" - definitions: - InsightConfig: - title: "InsightConfig" - type: "object" - properties: - name: - title: "Name" - description: "The name value of insight" - type: "string" - fields: - title: "Fields" - description: "A list of chosen fields for fields parameter" - default: [] - type: "array" - items: - type: "string" - breakdowns: - title: "Breakdowns" - description: "A list of chosen breakdowns for breakdowns" - default: [] - type: "array" - items: - type: "string" - action_breakdowns: - title: "Action Breakdowns" - description: "A list of chosen action_breakdowns for action_breakdowns" - default: [] - type: "array" - items: - type: "string" - required: - - "name" supportsIncremental: true supportsNormalization: false supportsDBT: false diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 02baf0fe1268..2c76ee1b5cd9 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.35 +LABEL io.airbyte.version=0.2.36 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py index 8dcb33356bdb..57c638e8bb68 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py @@ -93,6 +93,12 @@ def handle_call_rate_limit(self, response, params): max_pause_interval = self.pause_interval_minimum for record in response.json(): + # there are two types of failures: + # 1. no response (we execute batch until all inner requests has response) + # 2. response with error (we crash loudly) + # in case it is failed inner request the headers might not be present + if "headers" not in record: + continue headers = {header["name"].lower(): header["value"] for header in record["headers"]} usage, pause_interval = self._parse_call_rate_header(headers) max_usage = max(max_usage, usage) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py index 65992a9ab865..48cf505c310e 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py @@ -49,13 +49,12 @@ def fields(self) -> List[str]: """List of fields that we want to query, for now just all properties from stream's schema""" return list(self.get_json_schema().get("properties", {}).keys()) - def _execute_batch(self, batch: FacebookAdsApiBatch) -> FacebookAdsApiBatch: + def _execute_batch(self, batch: FacebookAdsApiBatch) -> None: """Execute batch, retry in case of failures""" while batch: batch = batch.execute() if batch: logger.info("Retry failed requests in batch") - return batch def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Iterable[MutableMapping[str, Any]]: """Execute list of requests in batches""" @@ -71,9 +70,10 @@ def failure(response: FacebookResponse): for request in pending_requests: api_batch.add_request(request, success=success, failure=failure) if len(api_batch) == MAX_BATCH_SIZE: - api_batch = self._execute_batch(api_batch) + self._execute_batch(api_batch) yield from records records = [] + api_batch: FacebookAdsApiBatch = self._api.api.new_batch() self._execute_batch(api_batch) yield from records diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py new file mode 100644 index 000000000000..4cc4024816fc --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py @@ -0,0 +1,130 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +import json +from functools import partial +from typing import Any, Iterable, Mapping + +import pytest +from facebook_business import FacebookSession +from facebook_business.api import FacebookAdsApi, FacebookAdsApiBatch, FacebookRequest +from source_facebook_marketing.api import MyFacebookAdsApi +from source_facebook_marketing.streams.base_streams import FBMarketingStream +from source_facebook_marketing.streams.common import MAX_BATCH_SIZE + + +@pytest.fixture(name="mock_batch_responses") +def mock_batch_responses_fixture(requests_mock): + return partial(requests_mock.register_uri, "POST", f"{FacebookSession.GRAPH}/{FacebookAdsApi.API_VERSION}/") + + +@pytest.fixture(name="batch") +def batch_fixture(api, mocker): + batch = FacebookAdsApiBatch(api=api.api) + mocker.patch.object(batch, "execute", wraps=batch.execute) + mocker.patch.object(batch, "add_request", wraps=batch.add_request) + mocker.patch.object(MyFacebookAdsApi, "new_batch", return_value=batch) + return batch + + +class SomeTestStream(FBMarketingStream): + def list_objects(self, params: Mapping[str, Any]) -> Iterable: + yield from [] + + +class TestBaseStream: + def test_execute_in_batch_with_few_requests(self, api, batch, mock_batch_responses): + """Should execute single batch if number of requests less than MAX_BATCH_SIZE.""" + mock_batch_responses( + [ + { + "json": [{"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}] * 3, + } + ] + ) + + stream = SomeTestStream(api=api) + requests = [FacebookRequest("node", "GET", "endpoint") for _ in range(5)] + + result = list(stream.execute_in_batch(requests)) + + assert batch.add_request.call_count == len(requests) + batch.execute.assert_called_once() + assert len(result) == 3 + + def test_execute_in_batch_with_many_requests(self, api, batch, mock_batch_responses): + """Should execute as many batches as needed if number of requests bigger than MAX_BATCH_SIZE.""" + mock_batch_responses( + [ + { + "json": [{"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}] * 5, + } + ] + ) + + stream = SomeTestStream(api=api) + requests = [FacebookRequest("node", "GET", "endpoint") for _ in range(MAX_BATCH_SIZE + 1)] + + result = list(stream.execute_in_batch(requests)) + + assert batch.add_request.call_count == len(requests) + assert batch.execute.call_count == 2 + assert len(result) == 5 * 2 + + def test_execute_in_batch_with_retries(self, api, batch, mock_batch_responses): + """Should retry batch execution until succeed""" + # batch.execute.side_effect = [batch, batch, None] + mock_batch_responses( + [ + { + "json": [ + {}, + {}, + {"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}, + ], + }, + { + "json": [ + {}, + {"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}, + ], + }, + { + "json": [ + {"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}, + ], + }, + ] + ) + + stream = SomeTestStream(api=api) + requests = [FacebookRequest("node", "GET", "endpoint") for _ in range(3)] + + result = list(stream.execute_in_batch(requests)) + + assert batch.add_request.call_count == len(requests) + assert batch.execute.call_count == 1 + assert len(result) == 3 + + def test_execute_in_batch_with_fails(self, api, batch, mock_batch_responses): + """Should fail with exception when any request returns error""" + mock_batch_responses( + [ + { + "json": [ + {"body": "{}", "code": 500, "headers": {}}, + {"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}, + ], + } + ] + ) + + stream = SomeTestStream(api=api) + requests = [FacebookRequest("node", "GET", "endpoint") for _ in range(5)] + + with pytest.raises(RuntimeError, match="Batch request failed with response:"): + list(stream.execute_in_batch(requests)) + + assert batch.add_request.call_count == len(requests) + assert batch.execute.call_count == 1 diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index 13f6a2376337..7ddedefb4558 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -103,6 +103,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.2.36 | 2022-02-24 | [10588](https://github.com/airbytehq/airbyte/pull/10588) | Fix `execute_in_batch` for large amount of requests | | 0.2.35 | 2022-02-18 | [10348](https://github.com/airbytehq/airbyte/pull/10348) | Add 104 error code to backoff triggers | | 0.2.34 | 2022-02-17 | [10180](https://github.com/airbytehq/airbyte/pull/9805) | Performance and reliability fixes | | 0.2.33 | 2021-12-28 | [10180](https://github.com/airbytehq/airbyte/pull/10180) | Add AdAccount and Images streams |