Skip to content

Commit

Permalink
Source Mixpanel: "export" stream make line parsing more robust (#18846)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr authored Nov 3, 2022
1 parent 8bb9701 commit c01b81b
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@
- name: Mixpanel
sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerRepository: airbyte/source-mixpanel
dockerImageTag: 0.1.28
dockerImageTag: 0.1.29
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
icon: mixpanel.svg
sourceType: api
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7322,7 +7322,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-mixpanel:0.1.28"
- dockerImage: "airbyte/source-mixpanel:0.1.29"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/mixpanel"
connectionSpecification:
Expand All @@ -7331,7 +7331,7 @@
type: "object"
properties:
credentials:
title: "Authentication"
title: "Authentication *"
description: "Choose how to authenticate to Mixpanel"
type: "object"
order: 0
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.28
LABEL io.airbyte.version=0.1.29
LABEL io.airbyte.name=airbyte/source-mixpanel
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"airbyte-cdk~=0.2",
]

TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "pytest-mock~=3.6", "requests_mock~=1.8"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,32 @@ def url_base(self):
def path(self, **kwargs) -> str:
return "export"

def iter_dicts(self, lines: Iterable[str]) -> Iterable[Mapping]:
    """Yield one dict per record from a JSON-lines stream, re-joining split records.

    The incoming stream is expected to be in JSON lines format, but from time
    to time one record arrives split across several consecutive lines. Lines
    that are not a complete JSON object on their own are buffered; the buffer
    is concatenated (without a separator) and re-parsed every time a new
    fragment is added. Fragments are only combined when they are adjacent:
    as soon as a complete record is seen, any pending fragments are dropped.

    Only JSON objects are yielded. A fragment that happens to be valid JSON
    by itself but is not an object (for example ``123`` or ``null``) is
    treated as a fragment rather than emitted as a record.

    :param lines: iterable of already-decoded text lines.
    :return: generator of parsed records (dicts).

    Iteration stops, after logging a warning, when a line equal to
    "terminated early" is encountered.
    """
    parts = []
    for record_line in lines:
        if record_line == "terminated early":
            self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
            return

        # Parse outside of the ``yield`` so the ``try`` only guards the parser.
        try:
            record = json.loads(record_line)
        except ValueError:
            record = None

        if isinstance(record, dict):
            # A complete record: fragments buffered so far are not adjacent
            # to their remaining pieces any more, so they are discarded.
            parts = []
            yield record
            continue

        # Either invalid JSON or a non-object JSON value: keep it as a fragment.
        parts.append(record_line)
        if len(parts) < 2:
            continue

        try:
            record = json.loads("".join(parts))
        except ValueError:
            # Still incomplete; keep buffering and retry on the next fragment.
            continue

        if isinstance(record, dict):
            parts = []
            yield record

def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
"""Export API returns the response in JSONL format: each line is a valid JSON object
Raw item example:
Expand All @@ -106,11 +132,7 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma
"""

# We prefer response.iter_lines() to response.text.splitlines() as the latter can misparse text properties embedding line breaks
for record_line in response.iter_lines(decode_unicode=True):
if record_line == "terminated early":
self.logger.warning(f"Couldn't fetch data from Export API. Response: {record_line}")
break
record = json.loads(record_line)
for record in self.iter_dicts(response.iter_lines(decode_unicode=True)):
# transform record into flat dict structure
item = {"event": record["event"]}
properties = record["properties"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
from datetime import timedelta
from unittest.mock import MagicMock

Expand Down Expand Up @@ -456,3 +457,14 @@ def test_export_terminated_early(requests_mock, config):
stream = Export(authenticator=MagicMock(), **config)
requests_mock.register_uri("GET", get_url_to_mock(stream), text="terminated early\n")
assert list(read_full_refresh(stream)) == []


def test_export_iter_dicts(config):
    """Check how Export.iter_dicts handles intact lines and records split across lines."""
    export_stream = Export(authenticator=MagicMock(), **config)
    expected = {"key1": "value1", "key2": "value2"}
    line = json.dumps(expected)
    head, tail = line[:2], line[2:]

    # intact lines are parsed one by one
    assert list(export_stream.iter_dicts([line, line])) == [expected] * 2
    # two adjacent fragments are glued back into a single record
    assert list(export_stream.iter_dicts([line, head, tail, line])) == [expected] * 3
    # fragments separated by a complete record are discarded
    assert list(export_stream.iter_dicts([line, head, line, tail])) == [expected] * 2
3 changes: 2 additions & 1 deletion docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ Syncing huge date windows may take longer due to Mixpanel's low API rate-limits

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :--------------------------------------------------------------------------------------------------- |
| 0.1.28 | 2022-10-06 | [17699](https://github.com/airbytehq/airbyte/pull/17699) | Fix discover step issue cursor field None |
| 0.1.29 | 2022-11-02 | [18846](https://github.com/airbytehq/airbyte/pull/18846) | For "export" stream make line parsing more robust |
| 0.1.28 | 2022-10-06 | [17699](https://github.com/airbytehq/airbyte/pull/17699) | Fix discover step issue cursor field None |
| 0.1.27 | 2022-09-29 | [17415](https://github.com/airbytehq/airbyte/pull/17415) | Disable stream "cohort_members" on discover if not access |
| 0.1.26 | 2022-09-28 | [17304](https://github.com/airbytehq/airbyte/pull/17304) | Migrate to per-stream states. |
| 0.1.25 | 2022-09-27 | [17145](https://github.com/airbytehq/airbyte/pull/17145) | Disable streams "export", "engage" on discover if not access |
Expand Down

0 comments on commit c01b81b

Please sign in to comment.