Skip to content

Commit

Permalink
🎉 Source mixpanel - disable streams "export", "engage" on discover if…
Browse files Browse the repository at this point in the history
… no access (#17145)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr authored Sep 27, 2022
1 parent 4c5f6aa commit 01c8eb1
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@
- name: Mixpanel
sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerRepository: airbyte/source-mixpanel
dockerImageTag: 0.1.24
dockerImageTag: 0.1.25
documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel
icon: mixpanel.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6378,7 +6378,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-mixpanel:0.1.24"
- dockerImage: "airbyte/source-mixpanel:0.1.25"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.24
LABEL io.airbyte.version=0.1.25
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
auth = self.get_authenticator(config)
FunnelsList.max_retries = 0
funnels = FunnelsList(authenticator=auth, **config)
next(read_full_refresh(funnels))
funnels.reqs_per_hour_limit = 0
next(read_full_refresh(funnels), None)
except requests.HTTPError as e:
return False, e.response.json()["error"]
except Exception as e:
Expand All @@ -99,12 +100,23 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
logger.info(f"Using start_date: {config['start_date']}, end_date: {config['end_date']}")

auth = self.get_authenticator(config)
return [
streams = [
Annotations(authenticator=auth, **config),
Cohorts(authenticator=auth, **config),
CohortMembers(authenticator=auth, **config),
Engage(authenticator=auth, **config),
Export(authenticator=auth, **config),
Funnels(authenticator=auth, **config),
Revenue(authenticator=auth, **config),
]

# streams with dynamically generated schema
for stream in [Engage(authenticator=auth, **config), Export(authenticator=auth, **config)]:
try:
stream.get_json_schema()
except requests.HTTPError as e:
if e.response.status_code != 402:
raise e
logger.warning("Stream '%s' - is disabled, reason: 402 Payment Required", stream.name)
else:
streams.append(stream)

return streams
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from functools import cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import requests
Expand Down Expand Up @@ -165,6 +166,7 @@ def process_response(self, response: requests.Response, stream_state: Mapping[st
if not item_cursor or not state_cursor or item_cursor >= state_cursor:
yield item

@cache
def get_json_schema(self) -> Mapping[str, Any]:
"""
:return: A dict of the JSON schema representing this stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import json
from functools import cache
from typing import Any, Iterable, Mapping, MutableMapping

import pendulum
Expand Down Expand Up @@ -123,6 +124,7 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma

yield item

@cache
def get_json_schema(self) -> Mapping[str, Any]:
"""
:return: A dict of the JSON schema representing this stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,25 @@ def test_check_connection_incomplete(config_raw):
assert command_check(source, config_raw) == AirbyteConnectionStatus(status=Status.FAILED, message="KeyError('api_secret')")


def test_streams(config_raw):
def test_streams(requests_mock, config_raw):
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {}))
streams = SourceMixpanel().streams(config_raw)
assert len(streams) == 7


def test_streams_string_date(config_raw):
def test_streams_string_date(requests_mock, config_raw):
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {}))
config = copy.deepcopy(config_raw)
config["start_date"] = "2020-01-01"
config["end_date"] = "2020-01-02"
streams = SourceMixpanel().streams(config)
assert len(streams) == 7


def test_streams_disabled_402(requests_mock, config_raw):
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(402, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(402, {}))
streams = SourceMixpanel().streams(config_raw)
assert {s.name for s in streams} == {"cohort_members", "funnels", "revenue", "annotations", "cohorts"}
1 change: 1 addition & 0 deletions docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Please note, that incremental sync could return duplicated \(old records\) for t

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------|
| 0.1.25 | 2022-09-27 | [17145](https://github.com/airbytehq/airbyte/pull/17145) | Disable streams "export", "engage" on discover if not access |
| 0.1.24 | 2022-09-26 | [16915](https://github.com/airbytehq/airbyte/pull/16915) | Added Service Accounts support |
| 0.1.23 | 2022-09-18 | [16843](https://github.com/airbytehq/airbyte/pull/16843) | Add stream=True for `export` stream |
| 0.1.22 | 2022-09-15 | [16770](https://github.com/airbytehq/airbyte/pull/16770) | Use "Retry-After" header for backoff |
Expand Down

0 comments on commit 01c8eb1

Please sign in to comment.