Skip to content

Commit

Permalink
🐛 Source Facebook: Improve rate limit management (#4820)
Browse files Browse the repository at this point in the history
* Improve rate limit management

* bump version

* facebook-marketing.md update the changelog
  • Loading branch information
vladimir-remar authored and gl-pix committed Jul 22, 2021
1 parent 7a56b53 commit 2b077ec
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
"dockerImageTag": "0.2.13",
"dockerImageTag": "0.2.14",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing",
"icon": "facebook.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@
- sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
name: Facebook Marketing
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.13
dockerImageTag: 0.2.14
documentationUrl: https://hub.docker.com/r/airbyte/source-facebook-marketing
icon: facebook.svg
- sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.13
LABEL io.airbyte.version=0.2.14
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -39,40 +39,58 @@ class MyFacebookAdsApi(FacebookAdsApi):
"""Custom Facebook API class to intercept all API calls and handle call rate limits"""

call_rate_threshold = 90 # maximum percentage of call limit utilization
pause_interval = pendulum.duration(minutes=1) # default pause interval if reached or close to call rate limit
pause_interval_minimum = pendulum.duration(minutes=1) # default pause interval if reached or close to call rate limit


@staticmethod
def parse_call_rate_header(headers):
call_count = 0
usage = 0
pause_interval = pendulum.duration()

usage_header = headers.get("x-business-use-case-usage") or headers.get("x-app-usage") or headers.get("x-ad-account-usage")
if usage_header:
usage_header = json.loads(usage_header)
call_count = usage_header.get("call_count") or usage_header.get("acc_id_util_pct") or 0
pause_interval = pendulum.duration(minutes=usage_header.get("estimated_time_to_regain_access", 0))
usage_header_business = headers.get("x-business-use-case-usage")
usage_header_app = headers.get("x-app-usage")
usage_header_ad_account = headers.get("x-ad-account-usage")

if usage_header_ad_account:
usage_header_ad_account_loaded = json.loads(usage_header_ad_account)
usage = max(usage, usage_header_ad_account_loaded.get("acc_id_util_pct") )

if usage_header_app:
usage_header_app_loaded = json.loads()
usage = max(usage, usage_header_app_loaded.get("call_count"), usage_header_app_loaded.get("total_time"), usage_header_app_loaded.get("total_cputime") )

return call_count, pause_interval
if usage_header_business:

usage_header_business_loaded = json.loads(usage_header_business)
for business_object_id in usage_header_business_loaded:
usage_limits = usage_header_business_loaded.get(business_object_id)[0]
usage = max(usage, usage_limits.get('call_count'), usage_limits.get('total_cputime'), usage_limits.get('total_time'))
pause_interval = max(pause_interval, pendulum.duration(minutes=usage_limits.get("estimated_time_to_regain_access", 0)))

return usage, pause_interval

def handle_call_rate_limit(self, response, params):
if "batch" in params:
max_call_count = 0
max_pause_interval = self.pause_interval
max_usage = 0
max_pause_interval = self.pause_interval_minimum

for record in response.json():
headers = {header["name"].lower(): header["value"] for header in record["headers"]}
call_count, pause_interval = self.parse_call_rate_header(headers)
max_call_count = max(max_call_count, call_count)
usage, pause_interval = self.parse_call_rate_header(headers)
max_usage = max(max_usage, usage)
max_pause_interval = max(max_pause_interval, pause_interval)

if max_call_count > self.call_rate_threshold:
logger.warn(f"Utilization is too high ({max_call_count})%, pausing for {max_pause_interval}")

if max_usage > self.call_rate_threshold:
max_pause_interval = max(max_pause_interval, self.pause_interval_minimum)
logger.warn(f"Utilization is too high ({max_usage})%, pausing for {max_pause_interval}")
sleep(max_pause_interval.total_seconds())
else:
headers = response.headers()
call_count, pause_interval = self.parse_call_rate_header(headers)
if call_count > self.call_rate_threshold or pause_interval:
logger.warn(f"Utilization is too high ({call_count})%, pausing for {pause_interval}")
usage, pause_interval = self.parse_call_rate_header(headers)
if usage > self.call_rate_threshold or pause_interval:
pause_interval = max(pause_interval, self.pause_interval_minimum)
logger.warn(f"Utilization is too high ({usage})%, pausing for {pause_interval}")
sleep(pause_interval.total_seconds())

def call(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ With the Ad Account ID and API access token, you should be ready to start pullin

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management|
| 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK:<br>- Improve error handling.<br>- Improve async job performance (insights).<br>- Add new configuration parameter `insights_days_per_job`.<br>- Rename stream `adsets` to `ad_sets`.<br>- Refactor schema logic for insights, allowing to configure any possible insight stream.|
| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook_bussiness to 11.0|
| 0.2.9 | 2021-06-10 | [3996](https://github.com/airbytehq/airbyte/pull/3996) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |
Expand Down

0 comments on commit 2b077ec

Please sign in to comment.