diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json index e1bd6fe1224f..622000438cab 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c", "name": "Facebook Marketing", "dockerRepository": "airbyte/source-facebook-marketing", - "dockerImageTag": "0.2.13", + "dockerImageTag": "0.2.14", "documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing", "icon": "facebook.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 9800b15052dd..90f7a29a2eda 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -121,7 +121,7 @@ - sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c name: Facebook Marketing dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.13 + dockerImageTag: 0.2.14 documentationUrl: https://hub.docker.com/r/airbyte/source-facebook-marketing icon: facebook.svg - sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 6f4531fc2553..bd780d432974 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.13 +LABEL io.airbyte.version=0.2.14 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py index de8a83b821e6..a2319bddf20e 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py @@ -39,40 +39,58 @@ class MyFacebookAdsApi(FacebookAdsApi): """Custom Facebook API class to intercept all API calls and handle call rate limits""" call_rate_threshold = 90 # maximum percentage of call limit utilization - pause_interval = pendulum.duration(minutes=1) # default pause interval if reached or close to call rate limit + pause_interval_minimum = pendulum.duration(minutes=1) # default pause interval if reached or close to call rate limit + @staticmethod def parse_call_rate_header(headers): - call_count = 0 + usage = 0 pause_interval = pendulum.duration() - usage_header = headers.get("x-business-use-case-usage") or headers.get("x-app-usage") or headers.get("x-ad-account-usage") - if usage_header: - usage_header = json.loads(usage_header) - call_count = usage_header.get("call_count") or usage_header.get("acc_id_util_pct") or 0 - pause_interval = pendulum.duration(minutes=usage_header.get("estimated_time_to_regain_access", 0)) + usage_header_business = headers.get("x-business-use-case-usage") + usage_header_app = headers.get("x-app-usage") + usage_header_ad_account = headers.get("x-ad-account-usage") + + if usage_header_ad_account: + usage_header_ad_account_loaded = json.loads(usage_header_ad_account) + usage = max(usage, usage_header_ad_account_loaded.get("acc_id_util_pct") ) + + if usage_header_app: + usage_header_app_loaded = json.loads() + usage = max(usage, usage_header_app_loaded.get("call_count"), usage_header_app_loaded.get("total_time"), usage_header_app_loaded.get("total_cputime") ) - return call_count, pause_interval + if usage_header_business: + + usage_header_business_loaded = json.loads(usage_header_business) + for business_object_id in usage_header_business_loaded: + usage_limits = usage_header_business_loaded.get(business_object_id)[0] + usage = max(usage, usage_limits.get('call_count'), usage_limits.get('total_cputime'), usage_limits.get('total_time')) + pause_interval = max(pause_interval, pendulum.duration(minutes=usage_limits.get("estimated_time_to_regain_access", 0))) + + return usage, pause_interval def handle_call_rate_limit(self, response, params): if "batch" in params: - max_call_count = 0 - max_pause_interval = self.pause_interval + max_usage = 0 + max_pause_interval = self.pause_interval_minimum for record in response.json(): headers = {header["name"].lower(): header["value"] for header in record["headers"]} - call_count, pause_interval = self.parse_call_rate_header(headers) - max_call_count = max(max_call_count, call_count) + usage, pause_interval = self.parse_call_rate_header(headers) + max_usage = max(max_usage, usage) max_pause_interval = max(max_pause_interval, pause_interval) - if max_call_count > self.call_rate_threshold: - logger.warn(f"Utilization is too high ({max_call_count})%, pausing for {max_pause_interval}") + + if max_usage > self.call_rate_threshold: + max_pause_interval = max(max_pause_interval, self.pause_interval_minimum) + logger.warn(f"Utilization is too high ({max_usage})%, pausing for {max_pause_interval}") sleep(max_pause_interval.total_seconds()) else: headers = response.headers() - call_count, pause_interval = self.parse_call_rate_header(headers) - if call_count > self.call_rate_threshold or pause_interval: - logger.warn(f"Utilization is too high ({call_count})%, pausing for {pause_interval}") + usage, pause_interval = self.parse_call_rate_header(headers) + if usage > self.call_rate_threshold or pause_interval: + pause_interval = max(pause_interval, self.pause_interval_minimum) + logger.warn(f"Utilization is too high ({usage})%, pausing for {pause_interval}") sleep(pause_interval.total_seconds()) def call( diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index bde46916ae05..b7acd90d17d1 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -101,6 +101,7 @@ With the Ad Account ID and API access token, you should be ready to start pullin | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management| | 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK:
- Improve error handling.
- Improve async job performance (insights).
- Add new configuration parameter `insights_days_per_job`.
- Rename stream `adsets` to `ad_sets`.
- Refactor schema logic for insights, allowing to configure any possible insight stream.| | 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook_bussiness to 11.0| | 0.2.9 | 2021-06-10 | [3996](https://github.com/airbytehq/airbyte/pull/3996) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |