Skip to content

Commit

Permalink
šŸ› Source Iterable: add retry for 500 - Generic Error, increase `reducā€¦
Browse files Browse the repository at this point in the history
ā€¦e slice max attempts` (airbytehq#23821)
  • Loading branch information
bazarnov authored and danielduckworth committed Mar 13, 2023
1 parent be83e26 commit 16c99de
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@
- name: Iterable
sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
dockerRepository: airbyte/source-iterable
dockerImageTag: 0.1.24
dockerImageTag: 0.1.25
documentationUrl: https://docs.airbyte.com/integrations/sources/iterable
icon: iterable.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6983,7 +6983,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-iterable:0.1.24"
- dockerImage: "airbyte/source-iterable:0.1.25"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/iterable"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.24
LABEL io.airbyte.version=0.1.25
LABEL io.airbyte.name=airbyte/source-iterable
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-iterable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
pip install '.[tests]'
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.

Expand Down Expand Up @@ -50,7 +51,7 @@ and place them into `secrets/config.json`.
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```

### Unit Tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ class IterableStream(HttpStream, ABC):
# in case we get a 401 error (api token disabled or deleted) on a stream slice, do not make further requests within the current stream
# to prevent 429 error on other streams
ignore_further_slices = False
# Hardcode the value because it is not returned from the API
BACKOFF_TIME_CONSTANT = 10.0
# define date-time fields with potential wrong format

url_base = "https://api.iterable.com/api/"
primary_key = "id"
Expand All @@ -42,6 +39,15 @@ def __init__(self, authenticator):
self._cred = authenticator
super().__init__(authenticator)

@property
def retry_factor(self) -> int:
return 20

# With factor 20 it would be from 20 to 400 seconds delay
@property
def max_retries(self) -> Union[int, None]:
return 10

@property
@abstractmethod
def data_field(self) -> str:
Expand All @@ -61,9 +67,6 @@ def check_unauthorized_key(self, response: requests.Response) -> bool:
return False
return True

def backoff_time(self, response: requests.Response) -> Optional[float]:
return self.BACKOFF_TIME_CONSTANT

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Iterable API does not support pagination
Expand All @@ -80,8 +83,16 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
yield record

def should_retry(self, response: requests.Response) -> bool:
# check the authentication
if not self.check_unauthorized_key(response):
return False
# retry on generic error 500 meaning
if response.status_code == 500:
if response.json().get("code") == "GenericError" and "Please try again later" in response.json().get("msg"):
self.logger.warn(f"Generic Server Error occured for stream: `{self.name}`.")
setattr(self, "raise_on_http_errors", False)
return True
# all other cases
return super().should_retry(response)

def read_records(
Expand Down Expand Up @@ -120,24 +131,6 @@ def __init__(self, start_date=None, end_date=None, **kwargs):
def path(self, **kwargs) -> str:
return "export/data.json"

def backoff_time(self, response: requests.Response) -> Optional[float]:
# Use default exponential backoff
return None

# For python backoff package expo backoff delays calculated according to formula:
# delay = factor * base ** n where base is 2
# With default factor equal to 5 and 5 retries delays would be 5, 10, 20, 40 and 80 seconds.
# For exports stream there is a limit of 4 requests per minute.
# Tune up factor and retries to send a lot of excessive requests before timeout exceed.
@property
def retry_factor(self) -> int:
return 20

# With factor 20 it woud be 20, 40, 80 and 160 seconds delays.
@property
def max_retries(self) -> Union[int, None]:
return 4

@staticmethod
def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime:
if isinstance(value, int):
Expand Down Expand Up @@ -260,14 +253,14 @@ class IterableExportStreamAdjustableRange(IterableExportStream, ABC):
In case of slice processing request failed with ChunkedEncodingError (which
means that API server closed connection cause of request takes to much
time) make CHUNKED_ENCODING_ERROR_RETRIES (3) retries each time reducing
time) make CHUNKED_ENCODING_ERROR_RETRIES (6) retries each time reducing
slice length.
See AdjustableSliceGenerator description for more details on next slice length adjustment alghorithm.
"""

_adjustable_generator: AdjustableSliceGenerator = None
CHUNKED_ENCODING_ERROR_RETRIES = 3
CHUNKED_ENCODING_ERROR_RETRIES = 6

def stream_slices(
self,
Expand Down Expand Up @@ -329,6 +322,8 @@ class ListUsers(IterableStream):
primary_key = "listId"
data_field = "getUsers"
name = "list_users"
# enable caching, because this stream used by other ones
use_cache = True

def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str:
return f"lists/{self.data_field}?listId={stream_slice['list_id']}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,4 @@ def response_cb(req):
assert sum(ranges) == days_duration
assert len(ranges) == len(records)
# since read is called on source instance, under the hood .streams() is called which triggers one more http call
assert len(responses.calls) == 3 * len(ranges) + 1
assert len(responses.calls) == 3 * len(ranges)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@


@responses.activate
@pytest.mark.parametrize("body, status, expected_streams", ((b"", 401, 7), (b"", 200, 44), (b"alpha@gmail.com\nbeta@gmail.com", 200, 44)))
@pytest.mark.parametrize("body, status, expected_streams", ((b"", 401, 44), (b"", 200, 44), (b"alpha@gmail.com\nbeta@gmail.com", 200, 44)))
def test_source_streams(mock_lists_resp, config, body, status, expected_streams):
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", body=body, status=status)
streams = SourceIterable().streams(config=config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,30 @@ def test_iterable_stream_parse_response():

def test_iterable_stream_backoff_time():
stream = Lists(authenticator=NoAuth())
assert stream.backoff_time(response=None) == stream.BACKOFF_TIME_CONSTANT
assert stream.backoff_time(response=None) is None


def test_iterable_export_stream_backoff_time():
stream = Users(authenticator=NoAuth(), start_date="2019-10-10T00:00:00")
assert stream.backoff_time(response=None) is None


@pytest.mark.parametrize(
"status, json, expected",
[
(429, {}, True),
(500, {"msg": "...Please try again later...", "code": "Generic Error"}, True)
],
)
def test_should_retry(status, json, expected, requests_mock):
stream = Lists(authenticator=NoAuth())
url = f"{stream.url_base}/{stream.path()}"
requests_mock.get(url, json=json, status_code=status)
test_response = requests.get(url)
result = stream.should_retry(test_response)
assert result is expected


@pytest.mark.parametrize(
"current_state,record_date,expected_state",
[
Expand All @@ -181,7 +197,7 @@ def test_get_updated_state(current_state, record_date, expected_state):
def test_stream_stops_on_401(mock_lists_resp):
# no requests should be made after getting 401 error despite the multiple slices
users_stream = ListUsers(authenticator=NoAuth())
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=1", json={}, status=401)
responses.add(responses.GET, "https://api.iterable.com/api/lists/getUsers?listId=2", json={}, status=401)
slices = 0
for slice_ in users_stream.stream_slices(sync_mode=SyncMode.full_refresh):
slices += 1
Expand Down
2 changes: 1 addition & 1 deletion connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
| **Instatus** | <img alt="Instatus icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/instatus.svg" height="30" height="30"/> | Source | airbyte/source-instatus:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/instatus) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-instatus) | <small>`1901024c-0249-45d0-bcac-31a954652927`</small> |
| **Intercom** | <img alt="Intercom icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/intercom.svg" height="30" height="30"/> | Source | airbyte/source-intercom:0.1.31 | generally_available | [link](https://docs.airbyte.com/integrations/sources/intercom) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-intercom) | <small>`d8313939-3782-41b0-be29-b3ca20d8dd3a`</small> |
| **Intruder** | <img alt="Intruder icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/intruder.svg" height="30" height="30"/> | Source | airbyte/source-intruder:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/intruder) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-intruder) | <small>`3d15163b-11d8-412f-b808-795c9b2c3a3a`</small> |
| **Iterable** | <img alt="Iterable icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/iterable.svg" height="30" height="30"/> | Source | airbyte/source-iterable:0.1.24 | generally_available | [link](https://docs.airbyte.com/integrations/sources/iterable) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-iterable) | <small>`2e875208-0c0b-4ee4-9e92-1cb3156ea799`</small> |
| **Iterable** | <img alt="Iterable icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/iterable.svg" height="30" height="30"/> | Source | airbyte/source-iterable:0.1.25 | generally_available | [link](https://docs.airbyte.com/integrations/sources/iterable) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-iterable) | <small>`2e875208-0c0b-4ee4-9e92-1cb3156ea799`</small> |
| **Jenkins** | <img alt="Jenkins icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/jenkins.svg" height="30" height="30"/> | Source | farosai/airbyte-jenkins-source:0.1.23 | alpha | [link](https://docs.airbyte.com/integrations/sources/jenkins) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/airbyte-jenkins-source) | <small>`d6f73702-d7a0-4e95-9758-b0fb1af0bfba`</small> |
| **Jira** | <img alt="Jira icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/jira.svg" height="30" height="30"/> | Source | airbyte/source-jira:0.3.4 | beta | [link](https://docs.airbyte.com/integrations/sources/jira) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-jira) | <small>`68e63de2-bb83-4c7e-93fa-a8a9051e3993`</small> |
| **K6 Cloud** | <img alt="K6 Cloud icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/k6cloud.svg" height="30" height="30"/> | Source | airbyte/source-k6-cloud:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/k6-cloud) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-k6-cloud) | <small>`e300ece7-b073-43a3-852e-8aff36a57f13`</small> |
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/iterable.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ The Iterable source connector supports the following [sync modes](https://docs.a

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------|
| 0.1.24 | 2023-02-14 | [22979](https://github.com/airbytehq/airbyte/pull/22979) | Specified date formatting in specification |
| 0.1.25 | 2023-03-07 | [23821](https://github.com/airbytehq/airbyte/pull/23821) | Added retry for `500 - Generic Error`, increased max attempts number to `6` to handle `ChunkedEncodingError` |
| 0.1.24 | 2023-02-14 | [22979](https://github.com/airbytehq/airbyte/pull/22979) | Specified date formatting in specification |
| 0.1.23 | 2023-01-27 | [22011](https://github.com/airbytehq/airbyte/pull/22011) | Set `AvailabilityStrategy` for streams explicitly to `None` |
| 0.1.22 | 2022-11-30 | [19913](https://github.com/airbytehq/airbyte/pull/19913) | Replace pendulum.parse -> dateutil.parser.parse to avoid memory leak |
| 0.1.21 | 2022-10-27 | [18537](https://github.com/airbytehq/airbyte/pull/18537) | Improve streams discovery |
Expand Down

0 comments on commit 16c99de

Please sign in to comment.