Skip to content

Commit

Permalink
🎉 Source Facebook Marketing: improve sleeps time in rate limit handler (#10698)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir-remar authored Mar 25, 2022
1 parent c262d20 commit c51ea6e
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@
- name: Facebook Marketing
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.39
dockerImageTag: 0.2.40
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
24 changes: 12 additions & 12 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1684,20 +1684,28 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-facebook-marketing:0.2.39"
- dockerImage: "airbyte/source-facebook-marketing:0.2.40"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
connectionSpecification:
title: "Source Facebook Marketing"
type: "object"
properties:
account_id:
title: "Account ID"
description: "The Facebook Ad account ID to use when pulling data from the\
\ Facebook Marketing API."
order: 0
examples:
- "111111111111111"
type: "string"
start_date:
title: "Start Date"
description: "The date from which you'd like to replicate data for all incremental\
\ streams, in the format YYYY-MM-DDT00:00:00Z. All data generated after\
\ this date will be replicated."
order: 0
order: 1
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
examples:
- "2017-01-25T00:00:00Z"
Expand All @@ -1709,20 +1717,12 @@
\ incremental streams, in the format YYYY-MM-DDT00:00:00Z. All data generated\
\ between start_date and this date will be replicated. Not setting this\
\ option will result in always syncing the latest data."
order: 1
order: 2
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
examples:
- "2017-01-26T00:00:00Z"
type: "string"
format: "date-time"
account_id:
title: "Account ID"
description: "The Facebook Ad account ID to use when pulling data from the\
\ Facebook Marketing API."
order: 2
examples:
- "111111111111111"
type: "string"
access_token:
title: "Access Token"
description: "The value of the access token generated. See the <a href=\"\
Expand Down Expand Up @@ -1990,8 +1990,8 @@
required:
- "name"
required:
- "start_date"
- "account_id"
- "start_date"
- "access_token"
supportsIncremental: true
supportsNormalization: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.39
LABEL io.airbyte.version=0.2.40
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class FacebookAPIException(Exception):
class MyFacebookAdsApi(FacebookAdsApi):
"""Custom Facebook API class to intercept all API calls and handle call rate limits"""

call_rate_threshold = 95 # maximum percentage of call limit utilization
pause_interval_minimum = pendulum.duration(minutes=1) # default pause interval if reached or close to call rate limit
MAX_RATE, MAX_PAUSE_INTERVAL = (95, pendulum.duration(minutes=5))
MIN_RATE, MIN_PAUSE_INTERVAL = (90, pendulum.duration(minutes=1))

@dataclass
class Throttle:
Expand Down Expand Up @@ -87,34 +87,41 @@ def _parse_call_rate_header(headers):

return usage, pause_interval

def handle_call_rate_limit(self, response, params):
def _compute_pause_interval(self, usage, pause_interval):
    """Return how long to pause for the given usage level.

    A floor is selected from the class settings: ``MAX_PAUSE_INTERVAL`` once
    ``usage`` has reached ``MAX_RATE``, ``MIN_PAUSE_INTERVAL`` otherwise. The
    result is whichever is longer, that floor or the supplied
    ``pause_interval``.
    """
    floor = self.MAX_PAUSE_INTERVAL if usage >= self.MAX_RATE else self.MIN_PAUSE_INTERVAL
    return max(floor, pause_interval)

def _get_max_usage_pause_interval_from_batch(self, records):
    """Return the highest usage and the longest pause found across batch records.

    Usage starts at 0 and the pause starts at ``MIN_PAUSE_INTERVAL``, so these
    are the values returned when no record carries headers.
    """
    peak_usage, longest_pause = 0, self.MIN_PAUSE_INTERVAL

    for item in records:
        # An inner request can fail in two ways: no response at all (the batch
        # is re-executed until every inner request has one) or a response with
        # an error (which crashes loudly). A failed inner request might come
        # without headers, so only records that have them are inspected.
        if "headers" in item:
            lowered = {}
            for entry in item["headers"]:
                key = entry["name"].lower()
                lowered[key] = entry["value"]
            item_usage, item_pause = self._parse_call_rate_header(lowered)
            peak_usage = max(peak_usage, item_usage)
            longest_pause = max(item_pause, longest_pause)

    return peak_usage, longest_pause

def _handle_call_rate_limit(self, response, params):
if "batch" in params:
max_usage = 0
max_pause_interval = self.pause_interval_minimum

for record in response.json():
# there are two types of failures:
# 1. no response (we execute batch until all inner requests has response)
# 2. response with error (we crash loudly)
# in case it is failed inner request the headers might not be present
if "headers" not in record:
continue
headers = {header["name"].lower(): header["value"] for header in record["headers"]}
usage, pause_interval = self._parse_call_rate_header(headers)
max_usage = max(max_usage, usage)
max_pause_interval = max(max_pause_interval, pause_interval)

if max_usage > self.call_rate_threshold:
max_pause_interval = max(max_pause_interval, self.pause_interval_minimum)
logger.warning(f"Utilization is too high ({max_usage})%, pausing for {max_pause_interval}")
sleep(max_pause_interval.total_seconds())
records = response.json()
usage, pause_interval = self._get_max_usage_pause_interval_from_batch(records)
else:
headers = response.headers()
usage, pause_interval = self._parse_call_rate_header(headers)
if usage > self.call_rate_threshold or pause_interval:
pause_interval = max(pause_interval, self.pause_interval_minimum)
logger.warning(f"Utilization is too high ({usage})%, pausing for {pause_interval}")
sleep(pause_interval.total_seconds())

if usage >= self.MIN_RATE:
sleep_time = self._compute_pause_interval(usage=usage, pause_interval=pause_interval)
logger.warning(f"Utilization is too high ({usage})%, pausing for {sleep_time}")
sleep(sleep_time.total_seconds())

def _update_insights_throttle_limit(self, response: FacebookResponse):
"""
Expand Down Expand Up @@ -145,7 +152,7 @@ def call(
"""Makes an API call, delegate actual work to parent class and handles call rates"""
response = super().call(method, path, params, headers, files, url_override, api_version)
self._update_insights_throttle_limit(response)
self.handle_call_rate_limit(response, params)
self._handle_call_rate_limit(response, params)
return response


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import pendulum
import pytest
import source_facebook_marketing


class TestMyFacebookAdsApi:
    """Unit tests for the call-rate-limit helpers of ``MyFacebookAdsApi``."""

    @pytest.fixture
    def api(self):
        """API client under test, initialised with a placeholder access token."""
        return source_facebook_marketing.api.MyFacebookAdsApi.init(access_token="foo", crash_log=False)

    @pytest.mark.parametrize(
        "max_rate,max_pause_interval,min_pause_interval,usage,pause_interval,expected_pause_interval",
        [
            (
                95,
                pendulum.duration(minutes=5),
                pendulum.duration(minutes=1),
                96,
                pendulum.duration(minutes=6),
                pendulum.duration(minutes=6),
            ),
            (
                95,
                pendulum.duration(minutes=5),
                pendulum.duration(minutes=2),
                96,
                pendulum.duration(minutes=1),
                pendulum.duration(minutes=5),
            ),
            (
                95,
                pendulum.duration(minutes=5),
                pendulum.duration(minutes=1),
                93,
                pendulum.duration(minutes=4),
                pendulum.duration(minutes=4),
            ),
        ],
    )
    def test__compute_pause_interval(
        self, mocker, api, max_rate, max_pause_interval, min_pause_interval, usage, pause_interval, expected_pause_interval
    ):
        """The computed pause is the longer of the given pause and the floor selected by usage."""
        mocker.patch.object(api, "MAX_RATE", max_rate)
        mocker.patch.object(api, "MAX_PAUSE_INTERVAL", max_pause_interval)
        mocker.patch.object(api, "MIN_PAUSE_INTERVAL", min_pause_interval)
        computed_pause_interval = api._compute_pause_interval(usage, pause_interval)
        assert computed_pause_interval == expected_pause_interval

    @pytest.mark.parametrize(
        "min_pause_interval,usages_pause_intervals,expected_output",
        [
            (
                pendulum.duration(minutes=1),  # min_pause_interval
                [(5, pendulum.duration(minutes=6)), (7, pendulum.duration(minutes=5))],  # usages_pause_intervals
                (7, pendulum.duration(minutes=6)),  # expected_output
            ),
            (
                pendulum.duration(minutes=10),  # min_pause_interval
                [(5, pendulum.duration(minutes=6)), (7, pendulum.duration(minutes=5))],  # usages_pause_intervals
                (7, pendulum.duration(minutes=10)),  # expected_output
            ),
            (
                pendulum.duration(minutes=10),  # min_pause_interval
                [  # usages_pause_intervals
                    (9, pendulum.duration(minutes=6)),
                ],
                (9, pendulum.duration(minutes=10)),  # expected_output
            ),
            (
                pendulum.duration(minutes=10),  # min_pause_interval
                [  # usages_pause_intervals
                    (-1, pendulum.duration(minutes=1)),
                    (-2, pendulum.duration(minutes=10)),
                    (-3, pendulum.duration(minutes=100)),
                ],
                (0, pendulum.duration(minutes=100)),  # expected_output
            ),
        ],
    )
    def test__get_max_usage_pause_interval_from_batch(self, mocker, api, min_pause_interval, usages_pause_intervals, expected_output):
        """Every record with headers is parsed once and the maxima over all of them are returned."""
        records = [
            {"headers": [{"name": "USAGE", "value": usage}, {"name": "PAUSE_INTERVAL", "value": pause_interval}]}
            for usage, pause_interval in usages_pause_intervals
        ]

        mock_parse_call_rate_header = mocker.Mock(side_effect=usages_pause_intervals)
        mocker.patch.object(api, "_parse_call_rate_header", mock_parse_call_rate_header)
        mocker.patch.object(api, "MIN_PAUSE_INTERVAL", min_pause_interval)

        output = api._get_max_usage_pause_interval_from_batch(records)
        # assert_called_with only inspects the last call, so the call count is
        # checked as well to make sure no record was silently skipped.
        assert api._parse_call_rate_header.call_count == len(records)
        api._parse_call_rate_header.assert_called_with(
            {"usage": usages_pause_intervals[-1][0], "pause_interval": usages_pause_intervals[-1][1]}
        )
        assert output == expected_output

    def test__get_max_usage_pause_interval_from_batch_skips_records_without_headers(self, mocker, api):
        """Records lacking a "headers" key are ignored; an empty batch yields the defaults."""
        min_pause_interval = pendulum.duration(minutes=1)
        parsed = (50, pendulum.duration(minutes=3))
        records = [
            {"error": "no response"},  # failed inner request: no headers to parse
            {"headers": [{"name": "USAGE", "value": parsed[0]}, {"name": "PAUSE_INTERVAL", "value": parsed[1]}]},
        ]

        mocker.patch.object(api, "_parse_call_rate_header", mocker.Mock(return_value=parsed))
        mocker.patch.object(api, "MIN_PAUSE_INTERVAL", min_pause_interval)

        assert api._get_max_usage_pause_interval_from_batch(records) == parsed
        api._parse_call_rate_header.assert_called_once_with({"usage": parsed[0], "pause_interval": parsed[1]})

        api._parse_call_rate_header.reset_mock()
        assert api._get_max_usage_pause_interval_from_batch([]) == (0, min_pause_interval)
        api._parse_call_rate_header.assert_not_called()

    @pytest.mark.parametrize(
        "params,min_rate,usage,expect_sleep",
        [
            (["batch"], 0, 1, True),
            (["batch"], 0, 0, True),
            (["batch"], 2, 1, False),
            (["not_batch"], 0, 1, True),
            (["not_batch"], 0, 0, True),
            (["not_batch"], 2, 1, False),
        ],
    )
    def test__handle_call_rate_limit(self, mocker, api, params, min_rate, usage, expect_sleep):
        """The handler sleeps (and warns) only when usage has reached ``MIN_RATE``."""
        pause_interval = 1
        mock_response = mocker.Mock()

        mocker.patch.object(api, "MIN_RATE", min_rate)
        mocker.patch.object(api, "_get_max_usage_pause_interval_from_batch", mocker.Mock(return_value=(usage, pause_interval)))
        mocker.patch.object(api, "_parse_call_rate_header", mocker.Mock(return_value=(usage, pause_interval)))
        mocker.patch.object(api, "_compute_pause_interval")
        mocker.patch.object(source_facebook_marketing.api, "logger")
        mocker.patch.object(source_facebook_marketing.api, "sleep")
        assert api._handle_call_rate_limit(mock_response, params) is None
        if "batch" in params:
            api._get_max_usage_pause_interval_from_batch.assert_called_with(mock_response.json.return_value)
        else:
            api._parse_call_rate_header.assert_called_with(mock_response.headers.return_value)
        if expect_sleep:
            api._compute_pause_interval.assert_called_with(usage=usage, pause_interval=pause_interval)
            source_facebook_marketing.api.sleep.assert_called_with(api._compute_pause_interval.return_value.total_seconds())
            source_facebook_marketing.api.logger.warning.assert_called_with(
                f"Utilization is too high ({usage})%, pausing for {api._compute_pause_interval.return_value}"
            )
        else:
            # Below the threshold nothing may happen; without these checks the
            # "no sleep" cases would pass even if the handler always slept.
            api._compute_pause_interval.assert_not_called()
            source_facebook_marketing.api.sleep.assert_not_called()
            source_facebook_marketing.api.logger.warning.assert_not_called()
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.2.40 | 2022-02-28 | [10698](https://github.com/airbytehq/airbyte/pull/10698) | Improve sleeps time in rate limit handler |
| 0.2.39 | 2022-03-09 | [10917](https://github.com/airbytehq/airbyte/pull/10917) | Retry connections when FB API returns error code 2 (temporary oauth error) |
| 0.2.38 | 2022-03-08 | [10531](https://github.com/airbytehq/airbyte/pull/10531) | Add `time_increment` parameter to custom insights |
| 0.2.37 | 2022-02-28 | [10655](https://github.com/airbytehq/airbyte/pull/10655) | Add Activities stream |
Expand Down

0 comments on commit c51ea6e

Please sign in to comment.