diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 422340d56c20..e50f5f1c44f5 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -210,7 +210,7 @@ - name: Facebook Marketing sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.39 + dockerImageTag: 0.2.40 documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing icon: facebook.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 5a333ec0cc60..d11a0c91a416 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -1684,7 +1684,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-facebook-marketing:0.2.39" +- dockerImage: "airbyte/source-facebook-marketing:0.2.40" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" @@ -1692,12 +1692,20 @@ title: "Source Facebook Marketing" type: "object" properties: + account_id: + title: "Account ID" + description: "The Facebook Ad account ID to use when pulling data from the\ + \ Facebook Marketing API." + order: 0 + examples: + - "111111111111111" + type: "string" start_date: title: "Start Date" description: "The date from which you'd like to replicate data for all incremental\ \ streams, in the format YYYY-MM-DDT00:00:00Z. All data generated after\ \ this date will be replicated." - order: 0 + order: 1 pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" examples: - "2017-01-25T00:00:00Z" @@ -1709,20 +1717,12 @@ \ incremental streams, in the format YYYY-MM-DDT00:00:00Z. All data generated\ \ between start_date and this date will be replicated. Not setting this\ \ option will result in always syncing the latest data." - order: 1 + order: 2 pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" examples: - "2017-01-26T00:00:00Z" type: "string" format: "date-time" - account_id: - title: "Account ID" - description: "The Facebook Ad account ID to use when pulling data from the\ - \ Facebook Marketing API." - order: 2 - examples: - - "111111111111111" - type: "string" access_token: title: "Access Token" description: "The value of the access token generated. See the = self.MAX_RATE: + return max(self.MAX_PAUSE_INTERVAL, pause_interval) + return max(self.MIN_PAUSE_INTERVAL, pause_interval) + + def _get_max_usage_pause_interval_from_batch(self, records): + usage = 0 + pause_interval = self.MIN_PAUSE_INTERVAL + + for record in records: + # there are two types of failures: + # 1. no response (we execute batch until all inner requests has response) + # 2. response with error (we crash loudly) + # in case it is failed inner request the headers might not be present + if "headers" not in record: + continue + headers = {header["name"].lower(): header["value"] for header in record["headers"]} + usage_from_response, pause_interval_from_response = self._parse_call_rate_header(headers) + usage = max(usage, usage_from_response) + pause_interval = max(pause_interval_from_response, pause_interval) + return usage, pause_interval + + def _handle_call_rate_limit(self, response, params): if "batch" in params: - max_usage = 0 - max_pause_interval = self.pause_interval_minimum - - for record in response.json(): - # there are two types of failures: - # 1. no response (we execute batch until all inner requests has response) - # 2. response with error (we crash loudly) - # in case it is failed inner request the headers might not be present - if "headers" not in record: - continue - headers = {header["name"].lower(): header["value"] for header in record["headers"]} - usage, pause_interval = self._parse_call_rate_header(headers) - max_usage = max(max_usage, usage) - max_pause_interval = max(max_pause_interval, pause_interval) - - if max_usage > self.call_rate_threshold: - max_pause_interval = max(max_pause_interval, self.pause_interval_minimum) - logger.warning(f"Utilization is too high ({max_usage})%, pausing for {max_pause_interval}") - sleep(max_pause_interval.total_seconds()) + records = response.json() + usage, pause_interval = self._get_max_usage_pause_interval_from_batch(records) else: headers = response.headers() usage, pause_interval = self._parse_call_rate_header(headers) - if usage > self.call_rate_threshold or pause_interval: - pause_interval = max(pause_interval, self.pause_interval_minimum) - logger.warning(f"Utilization is too high ({usage})%, pausing for {pause_interval}") - sleep(pause_interval.total_seconds()) + + if usage >= self.MIN_RATE: + sleep_time = self._compute_pause_interval(usage=usage, pause_interval=pause_interval) + logger.warning(f"Utilization is too high ({usage})%, pausing for {sleep_time}") + sleep(sleep_time.total_seconds()) def _update_insights_throttle_limit(self, response: FacebookResponse): """ @@ -145,7 +152,7 @@ def call( """Makes an API call, delegate actual work to parent class and handles call rates""" response = super().call(method, path, params, headers, files, url_override, api_version) self._update_insights_throttle_limit(response) - self.handle_call_rate_limit(response, params) + self._handle_call_rate_limit(response, params) return response diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_api.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_api.py new file mode 100644 index 000000000000..ce8aa559067b --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_api.py @@ -0,0 +1,130 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# +import pendulum +import pytest +import source_facebook_marketing + + +class TestMyFacebookAdsApi: + @pytest.fixture + def api(self): + return source_facebook_marketing.api.MyFacebookAdsApi.init(access_token="foo", crash_log=False) + + @pytest.mark.parametrize( + "max_rate,max_pause_interval,min_pause_interval,usage,pause_interval,expected_pause_interval", + [ + ( + 95, + pendulum.duration(minutes=5), + pendulum.duration(minutes=1), + 96, + pendulum.duration(minutes=6), + pendulum.duration(minutes=6), + ), + ( + 95, + pendulum.duration(minutes=5), + pendulum.duration(minutes=2), + 96, + pendulum.duration(minutes=1), + pendulum.duration(minutes=5), + ), + ( + 95, + pendulum.duration(minutes=5), + pendulum.duration(minutes=1), + 93, + pendulum.duration(minutes=4), + pendulum.duration(minutes=4), + ), + ], + ) + def test__compute_pause_interval( + self, mocker, api, max_rate, max_pause_interval, min_pause_interval, usage, pause_interval, expected_pause_interval + ): + mocker.patch.object(api, "MAX_RATE", max_rate) + mocker.patch.object(api, "MAX_PAUSE_INTERVAL", max_pause_interval) + mocker.patch.object(api, "MIN_PAUSE_INTERVAL", min_pause_interval) + computed_pause_interval = api._compute_pause_interval(usage, pause_interval) + assert computed_pause_interval == expected_pause_interval + + @pytest.mark.parametrize( + "min_pause_interval,usages_pause_intervals,expected_output", + [ + ( + pendulum.duration(minutes=1), # min_pause_interval + [(5, pendulum.duration(minutes=6)), (7, pendulum.duration(minutes=5))], # usages_pause_intervals + (7, pendulum.duration(minutes=6)), # expected_output + ), + ( + pendulum.duration(minutes=10), # min_pause_interval + [(5, pendulum.duration(minutes=6)), (7, pendulum.duration(minutes=5))], # usages_pause_intervals + (7, pendulum.duration(minutes=10)), # expected_output + ), + ( + pendulum.duration(minutes=10), # min_pause_interval + [ # usages_pause_intervals + (9, pendulum.duration(minutes=6)), + ], + (9, pendulum.duration(minutes=10)), # expected_output + ), + ( + pendulum.duration(minutes=10), # min_pause_interval + [ # usages_pause_intervals + (-1, pendulum.duration(minutes=1)), + (-2, pendulum.duration(minutes=10)), + (-3, pendulum.duration(minutes=100)), + ], + (0, pendulum.duration(minutes=100)), # expected_output + ), + ], + ) + def test__get_max_usage_pause_interval_from_batch(self, mocker, api, min_pause_interval, usages_pause_intervals, expected_output): + records = [ + {"headers": [{"name": "USAGE", "value": usage}, {"name": "PAUSE_INTERVAL", "value": pause_interval}]} + for usage, pause_interval in usages_pause_intervals + ] + + mock_parse_call_rate_header = mocker.Mock(side_effect=usages_pause_intervals) + mocker.patch.object(api, "_parse_call_rate_header", mock_parse_call_rate_header) + mocker.patch.object(api, "MIN_PAUSE_INTERVAL", min_pause_interval) + + output = api._get_max_usage_pause_interval_from_batch(records) + api._parse_call_rate_header.assert_called_with( + {"usage": usages_pause_intervals[-1][0], "pause_interval": usages_pause_intervals[-1][1]} + ) + assert output == expected_output + + @pytest.mark.parametrize( + "params,min_rate,usage,expect_sleep", + [ + (["batch"], 0, 1, True), + (["batch"], 0, 0, True), + (["batch"], 2, 1, False), + (["not_batch"], 0, 1, True), + (["not_batch"], 0, 0, True), + (["not_batch"], 2, 1, False), + ], + ) + def test__handle_call_rate_limit(self, mocker, api, params, min_rate, usage, expect_sleep): + pause_interval = 1 + mock_response = mocker.Mock() + + mocker.patch.object(api, "MIN_RATE", min_rate) + mocker.patch.object(api, "_get_max_usage_pause_interval_from_batch", mocker.Mock(return_value=(usage, pause_interval))) + mocker.patch.object(api, "_parse_call_rate_header", mocker.Mock(return_value=(usage, pause_interval))) + mocker.patch.object(api, "_compute_pause_interval") + mocker.patch.object(source_facebook_marketing.api, "logger") + mocker.patch.object(source_facebook_marketing.api, "sleep") + assert api._handle_call_rate_limit(mock_response, params) is None + if "batch" in params: + api._get_max_usage_pause_interval_from_batch.assert_called_with(mock_response.json.return_value) + else: + api._parse_call_rate_header.assert_called_with(mock_response.headers.return_value) + if expect_sleep: + api._compute_pause_interval.assert_called_with(usage=usage, pause_interval=pause_interval) + source_facebook_marketing.api.sleep.assert_called_with(api._compute_pause_interval.return_value.total_seconds()) + source_facebook_marketing.api.logger.warning.assert_called_with( + f"Utilization is too high ({usage})%, pausing for {api._compute_pause_interval.return_value}" + ) diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index b4d0eb38ae59..d9bd0e08865b 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -104,6 +104,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.2.40 | 2022-02-28 | [10698](https://github.com/airbytehq/airbyte/pull/10698) | Improve sleeps time in rate limit handler | | 0.2.39 | 2022-03-09 | [10917](https://github.com/airbytehq/airbyte/pull/10917) | Retry connections when FB API returns error code 2 (temporary oauth error) | | 0.2.38 | 2022-03-08 | [10531](https://github.com/airbytehq/airbyte/pull/10531) | Add `time_increment` parameter to custom insights | | 0.2.37 | 2022-02-28 | [10655](https://github.com/airbytehq/airbyte/pull/10655) | Add Activities stream |