Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source mixpanel: Improved connector's input configuration validation #16570

Merged
merged 12 commits into from
Sep 13, 2022
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.version=0.1.21
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

import base64
from datetime import datetime, timedelta
import logging
from typing import Any, List, Mapping, Tuple

import pendulum
Expand All @@ -14,7 +14,7 @@
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator

from .streams import Annotations, CohortMembers, Cohorts, Engage, Export, Funnels, FunnelsList, Revenue
from .testing import adapt_streams_if_testing
from .testing import adapt_streams_if_testing, adapt_validate_if_testing


class TokenAuthenticatorBase64(TokenAuthenticator):
Expand All @@ -24,6 +24,31 @@ def __init__(self, token: str, auth_method: str = "Basic", **kwargs):


class SourceMixpanel(AbstractSource):
@adapt_validate_if_testing
def _validate_and_transform(self, config: Mapping[str, Any]):
logger = logging.getLogger("airbyte")
source_spec = self.spec(logger)
default_project_timezone = source_spec.connectionSpecification["properties"]["project_timezone"]["default"]
config["project_timezone"] = pendulum.timezone(config.get("project_timezone", default_project_timezone))

today = pendulum.today(tz=config["project_timezone"]).date()
start_date = config.get("start_date")
if start_date:
config["start_date"] = pendulum.parse(start_date).date()
else:
config["start_date"] = today.subtract(days=365)

end_date = config.get("end_date")
if end_date:
config["end_date"] = pendulum.parse(end_date).date()
else:
config["end_date"] = today

for k in ["attribution_window", "select_properties_by_default", "region", "date_window_size"]:
if k not in config:
config[k] = source_spec.connectionSpecification["properties"][k]["default"]
return config

def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
"""
See https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/source.py#L232
Expand All @@ -33,6 +58,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
:param logger: logger object
:return Tuple[bool, any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise.
"""
config = self._validate_and_transform(config)
try:
auth = TokenAuthenticatorBase64(token=config["api_secret"])
funnels = FunnelsList(authenticator=auth, **config)
Expand Down Expand Up @@ -61,20 +87,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""
tzone = pendulum.timezone(config.get("project_timezone", "US/Pacific"))
now = datetime.now(tzone).date()

start_date = config.get("start_date")
if start_date and isinstance(start_date, str):
start_date = pendulum.parse(config["start_date"]).date()
config["start_date"] = start_date or now - timedelta(days=365)

end_date = config.get("end_date")
if end_date and isinstance(end_date, str):
end_date = pendulum.parse(end_date).date()
config["end_date"] = end_date or now # set to now by default

AirbyteLogger().log("INFO", f"Using start_date: {config['start_date']}, end_date: {config['end_date']}")
config = self._validate_and_transform(config)
logger = logging.getLogger("airbyte")
logger.info(f"Using start_date: {config['start_date']}, end_date: {config['end_date']}")

auth = TokenAuthenticatorBase64(token=config["api_secret"])
return [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@
"order": 4,
"title": "Start Date",
"type": "string",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated. If this option is not set, the connector will replicate data from up to one year ago by default.",
"description": "The date in the format YYYY-MM-DD. Any data before this date will not be replicated. If this option is not set, the connector will replicate data from up to one year ago by default.",
"examples": ["2021-11-16"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$"
},
"end_date": {
"order": 5,
"title": "End Date",
"type": "string",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data after this date will not be replicated. Left empty to always sync to most recent date",
"description": "The date in the format YYYY-MM-DD. Any data after this date will not be replicated. Left empty to always sync to most recent date",
"examples": ["2021-11-16"],
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

import time
from abc import ABC
from datetime import date, datetime, timedelta
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from datetime import datetime, timedelta
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
from pendulum import Date


class MixpanelStream(HttpStream, ABC):
Expand All @@ -32,9 +33,9 @@ def url_base(self):
def __init__(
self,
authenticator: HttpAuthenticator,
region: str = None,
start_date: Union[date, str] = None,
end_date: Union[date, str] = None,
region: str,
start_date: Date = None,
end_date: Date = None,
date_window_size: int = 30, # in days
attribution_window: int = 0, # in days
select_properties_by_default: bool = True,
Expand All @@ -45,7 +46,7 @@ def __init__(
self.date_window_size = date_window_size
self.attribution_window = attribution_window
self.additional_properties = select_properties_by_default
self.region = region if region else "US"
self.region = region

super().__init__(authenticator=authenticator)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from datetime import datetime, timedelta
import logging
from functools import wraps

import pendulum
from airbyte_cdk.logger import AirbyteLogger

from .streams import Funnels

AVAILABLE_TESTING_RANGE_DAYS = 30
Expand All @@ -24,26 +21,18 @@ def funnel_slices_patched(self: Funnels, sync_mode):
def adapt_streams_if_testing(func):
"""
Due to API limitations (60 requests per hour) there is unavailable to make acceptance tests in normal mode,
so we're reducing amount of requests by, if `is_testing` flag is set in config:
so we're reducing amount of requests by, if `_testing` flag is set in config:

1. Take time range in only 1 month
2. Patch Funnels, so we download data only for one Funnel entity
3. Removing RPS limit for faster testing
1. Patch Funnels, so we download data only for one Funnel entity
2. Removing RPS limit for faster testing
"""

@wraps(func)
def wrapper(self, config):
is_testing = config.get("is_testing", False)
if not is_testing:
if not config.get("_testing"):
return func(self, config)

AirbyteLogger().log("INFO", "SOURCE IN TESTING MODE, DO NOT USE IN PRODUCTION!")
tzone = pendulum.timezone(config.get("project_timezone", "US/Pacific"))
now = datetime.now(tzone).date()
# 1. Take time range in only 1 month
config["start_date"] = now - timedelta(days=AVAILABLE_TESTING_RANGE_DAYS)

# 2. Patch Funnels, so we download data only for one Funnel entity
# Patch Funnels, so we download data only for one Funnel entity
Funnels.funnel_slices = funnel_slices_patched

streams = func(self, config)
Expand All @@ -53,3 +42,25 @@ def wrapper(self, config):
return streams

return wrapper


def adapt_validate_if_testing(func):
"""
Due to API limitations (60 requests per hour) there is unavailable to make acceptance tests in normal mode,
so we're reducing amount of requests by, if `_testing` flag is set in config:

1. Take time range in only 1 month
"""

@wraps(func)
def wrapper(self, config):
config = func(self, config)
if config.get("_testing"):
logger = logging.getLogger("airbyte")
logger.info("SOURCE IN TESTING MODE, DO NOT USE IN PRODUCTION!")
# Take time range in only 1 month
if (config["end_date"] - config["start_date"]).days > AVAILABLE_TESTING_RANGE_DAYS:
config["start_date"] = config["end_date"].subtract(days=AVAILABLE_TESTING_RANGE_DAYS)
return config

return wrapper
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,33 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from datetime import timedelta

import pendulum
import pytest


@pytest.fixture
def start_date():
return pendulum.parse("2017-01-25T00:00:00Z").date()
return pendulum.parse("2017-01-25").date()


@pytest.fixture
def config(start_date):
return {
"api_secret": "unexisting-secret",
"attribution_window": 5,
"project_timezone": "UTC",
"project_timezone": pendulum.timezone("UTC"),
"select_properties_by_default": True,
"start_date": start_date,
"end_date": start_date + timedelta(days=31),
"end_date": start_date.add(days=31),
"region": "US",
}


@pytest.fixture
def config_raw(config):
return {
**config,
"project_timezone": config["project_timezone"].name,
"start_date": str(config["start_date"]),
"end_date": str(config["end_date"]),
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def export_response():
)


def test_export_stream_conflict_names(requests_mock, export_response):
stream = Export(authenticator=MagicMock())
def test_export_stream_conflict_names(requests_mock, export_response, config):
stream = Export(authenticator=MagicMock(), **config)
# Remove requests limit for test
stream.reqs_per_hour_limit = 0
requests_mock.register_uri("GET", get_url_to_mock(stream), export_response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ def check_connection_url(config):
(500, False, {"error": "Server error"}),
],
)
def test_check_connection(requests_mock, check_connection_url, config, response_code, expect_success, response_json):
def test_check_connection(requests_mock, check_connection_url, config_raw, response_code, expect_success, response_json):
requests_mock.register_uri("GET", check_connection_url, setup_response(response_code, response_json))
ok, error = SourceMixpanel().check_connection(logger, config)
ok, error = SourceMixpanel().check_connection(logger, config_raw)
assert ok == expect_success and error != expect_success
expected_error = response_json.get("error")
if expected_error:
Expand All @@ -44,19 +44,19 @@ def test_check_connection_bad_config():
assert not ok and error


def test_check_connection_incomplete(config):
config.pop("api_secret")
ok, error = SourceMixpanel().check_connection(logger, config)
def test_check_connection_incomplete(config_raw):
config_raw.pop("api_secret")
ok, error = SourceMixpanel().check_connection(logger, config_raw)
assert not ok and error


def test_streams(config):
streams = SourceMixpanel().streams(config)
def test_streams(config_raw):
streams = SourceMixpanel().streams(config_raw)
assert len(streams) == 7


def test_streams_string_date(config):
config = copy.deepcopy(config)
def test_streams_string_date(config_raw):
config = copy.deepcopy(config_raw)
config["start_date"] = "2020-01-01"
config["end_date"] = "2020-01-02"
streams = SourceMixpanel().streams(config)
Expand Down
Loading