Fix iterable memory consumption #7591

Merged: 1 commit, Nov 3, 2021
@@ -2,6 +2,6 @@
   "sourceDefinitionId": "2e875208-0c0b-4ee4-9e92-1cb3156ea799",
   "name": "Iterable",
   "dockerRepository": "airbyte/source-iterable",
-  "dockerImageTag": "0.1.9",
+  "dockerImageTag": "0.1.10",
   "documentationUrl": "https://docs.airbyte.io/integrations/sources/iterable"
 }
@@ -262,7 +262,7 @@
 - name: Iterable
   sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
   dockerRepository: airbyte/source-iterable
-  dockerImageTag: 0.1.9
+  dockerImageTag: 0.1.10
   documentationUrl: https://docs.airbyte.io/integrations/sources/iterable
   sourceType: api
 - name: Jira
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/Dockerfile
@@ -12,5 +12,5 @@ RUN pip install .
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

-LABEL io.airbyte.version=0.1.9
+LABEL io.airbyte.version=0.1.10
 LABEL io.airbyte.name=airbyte/source-iterable
7 changes: 5 additions & 2 deletions airbyte-integrations/connectors/source-iterable/setup.py
@@ -11,7 +11,7 @@
     "requests~=2.25",
 ]

-TEST_REQUIREMENTS = ["pytest~=6.1"]
+TEST_REQUIREMENTS = ["pytest~=6.1", "responses==0.13.3"]


 setup(
@@ -20,6 +20,9 @@
     author="Airbyte",
     author_email="contact@airbyte.io",
     packages=find_packages(),
-    install_requires=MAIN_REQUIREMENTS + TEST_REQUIREMENTS,
+    install_requires=MAIN_REQUIREMENTS,
+    extras_require={
+        "tests": TEST_REQUIREMENTS,
+    },
     package_data={"": ["*.json", "schemas/*.json"]},
 )
@@ -69,7 +69,7 @@ def __init__(self, start_date, **kwargs):
         self.stream_params = {"dataTypeName": self.data_field}

     def path(self, **kwargs) -> str:
-        return "/export/data.json"
+        return "export/data.json"

     @staticmethod
     def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime:
@@ -114,6 +114,21 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
             record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
             yield record

+    def request_kwargs(
+        self,
+        stream_state: Mapping[str, Any],
+        stream_slice: Mapping[str, Any] = None,
+        next_page_token: Mapping[str, Any] = None,
+    ) -> Mapping[str, Any]:
+        """
+        https://api.iterable.com/api/docs#export_exportDataJson
+        Sending those type of requests could download large piece of json
+        objects splitted with newline character.
+        Passing stream=True argument to requests.session.send method to avoid
+        loading whole analytics report content into memory.
+        """
+        return {"stream": True}
+

 class Lists(IterableStream):
     data_field = "lists"
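
The docstring in the hunk above gives the motivation: the export endpoint returns JSON objects separated by newlines, and `stream=True` lets the client consume the body incrementally instead of buffering it whole. A minimal sketch of that consumption pattern follows. It assumes a response-like object exposing `iter_lines()`, as `requests.Response` does; `FakeResponse` and `iter_ndjson_records` are hypothetical names used only for illustration and are not part of the connector.

```python
import json
from typing import Any, Dict, Iterator, List


class FakeResponse:
    """Hypothetical stand-in for requests.Response, used only for this sketch."""

    def __init__(self, lines: List[bytes]) -> None:
        self._lines = lines

    def iter_lines(self) -> Iterator[bytes]:
        # requests.Response.iter_lines() likewise yields one line at a time;
        # with stream=True the body is read lazily rather than all at once.
        yield from self._lines


def iter_ndjson_records(response: Any) -> Iterator[Dict[str, Any]]:
    """Yield one parsed JSON object per non-empty line of the response body."""
    for line in response.iter_lines():
        if not line:  # skip blank lines between records
            continue
        yield json.loads(line)


body = [b'{"createdAt": "2020"}', b"", b'{"createdAt": "2021"}']
records = list(iter_ndjson_records(FakeResponse(body)))
```

Because the function is a generator, only one line needs to be held in memory at a time; the `list(...)` call here is just for demonstration.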
@@ -0,0 +1,3 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
@@ -0,0 +1,42 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+import json
+from unittest import mock
+
+import pytest
+import responses
+from airbyte_cdk.models import SyncMode
+from source_iterable.api import EmailSend
+
+
+@pytest.fixture
+def session_mock():
+    with mock.patch("airbyte_cdk.sources.streams.http.http.requests") as requests_mock:
+        session_mock = mock.MagicMock()
+        response_mock = mock.MagicMock()
+        requests_mock.Session.return_value = session_mock
+        session_mock.send.return_value = response_mock
+        response_mock.status_code = 200
+        yield session_mock
+
+
+def test_send_email_stream(session_mock):
+    stream = EmailSend(start_date="2020", api_key="")
+    _ = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=[], stream_state={}))
+
+    assert session_mock.send.called
+    send_args = session_mock.send.call_args[1]
+    assert send_args.get("stream") is True
+
+
+@responses.activate
+def test_stream_correct():
+    record_js = {"createdAt": "2020"}
+    NUMBER_OF_RECORDS = 10 ** 2
+    resp_body = "\n".join([json.dumps(record_js)] * NUMBER_OF_RECORDS)
+    responses.add("GET", "https://api.iterable.com/api/export/data.json", body=resp_body)
+    stream = EmailSend(start_date="2020", api_key="")
+    records = list(stream.read_records(sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=[], stream_state={}))
+    assert len(records) == NUMBER_OF_RECORDS
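
The URL mocked in `test_stream_correct` is `https://api.iterable.com/api/export/data.json`, which lines up with the `path()` change from `"/export/data.json"` to `"export/data.json"`. The sketch below shows why a leading slash can matter when a path is combined with a base URL. It assumes a base URL of `https://api.iterable.com/api/` (inferred from the mocked URL, not shown in this diff) and compares two common joining strategies; which one the CDK actually uses is not visible here.

```python
from urllib.parse import urljoin

# Assumed base URL, inferred from the URL mocked in the test above.
BASE_URL = "https://api.iterable.com/api/"

old_path = "/export/data.json"
new_path = "export/data.json"

# Strategy 1: plain string concatenation.
concat_old = BASE_URL + old_path  # leading slash produces a double slash
concat_new = BASE_URL + new_path

# Strategy 2: reference resolution with urljoin.
joined_old = urljoin(BASE_URL, old_path)  # leading slash replaces the "/api/" prefix
joined_new = urljoin(BASE_URL, new_path)

for label, url in [
    ("concat old", concat_old),
    ("concat new", concat_new),
    ("urljoin old", joined_old),
    ("urljoin new", joined_new),
]:
    print(f"{label}: {url}")
```

Under either strategy, only the path without the leading slash yields exactly the URL registered with `responses.add` in the test.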
1 change: 1 addition & 0 deletions docs/integrations/sources/iterable.md
@@ -58,6 +58,7 @@ Please read [How to find your API key](https://support.iterable.com/hc/en-us/art

 | Version | Date | Pull Request | Subject |
 | :------ | :-------- | :----- | :------ |
+| `0.1.10` | 2021-11-03 | [7591](https://github.com/airbytehq/airbyte/pull/7591) | Optimize export streams memory consumption for large requests |
 | `0.1.9` | 2021-10-06 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Enable campaign_metrics stream |
 | `0.1.8` | 2021-09-20 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Add new streams: campaign_metrics, events |
 | `0.1.7` | 2021-09-20 | [6242](https://github.com/airbytehq/airbyte/pull/6242) | Updated schema for: campaigns, lists, templates, metadata |