Skip to content

Commit

Permalink
Fix iterable memory consumption (#7591)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmytro authored Nov 3, 2021
1 parent a915034 commit 0c2fcb8
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "2e875208-0c0b-4ee4-9e92-1cb3156ea799",
"name": "Iterable",
"dockerRepository": "airbyte/source-iterable",
"dockerImageTag": "0.1.9",
"dockerImageTag": "0.1.10",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/iterable"
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@
- name: Iterable
sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
dockerRepository: airbyte/source-iterable
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/sources/iterable
sourceType: api
- name: Jira
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-iterable
7 changes: 5 additions & 2 deletions airbyte-integrations/connectors/source-iterable/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"requests~=2.25",
]

TEST_REQUIREMENTS = ["pytest~=6.1"]
TEST_REQUIREMENTS = ["pytest~=6.1", "responses==0.13.3"]


setup(
Expand All @@ -20,6 +20,9 @@
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=MAIN_REQUIREMENTS + TEST_REQUIREMENTS,
install_requires=MAIN_REQUIREMENTS,
extras_require={
"tests": TEST_REQUIREMENTS,
},
package_data={"": ["*.json", "schemas/*.json"]},
)
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(self, start_date, **kwargs):
self.stream_params = {"dataTypeName": self.data_field}

def path(self, **kwargs) -> str:
return "/export/data.json"
return "export/data.json"

@staticmethod
def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime:
Expand Down Expand Up @@ -114,6 +114,21 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
yield record

def request_kwargs(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Mapping[str, Any]:
"""
https://api.iterable.com/api/docs#export_exportDataJson
Sending those type of requests could download large piece of json
objects splitted with newline character.
Passing stream=True argument to requests.session.send method to avoid
loading whole analytics report content into memory.
"""
return {"stream": True}


class Lists(IterableStream):
data_field = "lists"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import json
from unittest import mock

import pytest
import responses
from airbyte_cdk.models import SyncMode
from source_iterable.api import EmailSend


@pytest.fixture
def session_mock():
with mock.patch("airbyte_cdk.sources.streams.http.http.requests") as requests_mock:
session_mock = mock.MagicMock()
response_mock = mock.MagicMock()
requests_mock.Session.return_value = session_mock
session_mock.send.return_value = response_mock
response_mock.status_code = 200
yield session_mock


def test_send_email_stream(session_mock):
stream = EmailSend(start_date="2020", api_key="")
_ = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=[], stream_state={}))

assert session_mock.send.called
send_args = session_mock.send.call_args[1]
assert send_args.get("stream") is True


@responses.activate
def test_stream_correct():
record_js = {"createdAt": "2020"}
NUMBER_OF_RECORDS = 10 ** 2
resp_body = "\n".join([json.dumps(record_js)] * NUMBER_OF_RECORDS)
responses.add("GET", "https://api.iterable.com/api/export/data.json", body=resp_body)
stream = EmailSend(start_date="2020", api_key="")
records = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=[], stream_state={}))
assert len(records) == NUMBER_OF_RECORDS
1 change: 1 addition & 0 deletions docs/integrations/sources/iterable.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Please read [How to find your API key](https://support.iterable.com/hc/en-us/art

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| `0.1.10` | 2021-11-03 | [7591](https://github.com/airbytehq/airbyte/pull/7591) | Optimize export streams memory consumption for large requests |
| `0.1.9` | 2021-10-06 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Enable campaign_metrics stream |
| `0.1.8` | 2021-09-20 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Add new streams: campaign_metrics, events |
| `0.1.7` | 2021-09-20 | [6242](https://github.com/airbytehq/airbyte/pull/6242) | Updated schema for: campaigns, lists, templates, metadata |
Expand Down

0 comments on commit 0c2fcb8

Please sign in to comment.