Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source mixpanel - disable streams "export", "engage" on discover if no access #17145

Merged
merged 6 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@
- name: Mixpanel
sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerRepository: airbyte/source-mixpanel
dockerImageTag: 0.1.24
dockerImageTag: 0.1.25
documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel
icon: mixpanel.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6223,7 +6223,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-mixpanel:0.1.24"
- dockerImage: "airbyte/source-mixpanel:0.1.25"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.24
LABEL io.airbyte.version=0.1.25
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
auth = self.get_authenticator(config)
FunnelsList.max_retries = 0
funnels = FunnelsList(authenticator=auth, **config)
next(read_full_refresh(funnels))
funnels.reqs_per_hour_limit = 0
next(read_full_refresh(funnels), None)
except requests.HTTPError as e:
return False, e.response.json()["error"]
except Exception as e:
Expand All @@ -99,12 +100,23 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
logger.info(f"Using start_date: {config['start_date']}, end_date: {config['end_date']}")

auth = self.get_authenticator(config)
return [
streams = [
Annotations(authenticator=auth, **config),
Cohorts(authenticator=auth, **config),
CohortMembers(authenticator=auth, **config),
Engage(authenticator=auth, **config),
Export(authenticator=auth, **config),
Funnels(authenticator=auth, **config),
Revenue(authenticator=auth, **config),
]

# streams with dynamically generated schema
for stream in [Engage(authenticator=auth, **config), Export(authenticator=auth, **config)]:
try:
stream.get_json_schema()
except requests.HTTPError as e:
if e.response.status_code != 402:
raise e
logger.warning("Stream '%s' - is disabled, reason: 402 Payment Required", stream.name)
else:
streams.append(stream)

return streams
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from functools import cache
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import requests
Expand Down Expand Up @@ -165,6 +166,7 @@ def process_response(self, response: requests.Response, stream_state: Mapping[st
if not item_cursor or not state_cursor or item_cursor >= state_cursor:
yield item

@cache
bazarnov marked this conversation as resolved.
Show resolved Hide resolved
def get_json_schema(self) -> Mapping[str, Any]:
"""
:return: A dict of the JSON schema representing this stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import json
from functools import cache
from typing import Any, Iterable, Mapping, MutableMapping

import pendulum
Expand Down Expand Up @@ -123,6 +124,7 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma

yield item

@cache
def get_json_schema(self) -> Mapping[str, Any]:
"""
:return: A dict of the JSON schema representing this stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,25 @@ def test_check_connection_incomplete(config_raw):
assert command_check(source, config_raw) == AirbyteConnectionStatus(status=Status.FAILED, message="KeyError('api_secret')")


def test_streams(config_raw):
def test_streams(requests_mock, config_raw):
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {}))
streams = SourceMixpanel().streams(config_raw)
assert len(streams) == 7


def test_streams_string_date(config_raw):
def test_streams_string_date(requests_mock, config_raw):
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(200, {}))
config = copy.deepcopy(config_raw)
config["start_date"] = "2020-01-01"
config["end_date"] = "2020-01-02"
streams = SourceMixpanel().streams(config)
assert len(streams) == 7


def test_streams_disabled_402(requests_mock, config_raw):
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/engage/properties", setup_response(402, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/2.0/events/properties/top", setup_response(402, {}))
streams = SourceMixpanel().streams(config_raw)
assert {s.name for s in streams} == {"cohort_members", "funnels", "revenue", "annotations", "cohorts"}
1 change: 1 addition & 0 deletions docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Please note, that incremental sync could return duplicated \(old records\) for t

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------|
| 0.1.25 | 2022-09-27 | [17145](https://github.com/airbytehq/airbyte/pull/17145) | Disable streams "export", "engage" on discover if not access |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Added permission check for the "export" and "engage" streams, on the schema discover stage.

| 0.1.24 | 2022-09-26 | [16915](https://github.com/airbytehq/airbyte/pull/16915) | Added Service Accounts support |
| 0.1.23 | 2022-09-18 | [16843](https://github.com/airbytehq/airbyte/pull/16843) | Add stream=True for `export` stream |
| 0.1.22 | 2022-09-15 | [16770](https://github.com/airbytehq/airbyte/pull/16770) | Use "Retry-After" header for backoff |
Expand Down