diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json
index e1bd6fe1224f..622000438cab 100644
--- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json
+++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json
@@ -2,7 +2,7 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
- "dockerImageTag": "0.2.13",
+ "dockerImageTag": "0.2.14",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing",
"icon": "facebook.svg"
}
diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index 9800b15052dd..90f7a29a2eda 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -121,7 +121,7 @@
- sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
name: Facebook Marketing
dockerRepository: airbyte/source-facebook-marketing
- dockerImageTag: 0.2.13
+ dockerImageTag: 0.2.14
documentationUrl: https://hub.docker.com/r/airbyte/source-facebook-marketing
icon: facebook.svg
- sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile
index 6f4531fc2553..bd780d432974 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile
+++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
-LABEL io.airbyte.version=0.2.13
+LABEL io.airbyte.version=0.2.14
LABEL io.airbyte.name=airbyte/source-facebook-marketing
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py
index de8a83b821e6..a2319bddf20e 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py
@@ -39,40 +39,58 @@ class MyFacebookAdsApi(FacebookAdsApi):
"""Custom Facebook API class to intercept all API calls and handle call rate limits"""
call_rate_threshold = 90 # maximum percentage of call limit utilization
-    pause_interval = pendulum.duration(minutes=1)  # default pause interval if reached or close to call rate limit
+    pause_interval_minimum = pendulum.duration(minutes=1)  # shortest pause applied when the call rate limit is reached or close
+
     @staticmethod
     def parse_call_rate_header(headers):
-        call_count = 0
+        """Return the highest utilization and the longest pause reported by the usage headers.
+
+        The ``x-ad-account-usage``, ``x-app-usage`` and ``x-business-use-case-usage`` headers are
+        each parsed as JSON and combined, instead of stopping at the first header that is present.
+
+        :param headers: mapping of header name to raw header value; names are looked up in lower case
+        :return: tuple ``(usage, pause_interval)`` where ``usage`` is the largest utilization value found
+            (0 when no usage header is present) and ``pause_interval`` is a pendulum duration built from
+            the largest ``estimated_time_to_regain_access`` value, passed to ``pendulum.duration`` as minutes
+        """
+        usage = 0
         pause_interval = pendulum.duration()
-        usage_header = headers.get("x-business-use-case-usage") or headers.get("x-app-usage") or headers.get("x-ad-account-usage")
-        if usage_header:
-            usage_header = json.loads(usage_header)
-            call_count = usage_header.get("call_count") or usage_header.get("acc_id_util_pct") or 0
-            pause_interval = pendulum.duration(minutes=usage_header.get("estimated_time_to_regain_access", 0))
+        usage_header_business = headers.get("x-business-use-case-usage")
+        usage_header_app = headers.get("x-app-usage")
+        usage_header_ad_account = headers.get("x-ad-account-usage")
+
+        # NOTE: every lookup below uses `or 0` so that a missing or null field never reaches max() as None
+        if usage_header_ad_account:
+            usage_header_ad_account_loaded = json.loads(usage_header_ad_account)
+            usage = max(usage, usage_header_ad_account_loaded.get("acc_id_util_pct") or 0)
+
+        if usage_header_app:
+            usage_header_app_loaded = json.loads(usage_header_app)
+            usage = max(
+                usage,
+                usage_header_app_loaded.get("call_count") or 0,
+                usage_header_app_loaded.get("total_time") or 0,
+                usage_header_app_loaded.get("total_cputime") or 0,
+            )
+
+        if usage_header_business:
+            usage_header_business_loaded = json.loads(usage_header_business)
+            # each business object id maps to a list of usage entries; inspect all of them, not only the first
+            for usage_limits_list in usage_header_business_loaded.values():
+                for usage_limits in usage_limits_list or []:
+                    usage = max(
+                        usage,
+                        usage_limits.get("call_count") or 0,
+                        usage_limits.get("total_cputime") or 0,
+                        usage_limits.get("total_time") or 0,
+                    )
+                    regain_access_minutes = usage_limits.get("estimated_time_to_regain_access") or 0
+                    pause_interval = max(pause_interval, pendulum.duration(minutes=regain_access_minutes))
+
-        return call_count, pause_interval
+        return usage, pause_interval
     def handle_call_rate_limit(self, response, params):
+        """Sleep when the utilization reported by ``response`` is above ``call_rate_threshold``.
+
+        For a batch call (``"batch"`` in ``params``) every record of ``response.json()`` carries its own
+        headers; the highest utilization and the longest pause across all records are used. Otherwise the
+        headers of ``response`` itself are used, and a non-zero pause reported by the API also triggers
+        the sleep. The pause is never shorter than ``pause_interval_minimum``.
+
+        :param response: API response exposing ``json()`` and ``headers()``
+        :param params: request parameters of the call that produced ``response``
+        """
         if "batch" in params:
-            max_call_count = 0
-            max_pause_interval = self.pause_interval
+            max_usage = 0
+            # start from the minimum: max() below can only lengthen the pause, never shorten it
+            max_pause_interval = self.pause_interval_minimum
             for record in response.json():
                 headers = {header["name"].lower(): header["value"] for header in record["headers"]}
-                call_count, pause_interval = self.parse_call_rate_header(headers)
-                max_call_count = max(max_call_count, call_count)
+                usage, pause_interval = self.parse_call_rate_header(headers)
+                max_usage = max(max_usage, usage)
                 max_pause_interval = max(max_pause_interval, pause_interval)
-            if max_call_count > self.call_rate_threshold:
-                logger.warn(f"Utilization is too high ({max_call_count})%, pausing for {max_pause_interval}")
+
+            if max_usage > self.call_rate_threshold:
+                logger.warn(f"Utilization is too high ({max_usage})%, pausing for {max_pause_interval}")
                 sleep(max_pause_interval.total_seconds())
         else:
             headers = response.headers()
-            call_count, pause_interval = self.parse_call_rate_header(headers)
-            if call_count > self.call_rate_threshold or pause_interval:
-                logger.warn(f"Utilization is too high ({call_count})%, pausing for {pause_interval}")
+            usage, pause_interval = self.parse_call_rate_header(headers)
+            if usage > self.call_rate_threshold or pause_interval:
+                pause_interval = max(pause_interval, self.pause_interval_minimum)
+                logger.warn(f"Utilization is too high ({usage})%, pausing for {pause_interval}")
                 sleep(pause_interval.total_seconds())
def call(
diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md
index bde46916ae05..b7acd90d17d1 100644
--- a/docs/integrations/sources/facebook-marketing.md
+++ b/docs/integrations/sources/facebook-marketing.md
@@ -101,6 +101,7 @@ With the Ad Account ID and API access token, you should be ready to start pullin
| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
+| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve rate limit management: take the highest utilization across the app, ad account and business use case usage headers |
| 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK:
- Improve error handling.
- Improve async job performance (insights).
- Add new configuration parameter `insights_days_per_job`.
- Rename stream `adsets` to `ad_sets`.
- Refactor schema logic for insights, allowing to configure any possible insight stream.|
| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook_bussiness to 11.0|
| 0.2.9 | 2021-06-10 | [3996](https://github.com/airbytehq/airbyte/pull/3996) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |