Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python client changes, more docs, fix pdm #490

Merged
merged 5 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 78 additions & 12 deletions client/python/datajunction/client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
"""DataJunction client setup."""
# pylint: disable=redefined-outer-name, import-outside-toplevel
import enum
import logging
import platform
import warnings
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urljoin

import pandas as pd
try:
import pandas as pd
except ImportError:
warnings.warn(
"Optional dependency `pandas` not found, data retrieval disabled",
samredai marked this conversation as resolved.
Show resolved Hide resolved
ImportWarning,
)
import requests
from pydantic import BaseModel, Field, validator
from requests.adapters import CaseInsensitiveDict, HTTPAdapter
Expand Down Expand Up @@ -69,15 +77,19 @@ class DJClient: # pylint: disable=too-many-public-methods
Client for access to the DJ core service
"""

def __init__(
def __init__( # pylint: disable=too-many-arguments
self,
uri: str = "http://localhost:8000",
engine_name: str = None,
engine_version: str = None,
requests_session: RequestsSessionWithEndpoint = None,
target_namespace: str = DEFAULT_NAMESPACE,
timeout=2 * 60,
):
self.target_namespace = target_namespace
self.uri = uri
self.engine_name = engine_name
self.engine_version = engine_version
if not requests_session: # pragma: no cover
self._session = RequestsSessionWithEndpoint(
endpoint=self.uri,
Expand Down Expand Up @@ -438,8 +450,8 @@ def sql_for_metric( # pylint: disable=too-many-arguments
params={
"dimensions": dimensions,
"filters": filters,
"engine_name": engine_name,
"engine_version": engine_version,
"engine_name": engine_name or self.engine_name,
"engine_version": engine_version or self.engine_version,
},
)
if response.status_code == 200:
Expand All @@ -463,35 +475,89 @@ def sql( # pylint: disable=too-many-arguments
"metrics": metrics,
"dimensions": dimensions,
"filters": filters,
"engine_name": engine_name,
"engine_version": engine_version,
"engine_name": engine_name or self.engine_name,
"engine_version": engine_version or self.engine_version,
},
)
if response.status_code == 200:
return response.json()["sql"]
return response.json()

def data( # pylint: disable=too-many-arguments
def data_for_single_node( # pylint: disable=too-many-arguments
samredai marked this conversation as resolved.
Show resolved Hide resolved
self,
node_name: str,
dimensions: List[str],
filters: List[str],
engine_name: Optional[str] = "TRINO_DIRECT",
engine_version: Optional[str] = "",
engine_name: Optional[str] = None,
engine_version: Optional[str] = None,
): # pragma: no cover
"""
Retrieves the data for the node with the provided dimensions and filters.
"""
try:
import pandas as pd # noqa: F811
except ImportError as exc:
raise RuntimeError(
"Optional dependency `pandas` not found, data retrieval disabled",
) from exc
response = self._session.get(
f"/data/{node_name}/",
params={
"dimensions": dimensions,
"filters": filters,
"engine_name": engine_name,
"engine_version": engine_version,
"engine_name": engine_name or self.engine_name,
"engine_version": engine_version or self.engine_version,
},
)
results = response.json()
if not response.ok:
raise DJClientException(f"Error retrieving data: {response.text}")
if results["state"] != "FINISHED":
raise DJClientException(
f"Query state {results['state']}, errors: {results['errors']}",
)
if not results["results"]:
raise DJClientException("No data returned for requested set")
columns = results["results"][0]["columns"]
rows = results["results"][0]["rows"]
return pd.DataFrame(rows, columns=[col["name"] for col in columns])

def data(  # pylint: disable=too-many-arguments
    self,
    metrics: List[str],
    dimensions: List[str],
    filters: List[str],
    engine_name: Optional[str] = None,
    engine_version: Optional[str] = None,
):  # pragma: no cover
    """
    Retrieves the data for the given metrics with the provided dimensions and filters.

    Sends a GET request to ``/data/`` and converts the first result set of the
    response into a pandas DataFrame whose column labels are taken from the
    ``name`` field of each returned column.

    Args:
        metrics: Metric names to request.
        dimensions: Dimensions passed through as the ``dimensions`` query parameter.
        filters: Filters passed through as the ``filters`` query parameter.
        engine_name: Engine to use; when falsy, ``self.engine_name`` is sent instead.
        engine_version: Engine version to use; when falsy, ``self.engine_version``
            is sent instead.

    Returns:
        A ``pandas.DataFrame`` built from the rows of the first result set.

    Raises:
        RuntimeError: If the optional dependency ``pandas`` is not installed.
        DJClientException: If the HTTP response is not OK, if the query state
            is anything other than ``FINISHED``, or if no results are returned.
    """
    # pandas is an optional dependency, so it is imported lazily here.
    try:
        import pandas as pd  # noqa: F811
    except ImportError as exc:
        raise RuntimeError(
            "Optional dependency `pandas` not found, data retrieval disabled",
        ) from exc
    response = self._session.get(
        "/data/",
        params={
            "metrics": metrics,
            "dimensions": dimensions,
            "filters": filters,
            "engine_name": engine_name or self.engine_name,
            "engine_version": engine_version or self.engine_version,
        },
    )
    # Check the HTTP status before decoding the body: an error response is not
    # guaranteed to contain JSON, and decoding it first would surface a JSON
    # decode error instead of the intended DJClientException.
    if not response.ok:
        raise DJClientException(f"Error retrieving data: {response.text}")
    results = response.json()
    if results["state"] != "FINISHED":
        raise DJClientException(
            f"Query state {results['state']}, errors: {results['errors']}",
        )
    if not results["results"]:
        raise DJClientException("No data returned for requested set")
    # Only the first result set is converted.
    first_result = results["results"][0]
    columns = first_result["columns"]
    rows = first_result["rows"]
    return pd.DataFrame(rows, columns=[col["name"] for col in columns])
Expand Down Expand Up @@ -649,7 +715,7 @@ def data(
"""
Gets data for this node, given the provided dimensions and filters.
"""
return self.dj_client.data( # pragma: no cover
return self.dj_client.data_for_single_node( # pragma: no cover
self.name,
dimensions,
filters,
Expand Down
Loading