From f3167214b0ea55113e8964ef92c6978ccb41c070 Mon Sep 17 00:00:00 2001 From: Angelika Tarnawa <59344718+angelika233@users.noreply.github.com> Date: Thu, 26 Sep 2024 11:58:25 +0200 Subject: [PATCH] Migrate supermetrics (#1054) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * supermetrics refactored and tested * 🎨 Change secret name * 🎨 Docstring formatting * Fix formatting * Add pytest mocker * Format code * format with ruff * Format with ruff * Organize imports * Organize imports --------- Co-authored-by: fdelgadodyvenia Co-authored-by: angelika233 --- pyproject.toml | 1 + requirements-dev.lock | 6 +- requirements.lock | 13 +- src/viadot/__init__.py | 4 +- .../orchestration/prefect/flows/__init__.py | 4 +- .../prefect/flows/supermetrics_to_adls.py | 92 ++++++ .../orchestration/prefect/tasks/__init__.py | 14 +- .../prefect/tasks/supermetrics.py | 62 ++++ src/viadot/sources/__init__.py | 15 +- src/viadot/sources/supermetrics.py | 284 ++++++++++++++++++ .../prefect/flows/test_supermetrics.py | 37 +++ tests/unit/test_supermetrics.py | 185 ++++++++++++ 12 files changed, 695 insertions(+), 22 deletions(-) create mode 100644 src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py create mode 100644 src/viadot/orchestration/prefect/tasks/supermetrics.py create mode 100644 src/viadot/sources/supermetrics.py create mode 100644 tests/integration/orchestration/prefect/flows/test_supermetrics.py create mode 100644 tests/unit/test_supermetrics.py diff --git a/pyproject.toml b/pyproject.toml index 155ee859e..3472ea9c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ "numpy>=1.23.4, <2.0", "defusedxml>=0.7.1", "aiohttp>=3.10.5", + "pytest-mock>=3.14.0", ] requires-python = ">=3.10" readme = "README.md" diff --git a/requirements-dev.lock b/requirements-dev.lock index c346b284f..e74fc29b2 100644 --- a/requirements-dev.lock +++ b/requirements-dev.lock @@ -7,7 +7,6 @@ # all-features: false # with-sources: false # generate-hashes: false -# universal: false -e file:. aiohappyeyeballs==2.4.0 @@ -433,7 +432,10 @@ pyparsing==3.1.2 # via mike pytest==8.3.2 # via pytest-asyncio + # via pytest-mock pytest-asyncio==0.23.8 +pytest-mock==3.14.0 + # via viadot2 python-dateutil==2.9.0.post0 # via botocore # via croniter @@ -529,7 +531,7 @@ ruamel-yaml==0.18.6 # via prefect ruamel-yaml-clib==0.2.8 # via ruamel-yaml -ruff==0.6.1 +ruff==0.6.7 s3transfer==0.10.2 # via boto3 scipy==1.14.0 diff --git a/requirements.lock b/requirements.lock index 32c422a17..1f2dc9fca 100644 --- a/requirements.lock +++ b/requirements.lock @@ -7,7 +7,6 @@ # all-features: false # with-sources: false # generate-hashes: false -# universal: false -e file:. 
aiohappyeyeballs==2.4.0 @@ -87,6 +86,7 @@ et-xmlfile==1.1.0 exceptiongroup==1.2.2 # via anyio # via prefect + # via pytest frozenlist==1.4.1 # via aiohttp # via aiosignal @@ -129,6 +129,8 @@ imagehash==4.3.1 # via viadot2 importlib-resources==6.1.3 # via prefect +iniconfig==2.0.0 + # via pytest itsdangerous==2.2.0 # via prefect jinja2==3.1.4 @@ -186,6 +188,7 @@ orjson==3.10.7 # via prefect packaging==24.1 # via prefect + # via pytest pandas==2.2.2 # via viadot2 # via visions @@ -195,6 +198,8 @@ pendulum==2.1.2 # via prefect pillow==10.4.0 # via imagehash +pluggy==1.5.0 + # via pytest prefect==2.20.2 # via prefect-github # via prefect-sqlalchemy @@ -226,6 +231,10 @@ pygments==2.18.0 # via rich pyodbc==5.1.0 # via viadot2 +pytest==8.3.3 + # via pytest-mock +pytest-mock==3.14.0 + # via viadot2 python-dateutil==2.9.0.post0 # via croniter # via dateparser @@ -330,6 +339,8 @@ text-unidecode==1.3 # via python-slugify toml==0.10.2 # via prefect +tomli==2.0.1 + # via pytest trino==0.328.0 # via viadot2 typer==0.12.4 diff --git a/src/viadot/__init__.py b/src/viadot/__init__.py index 6c5389e07..6f31c5d9b 100644 --- a/src/viadot/__init__.py +++ b/src/viadot/__init__.py @@ -7,12 +7,12 @@ # but keep WARNING and higher ones in case something actually important happens. azure_clutter_logger_1 = logging.getLogger( - "azure.core.pipeline.policies.http_logging_policy" + "azure.core.pipeline.policies.http_logging_policy", ) azure_clutter_logger_1.setLevel(logging.WARNING) azure_clutter_logger_2 = logging.getLogger( - "azure.identity.aio._internal.get_token_mixin" + "azure.identity.aio._internal.get_token_mixin", ) azure_clutter_logger_2.setLevel(logging.WARNING) diff --git a/src/viadot/orchestration/prefect/flows/__init__.py b/src/viadot/orchestration/prefect/flows/__init__.py index 48a4524ea..73cff365d 100644 --- a/src/viadot/orchestration/prefect/flows/__init__.py +++ b/src/viadot/orchestration/prefect/flows/__init__.py @@ -21,6 +21,7 @@ from .sharepoint_to_s3 import sharepoint_to_s3 from .sql_server_to_minio import sql_server_to_minio from .sql_server_to_parquet import sql_server_to_parquet +from .supermetrics_to_adls import supermetrics_to_adls from .transform import transform from .transform_and_catalog import transform_and_catalog @@ -32,9 +33,9 @@ "duckdb_to_sql_server", "duckdb_transform", "epicor_to_parquet", + "exchange_rates_api_to_redshift_spectrum", "exchange_rates_to_adls", "exchange_rates_to_databricks", - "exchange_rates_api_to_redshift_spectrum", "genesys_to_adls", "hubspot_to_adls", "mindful_to_adls", @@ -47,6 +48,7 @@ "sharepoint_to_s3", "sql_server_to_minio", "sql_server_to_parquet", + "supermetrics_to_adls", "transform", "transform_and_catalog", ] diff --git a/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py new file mode 100644 index 000000000..63d62a621 --- /dev/null +++ b/src/viadot/orchestration/prefect/flows/supermetrics_to_adls.py @@ -0,0 +1,92 @@ +"""Flow for downloading the data from Supermetrics and uploading it to ADLS.""" + +from typing import Any + +from prefect import flow +from prefect.task_runners import ConcurrentTaskRunner + +from viadot.orchestration.prefect.tasks import ( + df_to_adls, + supermetrics_to_df, +) + + +@flow( + name="Supermetrics extraction to ADLS", + description="Extract data from Supermetrics and load it into ADLS.", + retries=1, + retry_delay_seconds=60, + task_runner=ConcurrentTaskRunner, +) +def supermetrics_to_adls( + # supermetrics + query_params: dict[str, Any] | None = 
None, + # ADLS + adls_path: str | None = None, + overwrite: bool = False, + # Auth + supermetrics_credentials_secret: str | None = None, + supermetrics_config_key: str | None = None, + adls_credentials_secret: str | None = None, + adls_config_key: str | None = None, + **kwargs: dict[str, Any] | None, +) -> None: + """Flow to extract data from the Supermetrics API and save it to ADLS. + + This function queries data from the Supermetrics API using the provided query + parameters and saves the resulting DataFrame to Azure Data Lake Storage (ADLS) + as a file. + + Args: + ---- + query_params (dict[str, Any], optional): + A dictionary of query parameters for the Supermetrics API. These parameters + specify the data to retrieve from Supermetrics. If not provided, the default + parameters from the Supermetrics configuration will be used. + adls_path (str, optional): + The destination path in ADLS where the DataFrame will be saved. This should + include the file name and extension (e.g., 'myfolder/myfile.csv'). If not + provided, the function will use a default path from the configuration + or raise an error. + overwrite (bool, optional): + A flag indicating whether to overwrite the existing file in ADLS. If set + to False and the file exists, an error will be raised. Default is False. + supermetrics_credentials_secret (str, optional): + The name of the secret in the secret management system containing + the Supermetrics API credentials. If not provided, the function will use + credentials specified in the configuration. + supermetrics_config_key (str, optional): + The key in the viadot configuration holding relevant credentials. + Defaults to None. + adls_credentials_secret (str, optional): + The name of the secret in the secret management system containing + the ADLS credentials. If not provided, the function will use credentials + specified in the configuration. + adls_config_key (str, optional): + The key in the viadot configuration holding relevant credentials. + Defaults to None. + **kwargs (dict[str, Any], optional): + Additional keyword arguments to pass to the `supermetrics_to_df` function + for further customization of the Supermetrics query. + + Raises: + ------ + ValueError: + If `adls_path` is not provided and cannot be determined from + the configuration. 
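+ + Example: + ------- + A minimal, illustrative invocation (a sketch only: the account ID, secret + names, and ADLS path below are placeholders, not values shipped with + this flow): + + supermetrics_to_adls( + query_params={ + "ds_id": "GA", + "ds_accounts": ["<account_id>"], + "date_range_type": "last_month", + "fields": ["Date", "Sessions"], + }, + adls_path="raw/supermetrics/example.parquet", + supermetrics_credentials_secret="supermetrics-secret", + adls_credentials_secret="adls-secret", + ) 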
+ + """ + df = supermetrics_to_df( + query_params=query_params, + credentials_secret=supermetrics_credentials_secret, + config_key=supermetrics_config_key, + **kwargs, + ) + + return df_to_adls( + df=df, + path=adls_path, + credentials_secret=adls_credentials_secret, + config_key=adls_config_key, + overwrite=overwrite, + ) diff --git a/src/viadot/orchestration/prefect/tasks/__init__.py b/src/viadot/orchestration/prefect/tasks/__init__.py index 8e1053f08..965c4e11f 100644 --- a/src/viadot/orchestration/prefect/tasks/__init__.py +++ b/src/viadot/orchestration/prefect/tasks/__init__.py @@ -23,31 +23,33 @@ sharepoint_to_df, ) from .sql_server import create_sql_server_table, sql_server_query, sql_server_to_df +from .supermetrics import supermetrics_to_df __all__ = [ "adls_upload", - "df_to_adls", "bcp", + "clone_repo", "cloud_for_customers_to_df", - "df_to_databricks", + "create_sql_server_table", "dbt_task", + "df_to_adls", + "df_to_databricks", + "df_to_minio", + "df_to_redshift_spectrum", "duckdb_query", "epicor_to_df", "exchange_rates_to_df", "genesys_to_df", - "clone_repo", "hubspot_to_df", "luma_ingest_task", "mindful_to_df", - "df_to_minio", "outlook_to_df", - "df_to_redshift_spectrum", "s3_upload_file", "sap_rfc_to_df", "sharepoint_download_file", "sharepoint_to_df", - "create_sql_server_table", "sql_server_query", "sql_server_to_df", + "supermetrics_to_df", ] diff --git a/src/viadot/orchestration/prefect/tasks/supermetrics.py b/src/viadot/orchestration/prefect/tasks/supermetrics.py new file mode 100644 index 000000000..dd7874bda --- /dev/null +++ b/src/viadot/orchestration/prefect/tasks/supermetrics.py @@ -0,0 +1,62 @@ +"""Task for connecting to Supermetrics API.""" + +import pandas as pd +from prefect import task + +from viadot.orchestration.prefect.exceptions import MissingSourceCredentialsError +from viadot.orchestration.prefect.utils import get_credentials +from viadot.sources import Supermetrics + + +@task(retries=3, log_prints=True, retry_delay_seconds=10, timeout_seconds=60 * 60) +def supermetrics_to_df( + query_params: dict, + config_key: str | None = None, + credentials_secret: str | None = None, +) -> pd.DataFrame: + """Task to retrieve data from Supermetrics and return it as a pandas DataFrame. + + This function queries the Supermetrics API using the provided query parameters and + returns the data as a pandas DataFrame. The function supports both + configuration-based and secret-based credentials. + + The function is decorated with a Prefect task, allowing it to handle retries, + logging, and timeout behavior. + + Args: + ---- + query_params (dict): + A dictionary containing the parameters for querying the Supermetrics API. + These parameters define what data to retrieve and how the query should + be constructed. + config_key (str, optional): The key in the viadot config holding relevant + credentials. Defaults to None. + credentials_secret (str, optional): + The name of the secret in your secret management system that contains + the Supermetrics API credentials. If `config_key` is not provided, + this secret is used to authenticate with the Supermetrics API. + + Returns: + ------- + pd.DataFrame: + A pandas DataFrame containing the data retrieved from Supermetrics based + on the provided query parameters. + + Raises: + ------ + MissingSourceCredentialsError: + Raised if neither `credentials_secret` nor `config_key` is provided, + indicating that no valid credentials were supplied to access + the Supermetrics API. 
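+ + Example: + ------- + A minimal, illustrative call (a sketch only: the secret name, account ID, + and field names are placeholders): + + df = supermetrics_to_df( + query_params={ + "ds_id": "GA", + "ds_accounts": ["<account_id>"], + "fields": ["Date", "Sessions"], + }, + credentials_secret="supermetrics-secret", + ) 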
+ + """ + if not (credentials_secret or config_key): + raise MissingSourceCredentialsError + + credentials = get_credentials(credentials_secret) if not config_key else None + + supermetrics = Supermetrics( + credentials=credentials, + config_key=config_key, + ) + return supermetrics.to_df(query_params=query_params) diff --git a/src/viadot/sources/__init__.py b/src/viadot/sources/__init__.py index 368eeb13c..d05d91fbf 100644 --- a/src/viadot/sources/__init__.py +++ b/src/viadot/sources/__init__.py @@ -13,6 +13,7 @@ from .outlook import Outlook from .sharepoint import Sharepoint from .sql_server import SQLServer +from .supermetrics import Supermetrics, SupermetricsCredentials from .uk_carbon_intensity import UKCarbonIntensity @@ -21,46 +22,40 @@ "Epicor", "ExchangeRates", "Genesys", - "Outlook", "Hubspot", "Mindful", + "Outlook", + "SQLServer", "Sharepoint", + "Supermetrics", + "SupermetricsCredentials", # pragma: allowlist-secret "Trino", - "SQLServer", "UKCarbonIntensity", ] - if find_spec("adlfs"): from viadot.sources.azure_data_lake import AzureDataLake # noqa: F401 __all__.extend(["AzureDataLake"]) - if find_spec("duckdb"): from viadot.sources._duckdb import DuckDB # noqa: F401 __all__.extend(["DuckDB"]) - if find_spec("redshift_connector"): from viadot.sources.redshift_spectrum import RedshiftSpectrum # noqa: F401 __all__.extend(["RedshiftSpectrum"]) - if find_spec("s3fs"): from viadot.sources.s3 import S3 # noqa: F401 __all__.extend(["S3"]) - if find_spec("s3fs"): from viadot.sources.minio import MinIO # noqa: F401 __all__.extend(["MinIO"]) - - if find_spec("pyrfc"): from viadot.sources.sap_rfc import SAPRFC, SAPRFCV2 # noqa: F401 __all__.extend(["SAPRFC", "SAPRFCV2"]) - if find_spec("pyspark"): from viadot.sources.databricks import Databricks # noqa: F401 diff --git a/src/viadot/sources/supermetrics.py b/src/viadot/sources/supermetrics.py new file mode 100644 index 000000000..4010c9ed9 --- /dev/null +++ b/src/viadot/sources/supermetrics.py @@ -0,0 +1,284 @@ +"""Source for connecting to Supermetrics API.""" + +import json +from typing import Any + +import numpy as np +import pandas as pd +from pydantic import BaseModel + +from viadot.config import get_source_credentials + +from ..exceptions import CredentialError +from ..utils import add_viadot_metadata_columns, handle_api_response +from .base import Source + + +class SupermetricsCredentials(BaseModel): + """Represents credentials for accessing the Supermetrics API. + + This class encapsulates the necessary credentials required to authenticate + and access the Supermetrics API. + + Attributes: + ---------- + user (str): + The email account associated with the Supermetrics user. + api_key (str): + The API key that provides access to the Supermetrics API. + + """ + + user: str + api_key: str + + +class Supermetrics(Source): + """Implements methods for querying and interacting with the Supermetrics API. + + This class provides functionality to query data from the Supermetrics API, + which is a tool used for accessing data from various data sources. The API + documentation can be found at: + https://supermetrics.com/docs/product-api-getting-started/. For information + on usage limits, please refer to: + https://supermetrics.com/docs/product-api-usage-limits/. + + Args: + ---- + config_key (str, optional): + The key in the viadot configuration that holds the relevant credentials + for the API. Defaults to None. 
credentials (dict of str to any, optional): + A dictionary containing the credentials needed for the API connection + configuration, specifically `api_key` and `user`. Defaults to None. + query_params (dict of str to any, optional): + A dictionary containing the parameters to pass to the GET query. + These parameters define the specifics of the data request. Defaults to None. + For a full specification of possible parameters, see: + https://supermetrics.com/docs/product-api-get-data/. + + """ + + API_ENDPOINT = "https://api.supermetrics.com/enterprise/v2/query/data/json" + + def __init__( + self, + *args, + credentials: dict[str, Any] | None = None, + config_key: str | None = None, + query_params: dict[str, Any] | None = None, + **kwargs, + ) -> None: + """Initialize the Supermetrics object. + + This constructor sets up the necessary components to interact with the + Supermetrics API, including the credentials and any query parameters. + + Args: + ---- + credentials (dict of str to any, optional): + A dictionary containing the `api_key` and `user` credentials + for authentication. Defaults to None. + config_key (str, optional): + The key in the viadot configuration that holds the relevant credentials + for the API. Defaults to None. + query_params (dict of str to any, optional): + A dictionary containing the parameters to pass to the GET query. These + parameters define the specifics of the data request. Defaults to None. + + """ + credentials = credentials or get_source_credentials(config_key) or None + + if credentials is None or not isinstance(credentials, dict): + msg = "Missing credentials." + raise CredentialError(msg) + self.credentials = dict(SupermetricsCredentials(**credentials)) + + super().__init__(*args, credentials=self.credentials, **kwargs) + + self.query_params = query_params + self.api_key = self.credentials["api_key"] + self.user = self.credentials["user"] + + def to_json(self, timeout: tuple = (3.05, 60 * 30)) -> dict[str, Any]: + """Download query results to a dictionary. + + This method executes the query against the Supermetrics API and retrieves + the results as a JSON dictionary. + + Args: + ---- + timeout (tuple of float, optional): + A tuple specifying the timeout values for the request. The first value + is the timeout for connection issues, and the second value is + the timeout for query execution. Defaults to (3.05, 1800), which + provides a short timeout for connection issues and a longer timeout + for the query execution. + + Returns: + ------- + dict: + The response from the Supermetrics API, returned as a JSON dictionary. + + Raises: + ------ + ValueError: + Raised if the query parameters are not set before calling this method. + + """ + if not self.query_params: + msg = "Please build the query first" + raise ValueError(msg) + + params = {"json": json.dumps(self.query_params)} + headers = {"Authorization": f"Bearer {self.api_key}"} + + response = handle_api_response( + url=self.API_ENDPOINT, + params=params, + headers=headers, + timeout=timeout, + ) + return response.json() + + @classmethod + def _get_col_names_google_analytics( + cls, + response: dict, + ) -> list[str]: + """Get column names from Google Analytics data. + + This method extracts the column names from the JSON response received + from a Google Analytics API call. + + Args: + ---- + response (dict): + A dictionary containing the JSON response from the API call. + + Returns: + ------- + list of str: + A list of column names extracted from the Google Analytics data. 
+ + Raises: + ------ + ValueError: + Raised if no data is returned in the response or if the column names + cannot be determined. + + """ + is_pivoted = any( + field["field_split"] == "column" + for field in response["meta"]["query"]["fields"] + ) + + if is_pivoted: + if not response["data"]: + msg = "Couldn't find column names as query returned no data." + raise ValueError(msg) + columns = response["data"][0] + else: + cols_meta = response["meta"]["query"]["fields"] + columns = [col_meta["field_name"] for col_meta in cols_meta] + return columns + + @classmethod + def _get_col_names_other(cls, response: dict) -> list[str]: + """Get column names from non-Google Analytics data. + + This method extracts the column names from the JSON response received + from an API call that is not related to Google Analytics. + + Args: + ---- + response (dict): + A dictionary containing the JSON response from the API call. + + Returns: + ------- + list of str: + A list of column names extracted from the non-Google Analytics data. + + """ + cols_meta = response["meta"]["query"]["fields"] + return [col_meta["field_name"] for col_meta in cols_meta] + + def _get_col_names(self) -> list[str]: + """Get column names based on the data type. + + This method determines the appropriate column names for the data based + on its type, whether it's Google Analytics data or another type. + + Returns: + ------- + list of str: + A list of column names based on the data type. + + Raises: + ------ + ValueError: + Raised if the column names cannot be determined. + + """ + response: dict = self.to_json() + if self.query_params["ds_id"] == "GA": + return Supermetrics._get_col_names_google_analytics(response) + + return Supermetrics._get_col_names_other(response) + + @add_viadot_metadata_columns + def to_df( + self, + if_empty: str = "warn", + query_params: dict[str, Any] | None = None, + ) -> pd.DataFrame: + """Download data into a pandas DataFrame. + + This method retrieves data from the Supermetrics API and loads it into + a pandas DataFrame. + + Args: + ---- + if_empty (str, optional): + Specifies the action to take if the query returns no data. + Options include "warn" to log a warning or "fail" to raise + an error. Defaults to "warn". + query_params (dict[str, Any], optional): + Query parameters for the Supermetrics API. If provided, they replace + the instance's `query_params` for this call. Defaults to None. + + Returns: + ------- + pd.DataFrame: + A pandas DataFrame containing the JSON data retrieved from the API. + + Raises: + ------ + ValueError: + Raised if the DataFrame is empty and `if_empty` is set to "fail". + + """ + # Use provided query_params or default to the instance's query_params + if query_params is not None: + self.query_params = query_params + + if not self.query_params: + msg = "Query parameters are required to fetch data." 
+ raise ValueError(msg) + + self.query_params["api_key"] = self.api_key + + try: + columns = self._get_col_names() + except ValueError: + columns = None + + data = self.to_json()["data"] + + if data: + df = pd.DataFrame(data[1:], columns=columns).replace("", np.nan) + else: + df = pd.DataFrame(columns=columns) + + if df.empty: + self._handle_if_empty(if_empty) + + return df diff --git a/tests/integration/orchestration/prefect/flows/test_supermetrics.py b/tests/integration/orchestration/prefect/flows/test_supermetrics.py new file mode 100644 index 000000000..fb59da86e --- /dev/null +++ b/tests/integration/orchestration/prefect/flows/test_supermetrics.py @@ -0,0 +1,37 @@ +import pytest + +from viadot.config import get_source_config +from viadot.orchestration.prefect.flows import supermetrics_to_adls + + +@pytest.mark.parametrize( + ("supermetrics_config_key", "adls_credentials_secret"), + [ + ("supermetrics", "supermetrics"), + ], +) +def test_supermetrics_to_adls(supermetrics_config_key, adls_credentials_secret): + supermetrics_config = get_source_config(supermetrics_config_key) + google_ads_params = { + "ds_id": "AW", + "ds_user": supermetrics_config["credentials"].get("user"), + "ds_accounts": ["1007802423"], + "date_range_type": "last_month", + "fields": [ + "Date", + "Campaignname", + "Clicks", + ], + "max_rows": 1, + } + + state = supermetrics_to_adls( + query_params=google_ads_params, + supermetrics_config_key=supermetrics_config_key, + adls_credentials_secret=adls_credentials_secret, + overwrite=True, + adls_path="raw/supermetrics/.parquet", + ) + + all_successful = all(s.type == "COMPLETED" for s in state) + assert all_successful, "Not all tasks in the flow completed successfully." diff --git a/tests/unit/test_supermetrics.py b/tests/unit/test_supermetrics.py new file mode 100644 index 000000000..0cfa48cd8 --- /dev/null +++ b/tests/unit/test_supermetrics.py @@ -0,0 +1,185 @@ +import pytest + +from viadot.sources import Supermetrics, SupermetricsCredentials + + +@pytest.fixture +def supermetrics_credentials(): + return SupermetricsCredentials( + user="test_user", + api_key="test_key", # pragma: allowlist secret + ) + + +@pytest.fixture +def mock_get_source_credentials( + mocker, + supermetrics_credentials: SupermetricsCredentials, +): + return mocker.patch( + "viadot.config.get_source_credentials", + return_value={ + "user": supermetrics_credentials.user, + "api_key": supermetrics_credentials.api_key, + }, + ) + + +@pytest.fixture +def supermetrics(supermetrics_credentials: SupermetricsCredentials): + return Supermetrics( + credentials={ + "user": supermetrics_credentials.user, + "api_key": supermetrics_credentials.api_key, + }, + query_params={"ds_id": "GA", "query": "test_query"}, + ) + + +def test_to_json(mocker, supermetrics: Supermetrics): + # Mock the handle_api_response function to simulate an API response + mock_handle_api_response = mocker.patch( + "viadot.sources.supermetrics.handle_api_response", + ) + mock_response = { + "data": [["value1", "value2"]], + "meta": {"query": {"fields": [{"field_name": "col1"}, {"field_name": "col2"}]}}, + } + # Set the mock to return the mock response object + mock_handle_api_response.return_value.json.return_value = mock_response + + # Call the method under test + response = supermetrics.to_json() + + # Assert that the response is as expected + assert response == { + "data": [["value1", "value2"]], + "meta": {"query": {"fields": [{"field_name": "col1"}, {"field_name": "col2"}]}}, + } + + +def test_to_df_with_data(supermetrics: Supermetrics, 
mocker): + # Mock the API response with some data + mock_response = { + "meta": { + "query": { + "ds_id": "GA", # Data source ID, e.g., Google Analytics + "fields": [ + { + "field_name": "date", + "field_type": "DIMENSION", + "field_split": "row", + }, + { + "field_name": "sessions", + "field_type": "METRIC", + "field_split": "row", + }, + ], + "other_query_metadata": "...", + }, + "status": "success", # Status of the query + "execution_time": "0.456", # Time taken to execute the query + }, + "data": [ + ["2023-01-01", 100], # Example data rows + ["2023-01-02", 150], + ["2023-01-03", 120], + ], + "paging": { + "current_page": 1, # Current page number if pagination is used + "total_pages": 1, + "total_results": 3, + }, + } + + mock_method = mocker.patch("viadot.sources.supermetrics.Supermetrics.to_json") + mock_method.return_value = mock_response + mock_method = mocker.patch( + "viadot.sources.supermetrics.Supermetrics._get_col_names", + ) + mock_method.return_value = ["date", "sessions"] + df = supermetrics.to_df() + + assert not df.empty + assert list(df.columns) == [ + "date", + "sessions", + "_viadot_source", + "_viadot_downloaded_at_utc", + ] + + +def test_get_col_names_google_analytics_pivoted(supermetrics: Supermetrics): + mock_response = { + "meta": { + "query": { + "fields": [ + {"field_name": "ga:date", "field_split": "column"}, + {"field_name": "ga:sessions", "field_split": "row"}, + ], + }, + }, + "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}], + } + columns = supermetrics._get_col_names_google_analytics(mock_response) + assert columns == {"ga:date": "2023-01-01", "ga:sessions": 100} + + +def test_get_col_names_google_analytics_non_pivoted(supermetrics: Supermetrics): + mock_response = { + "meta": { + "query": { + "fields": [ + {"field_name": "ga:date", "field_split": "row"}, + {"field_name": "ga:sessions", "field_split": "row"}, + ], + }, + }, + "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}], + } + columns = supermetrics._get_col_names_google_analytics(mock_response) + assert columns == ["ga:date", "ga:sessions"] + + +def test_to_df_metadata_columns(mocker, supermetrics: Supermetrics): + # Mock the API response with some data + mock_response = { + "data": [["2023-01-01", 100]], + "meta": { + "query": {"fields": [{"field_name": "date"}, {"field_name": "sessions"}]}, + }, + } + + mocker.patch( + "viadot.sources.supermetrics.Supermetrics.to_json", + return_value=mock_response, + ) + mocker.patch( + "viadot.sources.supermetrics.Supermetrics._get_col_names", + return_value=["date", "sessions"], + ) + + df = supermetrics.to_df() + + assert "_viadot_source" in df.columns + assert "_viadot_downloaded_at_utc" in df.columns + + +def test_get_col_names_ga(mocker, supermetrics: Supermetrics): + mocker.patch( + "viadot.sources.supermetrics.Supermetrics.to_json", + return_value={ + "meta": { + "query": { + "fields": [ + {"field_name": "ga:date", "field_split": "column"}, + {"field_name": "ga:sessions", "field_split": "row"}, + ], + }, + }, + "data": [{"ga:date": "2023-01-01", "ga:sessions": 100}], + }, + ) + columns = supermetrics._get_col_names() + assert columns == {"ga:date": "2023-01-01", "ga:sessions": 100}
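
A minimal sketch of driving the new `Supermetrics` source directly, outside Prefect (the credential values, account ID, and field names below are placeholders; per the patched `to_df`, the provided `query_params` replace the instance's, the API key is injected, and viadot metadata columns are appended):

from viadot.sources import Supermetrics

# Placeholder credentials; in practice these come from the viadot config
# (config_key) or a secret store rather than being hard-coded.
supermetrics = Supermetrics(
    credentials={"user": "<email>", "api_key": "<api_key>"},
)

df = supermetrics.to_df(
    query_params={
        "ds_id": "GA",                    # data source ID, e.g. Google Analytics
        "ds_accounts": ["<account_id>"],  # placeholder account ID
        "date_range_type": "last_month",
        "fields": ["Date", "Sessions"],
        "max_rows": 100,
    },
)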