diff --git a/dataprep/eda/correlation/compute/overview.py b/dataprep/eda/correlation/compute/overview.py index a8788217b..9a0d8a078 100644 --- a/dataprep/eda/correlation/compute/overview.py +++ b/dataprep/eda/correlation/compute/overview.py @@ -3,7 +3,7 @@ Currently this boils down to pandas' implementation.""" from functools import partial -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple, Union import dask import dask.array as da @@ -12,6 +12,7 @@ from ...configs import Config from ...data_array import DataArray, DataFrame +from ...eda_frame import EDAFrame from ...intermediate import Intermediate from ...utils import cut_long_name from .common import CorrelationMethod @@ -151,7 +152,7 @@ def _calc_overview( def correlation_nxn( - df: DataArray, cfg: Config + df: Union[DataArray, EDAFrame], cfg: Config ) -> Tuple[np.ndarray, np.ndarray, Dict[CorrelationMethod, da.Array]]: """ Calculation of a n x n correlation matrix for n columns @@ -177,7 +178,7 @@ def correlation_nxn( return cordx, cordy, corrs -def _pearson_nxn(df: DataArray) -> da.Array: +def _pearson_nxn(df: Union[DataArray, EDAFrame]) -> da.Array: """Calculate column-wise pearson correlation.""" return ( df.frame.repartition(npartitions=1) @@ -186,7 +187,7 @@ def _pearson_nxn(df: DataArray) -> da.Array: ) -def _spearman_nxn(df: DataArray) -> da.Array: +def _spearman_nxn(df: Union[DataArray, EDAFrame]) -> da.Array: """Calculate column-wise spearman correlation.""" return ( df.frame.repartition(npartitions=1) @@ -195,7 +196,7 @@ def _spearman_nxn(df: DataArray) -> da.Array: ) -def _kendall_tau_nxn(df: DataArray) -> da.Array: +def _kendall_tau_nxn(df: Union[DataArray, EDAFrame]) -> da.Array: """Calculate column-wise kendalltau correlation.""" return ( df.frame.repartition(npartitions=1) diff --git a/dataprep/eda/create_report/__init__.py b/dataprep/eda/create_report/__init__.py index e2de6d5f3..593608691 100644 --- a/dataprep/eda/create_report/__init__.py +++ b/dataprep/eda/create_report/__init__.py @@ -60,7 +60,7 @@ def create_report( >>> report.save('My Fantastic Report') # save report to local disk >>> report.show_browser() # show report in the browser """ - suppress_warnings() + _suppress_warnings() cfg = Config.from_dict(display, config) context = { "resources": INLINE.render(), @@ -72,7 +72,7 @@ def create_report( return Report(report) -def suppress_warnings() -> None: +def _suppress_warnings() -> None: """ suppress warnings in create_report """ diff --git a/dataprep/eda/create_report/formatter.py b/dataprep/eda/create_report/formatter.py index 765c32757..f0a3456f2 100644 --- a/dataprep/eda/create_report/formatter.py +++ b/dataprep/eda/create_report/formatter.py @@ -12,7 +12,6 @@ from ..configs import Config from ..correlation import render_correlation from ..correlation.compute.overview import correlation_nxn -from ..data_array import DataArray from ..distribution import render from ..utils import _calc_line_dt from ..distribution.compute.overview import calc_stats @@ -26,20 +25,19 @@ _format_ov_ins, _insight_pagination, ) -from ..dtypes import ( +from ..dtypes_v2 import ( Continuous, DateTime, Nominal, GeoGraphy, GeoPoint, - detect_dtype, - is_dtype, + SmallCardNum, ) +from ..eda_frame import EDAFrame from ..intermediate import Intermediate from ..missing import render_missing from ..missing.compute.nullivariate import compute_missing_nullivariate from ...progress_bar import ProgressBar -from ..utils import preprocess_dataframe def format_report( @@ -71,7 +69,8 @@ def 
format_report( """ with ProgressBar(minimum=1, disable=not progress): if mode == "basic": - comps = format_basic(df, cfg) + edaframe = EDAFrame(df) + comps = format_basic(edaframe, cfg) # elif mode == "full": # comps = format_full(df) # elif mode == "minimal": @@ -81,7 +80,7 @@ def format_report( return comps -def format_basic(df: dd.DataFrame, cfg: Config) -> Dict[str, Any]: +def format_basic(df: EDAFrame, cfg: Config) -> Dict[str, Any]: """ Format basic version. @@ -101,13 +100,11 @@ def format_basic(df: dd.DataFrame, cfg: Config) -> Dict[str, Any]: # pylint: disable=too-many-locals,too-many-statements,too-many-branches # aggregate all computations - df_num = DataArray(df).select_num_columns() - df = preprocess_dataframe(df) setattr(getattr(cfg, "plot"), "report", True) if cfg.missingvalues.enable: - data, completions = basic_computations(df, df_num, cfg) + data, completions = basic_computations(df, cfg) else: - data = basic_computations(df, df_num, cfg) + data = basic_computations(df, cfg) with catch_warnings(): filterwarnings( "ignore", @@ -127,13 +124,9 @@ def format_basic(df: dd.DataFrame, cfg: Config) -> Dict[str, Any]: # insight all_ins = _format_ov_ins(data["ov"], cfg) for col, dtp, dat in data["insights"]: - if is_dtype(dtp, Continuous()): + if isinstance(dtp, Continuous): ins = _format_cont_ins(col, dat, data["ov"]["nrows"], cfg)[1] - elif is_dtype(dtp, Nominal()): - ins = _format_nom_ins(col, dat, data["ov"]["nrows"], cfg)[1] - elif is_dtype(dtp, GeoGraphy()): - ins = _format_nom_ins(col, dat, data["ov"]["nrows"], cfg)[1] - elif is_dtype(dtp, GeoPoint()): + elif type(dtp) in [Nominal, SmallCardNum, GeoGraphy, GeoPoint]: ins = _format_nom_ins(col, dat, data["ov"]["nrows"], cfg)[1] else: continue @@ -150,25 +143,16 @@ def format_basic(df: dd.DataFrame, cfg: Config) -> Dict[str, Any]: res["has_variables"] = True for col in df.columns: stats: Any = None # needed for pylint - if is_dtype(detect_dtype(df[col]), Continuous()): + dtp = df.get_dtype(col) + if isinstance(dtp, Continuous): itmdt = Intermediate(col=col, data=data[col], visual_type="numerical_column") stats = format_num_stats(data[col]) - elif is_dtype(detect_dtype(df[col]), Nominal()): - itmdt = Intermediate(col=col, data=data[col], visual_type="categorical_column") - stats = format_cat_stats( - data[col]["stats"], data[col]["len_stats"], data[col]["letter_stats"] - ) - elif is_dtype(detect_dtype(df[col]), GeoGraphy()): - itmdt = Intermediate(col=col, data=data[col], visual_type="categorical_column") - stats = format_cat_stats( - data[col]["stats"], data[col]["len_stats"], data[col]["letter_stats"] - ) - elif is_dtype(detect_dtype(df[col]), GeoPoint()): + elif type(dtp) in [Nominal, SmallCardNum, GeoGraphy, GeoPoint]: itmdt = Intermediate(col=col, data=data[col], visual_type="categorical_column") stats = format_cat_stats( data[col]["stats"], data[col]["len_stats"], data[col]["letter_stats"] ) - elif is_dtype(detect_dtype(df[col]), DateTime()): + elif isinstance(dtp, DateTime): itmdt = Intermediate( col=col, data=data[col]["stats"], @@ -176,6 +160,9 @@ def format_basic(df: dd.DataFrame, cfg: Config) -> Dict[str, Any]: visual_type="datetime_column", ) stats = stats_viz_dt(data[col]["stats"]) + else: + raise RuntimeError(f"the type of column {col} is unknown: {type(dtp)}") + rndrd = render(itmdt, cfg) layout = rndrd["layout"] figs_var: List[Figure] = [] @@ -252,14 +239,14 @@ def format_basic(df: dd.DataFrame, cfg: Config) -> Dict[str, Any]: res["missing"] = components(figs_missing) res["missing_tabs"] = ["Bar Chart", 
"Spectrum", "Heat Map"] # only display dendrogram when df has more than one column - if dask.compute(df.shape[1])[0] > 1: + if df.shape[1] > 1: res["missing_tabs"].append("Dendogram") return res def basic_computations( - df: dd.DataFrame, df_num: DataArray, cfg: Config + df: EDAFrame, cfg: Config ) -> Union[Tuple[Dict[str, Any], Dict[str, Any]], Any]: """Computations for the basic version. @@ -278,58 +265,54 @@ def basic_computations( Without user's specifications, the default is "auto" """ # pylint: disable=too-many-branches data: Dict[str, Any] = {} - df = DataArray(df) + df_num = df.select_num_columns() data["num_cols"] = df_num.columns - first_rows = df.head + head: pd.DataFrame = df.head() # variables if cfg.variables.enable: for col in df.columns: - npres = dask.compute(df.frame[col].dropna().shape[0]) + dtype = df.get_dtype(col) # Since it will throw error if a numerical column is all-nan, - # we transform it to categorical column - if npres[0] == 0: - df.frame[col] = df.frame[col].astype(str) - data[col] = nom_comps(df.frame[col], df.frame[col].head(), cfg) - elif is_dtype(detect_dtype(df.frame[col]), Nominal()): - data[col] = nom_comps(df.frame[col], first_rows[col], cfg) - elif is_dtype(detect_dtype(df.frame[col]), GeoGraphy()): - data[col] = nom_comps(df.frame[col], first_rows[col], cfg) - elif is_dtype(detect_dtype(df.frame[col]), GeoPoint()): - data[col] = nom_comps(df.frame[col], first_rows[col], cfg) - elif is_dtype(detect_dtype(df.frame[col]), Continuous()): + # we transform it to categorical column. + # We also transform to categorical for small cardinality numerical column. + if df.get_missing_cnt(col) == df.shape[0]: + srs = df.get_col_as_str(col, na_as_str=True) + data[col] = nom_comps(srs, srs.head(), cfg) + elif isinstance(dtype, Nominal): + data[col] = nom_comps(df.frame[col], head[col], cfg) + elif isinstance(dtype, SmallCardNum): + srs = df.get_col_as_str(col, na_as_str=False) + data[col] = nom_comps(srs, srs.head(), cfg) + elif isinstance(dtype, GeoGraphy): + data[col] = nom_comps(df.frame[col], head[col], cfg) + elif isinstance(dtype, GeoPoint): + data[col] = nom_comps(df.frame[col], head[col], cfg) + elif isinstance(dtype, Continuous): data[col] = cont_comps(df.frame[col], cfg) - elif is_dtype(detect_dtype(df.frame[col]), DateTime()): + elif isinstance(dtype, DateTime): data[col] = {} data[col]["stats"] = calc_stats_dt(df.frame[col]) data[col]["line"] = dask.delayed(_calc_line_dt)(df.frame[[col]], "auto") # overview if cfg.overview.enable: data["ov"] = calc_stats(df.frame, cfg, None) - head: pd.DataFrame = df.head data["insights"] = [] for col in df.columns: - col_dtype = detect_dtype(df.frame[col]) - if is_dtype(col_dtype, Continuous()): + col_dtype = df.get_dtype(col) + if isinstance(col_dtype, Continuous): data["insights"].append( (col, Continuous(), _cont_calcs(df.frame[col].dropna(), cfg)) ) - elif is_dtype(col_dtype, Nominal()): - data["insights"].append( - (col, Nominal(), _nom_calcs(df.frame[col].dropna(), head[col], cfg)) - ) - elif is_dtype(col_dtype, GeoGraphy()): - data["insights"].append( - (col, Nominal(), _nom_calcs(df.frame[col].dropna(), head[col], cfg)) - ) - elif is_dtype(col_dtype, GeoPoint()): - data["insights"].append( - (col, Nominal(), _nom_calcs(df.frame[col].dropna(), head[col], cfg)) - ) - elif is_dtype(col_dtype, DateTime()): + elif type(col_dtype) in [Nominal, GeoGraphy, GeoPoint, SmallCardNum]: + srs = df.get_col_as_str(col, na_as_str=False).dropna() + data["insights"].append((col, Nominal(), _nom_calcs(srs, head[col], cfg))) + elif 
isinstance(col_dtype, DateTime): data["insights"].append( (col, DateTime(), dask.delayed(_calc_line_dt)(df.frame[[col]], cfg.line.unit)) ) + else: + raise RuntimeError(f"unprocessed data type: col:{col}, dtype: {type(col_dtype)}") # interactions if cfg.interactions.enable: diff --git a/dataprep/eda/dtypes_v2.py b/dataprep/eda/dtypes_v2.py new file mode 100644 index 000000000..0f78e605f --- /dev/null +++ b/dataprep/eda/dtypes_v2.py @@ -0,0 +1,389 @@ +""" +In this module lives the type tree. +""" +from collections import defaultdict +from typing import Any, DefaultDict, Dict, List, Optional, Tuple, Type, Union + +import dask.dataframe as dd +import numpy as np +import pandas as pd +from ..clean import validate_country, validate_lat_long +from ..errors import UnreachableError + +STRING_PANDAS_DTYPES = [pd.StringDtype] +STRING_DTYPES = STRING_PANDAS_DTYPES + +CATEGORICAL_NUMPY_DTYPES = [np.bool, np.object] +CATEGORICAL_PANDAS_DTYPES = [pd.CategoricalDtype, pd.PeriodDtype] +CATEGORICAL_DTYPES = CATEGORICAL_NUMPY_DTYPES + CATEGORICAL_PANDAS_DTYPES + STRING_DTYPES + +NUMERICAL_NUMPY_DTYPES = [np.number] +NUMERICAL_DTYPES = NUMERICAL_NUMPY_DTYPES + +DATETIME_NUMPY_DTYPES = [np.datetime64] +DATETIME_PANDAS_DTYPES = [pd.DatetimeTZDtype] +DATETIME_DTYPES = DATETIME_NUMPY_DTYPES + DATETIME_PANDAS_DTYPES + +NULL_VALUES = { + float("NaN"), + "#N/A", + "#N/A N/A", + "#NA", + "-1.#IND", + "-1.#QNAN", + "-NaN", + "-nan", + "1.#IND", + "1.#QNAN", + "", + "N/A", + "NA", + "NULL", + "NaN", + "n/a", + "nan", + "null", + "", +} + + +class DType: + """ + Root of Type Tree + """ + + +############## Syntactic DTypes ############## +class Categorical(DType): + """ + Type Categorical + """ + + +class Nominal(Categorical): + """ + Type Nominal, Subtype of Categorical + """ + + +class Ordinal(Categorical): + """ + Type Ordinal, Subtype of Categorical + """ + + +class Numerical(DType): + """ + Type Numerical + """ + + +class Continuous(Numerical): + """ + Type Continuous, Subtype of Numerical + """ + + +class SmallCardNum(Numerical): + """ + Numerical column with small cardinality (distinct values) + """ + + +class Discrete(Numerical): + """ + Type Discrete, Subtype of Numerical + """ + + +############## Semantic DTypes ############## + + +class DateTime(Numerical): + """ + Type DateTime, Subtype of Numerical + """ + + +class Text(Nominal): + """ + Type Text, Subtype of Nominal + """ + + +class GeoGraphy(Categorical): + """ + Type GeoGraphy, Subtype of Categorical + """ + + +class GeoPoint(DType): + """ + Type GeoPoint + """ + + +class LatLong(GeoPoint): + """ + Type LatLong, Tuple + """ + + def __init__(self, lat_col: str, long_col: str) -> None: + self.lat = lat_col + self.long = long_col + + +############## End of the Type Tree ############## + +DTypeOrStr = Union[DType, Type[DType], str, None] +DTypeDict = Union[Dict[str, Union[DType, Type[DType], str]], None] +DTypeDef = Union[Dict[str, Union[DType, Type[DType], str]], DType, Type[DType], None] + + +def detect_dtype( + col: Union[dd.Series, pd.Series], + head: pd.Series, + known_dtype: Optional[DTypeDef] = None, +) -> DType: + """ + Given a column, detect its type or transform its type according to users' specification + + Parameters + ---------- + col: dask.datafram.Series or pd.Series + A dataframe column + head: pd.Series + The first n rows of col. Used for type inference. + known_dtype: Optional[Union[Dict[str, Union[DType, str]], DType]], default None + A dictionary or single DType given by users to specify the types for designated columns or + all columns. 
E.g. known_dtype = {"a": Continuous, "b": "Nominal"} or + known_dtype = {"a": Continuous(), "b": "nominal"} or + known_dtype = Continuous() or known_dtype = "Continuous" or known_dtype = Continuous() + detect_small_distinct: bool, default True + Whether to detect numerical columns with small distinct values as categorical column. + """ + if not known_dtype: + return detect_without_known(col, head) + + if isinstance(known_dtype, dict): + if col.name in known_dtype: + dtype = normalize_dtype(known_dtype[col.name]) + return map_dtype(dtype) + + elif isinstance(normalize_dtype(known_dtype), DType): + return map_dtype(normalize_dtype(known_dtype)) + + return detect_without_known(col, head) + + +def map_dtype(dtype: DType) -> DType: + """ + Currently, we want to keep our Type System flattened. + We will map Categorical() to Nominal() and Numerical() to Continuous() + """ + if ( + isinstance(dtype, Categorical) is True + and isinstance(dtype, Ordinal) is False + and isinstance(dtype, Nominal) is False + ): + return Nominal() + elif ( + isinstance(dtype, Numerical) is True + and isinstance(dtype, Continuous) is False + and isinstance(dtype, Discrete) is False + ): + return Continuous() + else: + return dtype + + +def detect_without_known(col: Union[dd.Series, pd.Series], head: pd.Series) -> DType: + # pylint: disable=too-many-return-statements + """ + This function detects dtypes of column when users didn't specify. + """ + if is_nominal(col.dtype): + if is_geography(head): + return GeoGraphy() + if is_geopoint(head): + return GeoPoint() + else: + return Nominal() + + elif is_continuous(col.dtype): + # detect as categorical if distinct value is small + if isinstance(col, dd.Series): + nuniques = col.nunique_approx().compute() + elif isinstance(col, pd.Series): + nuniques = col.nunique() + else: + raise TypeError(f"unprocessed column type:{type(col)}") + if nuniques < 10: + return SmallCardNum() + else: + return Continuous() + + elif is_datetime(col.dtype): + return DateTime() + else: + raise UnreachableError + + +def is_dtype(dtype1: Any, dtype2: DType) -> bool: + """ + This function detects if dtype2 is dtype1. + """ + return isinstance(dtype1, dtype2.__class__) + + +def normalize_dtype(dtype_repr: Any) -> DType: + """ + This function normalizes a dtype repr. 
+ """ + normalized: DType + str_dic = { + "Categorical": Categorical, + "Ordinal": Ordinal, + "Nominal": Nominal, + "Numerical": Numerical, + "Continuous": Continuous, + "Discrete": Discrete, + "DateTime": DateTime, + "Text": Text, + } + for str_dtype, dtype in str_dic.items(): + if isinstance(dtype_repr, str): + if dtype_repr.lower() == str_dtype.lower(): + normalized = dtype() + break + + elif isinstance(dtype_repr, dtype): + normalized = dtype_repr + break + + elif dtype_repr == dtype: + normalized = dtype() + break + + return normalized + + +def is_nominal(dtype: Any) -> bool: + """ + Given a type, return if that type is a nominal type + """ + + if is_continuous(dtype) or is_datetime(dtype): + return False + + if isinstance(dtype, np.dtype): + dtype = dtype.type + + return any(issubclass(dtype, c) for c in CATEGORICAL_NUMPY_DTYPES) + else: + return any(isinstance(dtype, c) for c in CATEGORICAL_PANDAS_DTYPES) + + +def is_geography(head: pd.Series) -> bool: + """ + Given a column, return if its type is a geography type + """ + geo_ratio: float = np.sum(validate_country(head)) / head.shape[0] + return geo_ratio > 0.8 + + +def is_geopoint(head: pd.Series) -> bool: + """ + Given a column, return if its type is a geopoint type + """ + lat_long = pd.Series(head, dtype="string") + lat_long_ratio: float = np.sum(validate_lat_long(lat_long)) / lat_long.shape[0] + return lat_long_ratio > 0.8 + + +def is_continuous(dtype: Any) -> bool: + """ + Given a type, return if that type is a continuous type + """ + dtype = dtype.type + return any(issubclass(dtype, c) for c in NUMERICAL_NUMPY_DTYPES) + + +def is_datetime(dtype: Any) -> bool: + """ + Given a type, return if that type is a datetime type + """ + if isinstance(dtype, np.dtype): + dtype = dtype.type + return any(issubclass(dtype, c) for c in DATETIME_NUMPY_DTYPES) + else: + return any(isinstance(dtype, c) for c in DATETIME_PANDAS_DTYPES) + + +def is_pandas_categorical(dtype: Any) -> bool: + """ + Detect if a dtype is categorical and from pandas. 
+ """ + return any(isinstance(dtype, c) for c in CATEGORICAL_PANDAS_DTYPES) + + +def string_dtype_to_object(df: dd.DataFrame) -> dd.DataFrame: + """ + Convert string dtype to object dtype + """ + for col in df.columns: + if any(isinstance(df[col].dtype, c) for c in STRING_DTYPES): + df[col] = df[col].astype(object) + + return df + + +def drop_null( + var: Union[dd.Series, pd.DataFrame, dd.DataFrame] +) -> Union[pd.Series, dd.Series, pd.DataFrame, dd.DataFrame]: + """ + Drop the null values (specified in NULL_VALUES) from a series or DataFrame + """ + + if isinstance(var, (pd.Series, dd.Series)): + if is_datetime(var.dtype): + return var.dropna() + return var[~var.isin(NULL_VALUES)] + + elif isinstance(var, (pd.DataFrame, dd.DataFrame)): + df = var + for values in df.columns: + if is_datetime(df[values].dtype): + df = df.dropna(subset=[values]) + else: + df = df[~df[values].isin(NULL_VALUES)] + return df + + raise ValueError("Input should be a Pandas/Dask Dataframe or Series") + + +def get_dtype_cnts_and_num_cols( + df: dd.DataFrame, + dtype: Union[Dict[str, Union[DType, Type[DType], str]], DType, Type[DType], None], +) -> Tuple[Dict[str, int], List[str]]: + """ + Get the count of each dtype in a dataframe + """ + dtype_cnts: DefaultDict[str, int] = defaultdict(int) + num_cols: List[str] = [] + for col in df.columns: + col_dtype = detect_dtype(df[col], dtype) + if is_dtype(col_dtype, Nominal()): + dtype_cnts["Categorical"] += 1 + elif is_dtype(col_dtype, Continuous()): + dtype_cnts["Numerical"] += 1 + num_cols.append(col) + elif is_dtype(col_dtype, DateTime()): + dtype_cnts["DateTime"] += 1 + elif is_dtype(col_dtype, GeoGraphy()): + dtype_cnts["GeoGraphy"] += 1 + elif is_dtype(col_dtype, GeoPoint()): + dtype_cnts["GeoPoint"] += 1 + else: + raise NotImplementedError + return dtype_cnts, num_cols diff --git a/dataprep/eda/eda_frame.py b/dataprep/eda/eda_frame.py new file mode 100644 index 000000000..d96d20ee1 --- /dev/null +++ b/dataprep/eda/eda_frame.py @@ -0,0 +1,367 @@ +"""Defines DataArray.""" + +from functools import reduce +from math import ceil +from typing import Any, List, Optional, Sequence, Tuple, Union, cast, Dict +from collections import Counter +import warnings + +import dask +import dask.array as da +import dask.dataframe as dd +import numpy as np +import pandas as pd +import pandas._libs.missing as libmissing + +from .dtypes_v2 import ( + NUMERICAL_DTYPES, + DType, + DTypeDef, + detect_dtype, + Nominal, +) + +DataFrame = Union[pd.DataFrame, dd.DataFrame, "EDAFrame"] + + +class EDAFrame: + """EDAFrame provides an abstraction over dask DataFrame + and dask Array. The reason is that sometimes some algorithms + only works on the Array and not the DataFrame. However, + the cost for getting the array from a dask DataFrame (with known length) + is non trivial. Instead of computing the array from a dask + DataFrame again and again, it would be better do that once. + + Other reasons to have a separate EDAFrame abstraction includes + converting the column names to string without modifying the + DataFrame from user, and preprocessings like dropna and type detection. + + Parameters + ---------- + df + The DataFrame + value_length + Whether to compute the lengths of the array. + This triggers a read on the data thus expensive if the passed in df + is a dask DataFrame. + If a pandas DataFrame passed in, lengths will always be compute. + repartition + Whether to repartition the DataFrame into 128M chunks. 
+ dtype: str or DType or dict of str or dict of DType, default None + Specify Data Types for designated column or all columns. + E.g. dtype = {"a": Continuous, "b": "Nominal"} or + dtype = {"a": Continuous(), "b": "nominal"} + or dtype = Continuous() or dtype = "Continuous" or dtype = Continuous() + """ + + # pylint: disable=too-many-instance-attributes + _ddf: dd.DataFrame + _values: da.Array + _nulls: Union[da.Array, np.ndarray] + _columns: pd.Index + _eda_dtypes: Dict[str, DType] = {} + _str_col_cache: Dict[Tuple[str, bool], dd.Series] = {} + _nulls_cnt: Dict[str, int] = {} + _head: Optional[pd.DataFrame] = None + _shape: Optional[Tuple[int, int]] = None + + # pylint: disable = too-many-branches + def __init__( + self, + df: Optional[DataFrame] = None, + value_length: bool = False, + repartition: bool = True, + dtype: Optional[DTypeDef] = None, + ) -> None: + + _suppress_warnings() + + if df is None: + return + + if isinstance(df, EDAFrame): + self._ddf = df._ddf + self._values = df._values + self._columns = df._columns + self._nulls = df._nulls + self._nulls_cnt = df._nulls_cnt + self._eda_dtypes = df._eda_dtypes + self._str_col_cache = df._str_col_cache + self._head = df._head + self._shape = df._shape + return + + if isinstance(df, (dd.Series, pd.Series)): + df = df.to_frame() + + # if index is object type, convert it to string + # to make sure the element is comparable. Otherwise it will throw + # error when dask divide and sort data by index. + if df.index.dtype == np.object: + df.index = df.index.astype(str) + + if isinstance(df, dd.DataFrame): + is_pandas = False + ddf = df + elif isinstance(df, pd.DataFrame): + is_pandas = True + if repartition: + df_size = df.memory_usage(deep=True).sum() + npartitions = ceil(df_size / 128 / 1024 / 1024) + ddf = dd.from_pandas(df, npartitions=npartitions) + else: + ddf = dd.from_pandas(df, chunksize=-1) + else: + raise ValueError(f"{type(df)} not supported") + + ddf.columns = _process_column_name(ddf.columns) + ddf = ddf.persist() + self._eda_dtypes = _detect_dtypes(ddf, dtype) + + # Transform categorical column to string for non-na values. 
+        for col in ddf.columns:
+            if isinstance(self._eda_dtypes[col], Nominal):
+                ddf[col] = ddf[col].apply(_to_str_if_not_na, meta=(col, "object"))
+
+        self._ddf = ddf.persist()
+        self._columns = self._ddf.columns
+        if value_length or is_pandas:
+            self._values = self._ddf.to_dask_array(lengths=True)
+        else:
+            self._values = self._ddf.to_dask_array()
+
+        # compute meta for null values
+        dd_null = self._ddf.isnull()
+        self._nulls = dd_null.to_dask_array()
+        self._nulls._chunks = self.values.chunks
+        pd_null = dd_null.compute()
+        nulls_cnt = {}
+        for col in self._ddf.columns:
+            nulls_cnt[col] = pd_null[col].sum()
+        self._nulls_cnt = nulls_cnt
+
+    @property
+    def columns(self) -> pd.Index:
+        """Return the columns of the DataFrame."""
+        return self._columns
+
+    @property
+    def dtypes(self) -> pd.Series:
+        """Returns the dtypes of the DataFrame."""
+        return self._ddf.dtypes
+
+    @property
+    def nulls(self) -> da.Array:
+        """Return the nullity array of the data."""
+        return self._nulls
+
+    @property
+    def shape(self) -> Tuple[int, int]:
+        """Return the shape of the data."""
+        if self._shape is None:
+            self._shape = cast(Tuple[int, int], self.values.shape)
+        return self._shape
+
+    @property
+    def values(self) -> da.Array:
+        """Return the array representation of the data."""
+        return self._values
+
+    @property
+    def frame(self) -> dd.DataFrame:
+        """Return the underlying dataframe."""
+        return self._ddf
+
+    def head(self, n: int = 5) -> pd.DataFrame:
+        """Return the head of the DataFrame, reading and caching it on first access."""
+        if self._head is None:
+            self._head = self.frame.head(n=n)
+        return self._head
+
+    def get_col_as_str(self, col: str, na_as_str: bool = False) -> dd.Series:
+        """
+        Return the column as a string column.
+        If na_as_str is True, NA values are also transformed to str;
+        otherwise they are kept as NA.
+        """
+        if col not in self._columns:
+            raise RuntimeError(f"column does not exist: {col}")
+
+        if (col, na_as_str) in self._str_col_cache:
+            return self._str_col_cache[(col, na_as_str)]
+
+        # The case where the column can be returned directly
+        if (isinstance(self._eda_dtypes[col], Nominal)) and (
+            (na_as_str and self.get_missing_cnt(col) == 0) or (not na_as_str)
+        ):
+            return self._ddf[col]
+
+        if na_as_str:
+            self._str_col_cache[(col, na_as_str)] = self._ddf[col].astype(str).persist()
+        else:
+            self._str_col_cache[(col, na_as_str)] = (
+                self._ddf[col].apply(_to_str_if_not_na, meta=(col, "object")).persist()
+            )
+
+        return self._str_col_cache[(col, na_as_str)]
+
+    def get_missing_cnt(self, col: str) -> int:
+        """
+        Get the count of missing values for the given column.
+        """
+        return self._nulls_cnt[col]
+
+    def get_dtype(self, col: str) -> DType:
+        """
+        Get the inferred dtype for the given column.
+        """
+        return self._eda_dtypes[col]
+
+    def compute(self, type: str = "lengths") -> None:  # pylint: disable=redefined-builtin
+        """Compute the lengths or materialize the null values in place.
+
+        Parameters
+        ----------
+        type
+            Can be "lengths" or "nulls". "lengths" computes the array chunk sizes and "nulls"
+            computes and materializes the null values as well as the lengths of the chunks.
+ + """ + + # pylint: disable = protected-access + if type == "lengths": + not_computed = np.isnan(self.shape[0]) + if not_computed: + self._values = self.frame.to_dask_array(lengths=True) + self._nulls = self.frame.isnull().to_dask_array() + self._nulls._chunks = self.values.chunks + elif type == "nulls": + x = self.nulls + # Copied from compute_chunk_sizes + # pylint: disable=invalid-name + chunk_shapes = x.map_blocks( + _get_chunk_shape, + dtype=int, + chunks=tuple(len(c) * (1,) for c in x.chunks) + ((x.ndim,),), + new_axis=x.ndim, + ) + + c = [] + for i in range(x.ndim): + s = x.ndim * [0] + [i] + s[i] = slice(None) + s = tuple(s) + + c.append(tuple(chunk_shapes[s])) + + chunks_, nulls = dask.compute(tuple(c), self.nulls) + chunks = tuple([tuple([int(chunk) for chunk in chunks]) for chunks in chunks_]) + self._nulls = nulls + self._values._chunks = chunks + else: + raise ValueError(f"{type} not supported.") + + def select_dtypes(self, include: List[Any]) -> "EDAFrame": + """Return a new DataArray with designated dtype columns.""" + subdf = self._ddf.select_dtypes(include) # pylint: disable=W0212 + return self[subdf.columns] + + def select_num_columns(self) -> "EDAFrame": + """Return a new DataArray with numerical dtype columns.""" + df = self.select_dtypes(NUMERICAL_DTYPES) + return df + + def __getitem__(self, indexer: Union[Sequence[str], str]) -> "EDAFrame": + """Return a new DataArray select by column names.""" + if isinstance(indexer, str): + indexer = [indexer] + + subdf = self._ddf[indexer] # pylint: disable=W0212 + cidx = [self.columns.get_loc(col) for col in subdf.columns] + df = EDAFrame() + df._ddf = subdf + df._columns = subdf.columns + df._values = self.values[:, cidx] # pylint: disable=W0212 + df._nulls = self.nulls[:, cidx] # pylint: disable=W0212 + if self._head is not None: + df._head = self.head()[subdf.columns] # pylint: disable=W0212 + + eda_dtypes: Dict[str, DType] = {} + str_col_cache: Dict[Tuple[str, bool], dd.Series] = {} + nulls_cnt: Dict[str, int] = {} + for col in df._columns: + eda_dtypes[col] = self._eda_dtypes[col] + nulls_cnt[col] = self._nulls_cnt[col] + for val in [True, False]: + if (col, val) in self._str_col_cache: + str_col_cache[(col, val)] = self._str_col_cache[(col, val)] + + df._eda_dtypes = eda_dtypes + df._str_col_cache = str_col_cache + df._nulls_cnt = nulls_cnt + + if df.shape[1] != 0: + # coerce the array to it's minimal type + dtype = reduce(np.promote_types, df.dtypes.values) + if df._values.dtype != dtype: + df._values = df._values.astype(dtype) + + return df + + +def _get_chunk_shape(arr: np.ndarray) -> np.ndarray: + """Given an (x,y,...) N-d array, returns (1,1,...,N) N+1-d array""" + shape = np.asarray(arr.shape, dtype=int) + return shape[len(shape) * (None,) + (slice(None),)] + + +def _process_column_name(df_columns: pd.Index) -> List[str]: + """ + 1. Transform column name to string, + 2. Resolve duplicate names in columns. + Duplicate names will be renamed as col_{id}. + """ + columns = list(map(str, df_columns)) + column_count = Counter(columns) + current_id: Dict[Any, int] = dict() + for i, col in enumerate(columns): + if column_count[col] > 1: + current_id[col] = current_id.get(col, 0) + 1 + new_col_name = f"{col}_{current_id[col]}" + else: + new_col_name = f"{col}" + columns[i] = new_col_name + return columns + + +def _to_str_if_not_na(obj: Any) -> Any: + """ + This function transforms an obj to str if it is not NA. 
+ The check for NA is similar to pd.isna, but will treat a list obj as + a scalar and return a single boolean, rather than a list of booleans. + Otherwise when a cell is tuple or list it will throw an error. + """ + return obj if libmissing.checknull(obj) else str(obj) + + +def _detect_dtypes(df: dd.DataFrame, known_dtype: Optional[DTypeDef] = None) -> Dict[str, DType]: + """ + Return a dict that maps column name to its dtype for each column in given df. + """ + head = df.head(n=100) + res = {} + for col in df.columns: + dtype = detect_dtype(df[col], head[col], known_dtype) + res[col] = dtype + return res + + +def _suppress_warnings() -> None: + """ + suppress warnings + """ + warnings.filterwarnings( + "ignore", + "Insufficient elements for `head`.", + category=UserWarning, + ) diff --git a/dataprep/eda/utils.py b/dataprep/eda/utils.py index 945f37325..36344c373 100644 --- a/dataprep/eda/utils.py +++ b/dataprep/eda/utils.py @@ -114,7 +114,7 @@ def preprocess_dataframe( if (is_dtype(col_dtype, Nominal())) and ( (excluded_columns is None) or (col not in excluded_columns) ): - df[col] = df[col].apply(_notna2str, meta=("object")) + df[col] = df[col].apply(_notna2str, meta=(col, "object")) return df diff --git a/dataprep/tests/eda/random_data_generator.py b/dataprep/tests/eda/random_data_generator.py index 05d8d50a5..7ca825dbb 100644 --- a/dataprep/tests/eda/random_data_generator.py +++ b/dataprep/tests/eda/random_data_generator.py @@ -192,4 +192,12 @@ def gen_random_dataframe( @pytest.fixture(scope="module") # type: ignore def random_df() -> pd.DataFrame: - return gen_random_dataframe() + df1 = gen_random_dataframe(nrows=30, ncols=10, random_state=0).reset_index(drop=True) + df2 = gen_random_dataframe(nrows=30, ncols=10, na_ratio=0.1, random_state=1).reset_index( + drop=True + ) + df3 = gen_constant_series(30, np.nan).to_frame().reset_index(drop=True) + df4 = gen_constant_series(30, "s").to_frame().reset_index(drop=True) + df = pd.concat([df1, df2, df3, df4], axis=1) + df.index = gen_random_series(df.index.shape[0], na_ratio=0.1, str_max_len=100, random_state=2) + return df diff --git a/dataprep/tests/eda/test_create_report.py b/dataprep/tests/eda/test_create_report.py index 2e5825288..2db4ac9cd 100644 --- a/dataprep/tests/eda/test_create_report.py +++ b/dataprep/tests/eda/test_create_report.py @@ -7,6 +7,7 @@ import pytest from ...eda import create_report from ...datasets import load_dataset +from .random_data_generator import random_df LOGGER = logging.getLogger(__name__) @@ -90,8 +91,12 @@ def test_dataset() -> None: dataset_names = ["titanic", "iris"] # dataset_names = get_dataset_names() for dataset in dataset_names: - print(f"testing dataset:{dataset}") + # print(f"testing dataset:{dataset}") df = load_dataset(dataset) # popu_size = df.shape[0] # df = df.sample(n=min(popu_size, 1000), random_state=0) create_report(df) + + +def test_random_df(random_df: pd.DataFrame) -> None: + create_report(random_df)
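
A minimal usage sketch of the EDAFrame interface added in dataprep/eda/eda_frame.py, assuming the bundled titanic dataset (used by the tests in this diff) and its usual column names; it only exercises calls defined above and is illustrative, not part of the patch:

from dataprep.datasets import load_dataset
from dataprep.eda.eda_frame import EDAFrame

# Build an EDAFrame once; type detection, null counts and string caches
# are computed at construction time (see EDAFrame.__init__ above).
df = load_dataset("titanic")
edaframe = EDAFrame(df)

# Per-column metadata precomputed by the constructor.
for col in edaframe.columns:
    dtype = edaframe.get_dtype(col)            # e.g. Continuous(), Nominal(), SmallCardNum()
    n_missing = edaframe.get_missing_cnt(col)  # precomputed null count
    print(col, type(dtype).__name__, n_missing)

# Materialize a column as strings ("Embarked" is an assumed column name).
# With na_as_str=False, missing values stay NA instead of becoming "nan".
embarked = edaframe.get_col_as_str("Embarked", na_as_str=False)
print(embarked.head())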