perf(clean): improve performance of clean_duplication
Improves the performance of the clean_duplication function.
ryanwdale committed Apr 13, 2021
1 parent 5533cf5 commit 8fda37e
Showing 2 changed files with 61 additions and 50 deletions.
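The heart of the change: instead of eagerly computing counts and dropping duplicates on a pandas DataFrame, the Clusterer now holds a Dask DataFrame, builds lazy expressions, and materializes everything in a single dask.compute pass. A minimal sketch of that pattern on invented toy data (not code from this commit):

import dask
import dask.dataframe as dd
import pandas as pd

# toy stand-in for a user's data
pdf = pd.DataFrame({"city": ["NYC", "nyc", "N.Y.C.", "Boston", "boston"]})
ddf = dd.from_pandas(pdf, npartitions=2)

# build two lazy results: per-value counts, and the unique values
counts = ddf["city"].value_counts(sort=False)  # lazy
uniques = counts.index.to_frame(name="city")   # lazy; replaces a drop_duplicates() pass

# a single pass over the data materializes both
counts, uniques = dask.compute(counts, uniques)
print(uniques["city"].tolist())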
15 changes: 6 additions & 9 deletions dataprep/clean/clean_duplication.py
@@ -6,6 +6,7 @@
 
 from ipywidgets.widgets import Label, Dropdown, Checkbox, Button, HBox, VBox, Box, Layout, Text
 import pandas as pd
+import dask.dataframe as dd
 
 from .clean_duplication_utils import Clusterer
 
@@ -15,7 +16,7 @@
 
 
 def clean_duplication(
-    df: pd.DataFrame, column: str, df_var_name: str = "df", page_size: int = 5
+    df: Union[pd.DataFrame, dd.DataFrame], column: str, df_var_name: str = "df", page_size: int = 5
 ) -> Box:
     """
     Cleans and standardizes duplicate values in a DataFrame.
@@ -25,7 +26,7 @@ def clean_duplication(
     Parameters
     ----------
     df
-        A pandas DataFrame containing the data to be cleaned.
+        A pandas or Dask DataFrame containing the data to be cleaned.
     column
         The name of the column containing duplicate values.
     df_var_name
@@ -207,9 +208,7 @@ def _update_clusters(self) -> None:
         line = HBox(children=[Label("-" * 186, layout=Layout(margin="0 0 0 18px"))])
         self._sel_all.value = False
 
-        cluster_page = self._clusterer.clusters.iloc[
-            self._page_pos : self._page_pos + self._page_size
-        ]
+        cluster_page = self._clusterer.get_page(self._page_pos, self._page_pos + self._page_size)
 
         label_layout = Layout(height="22px", width="360px")
         box_children = [line]
@@ -240,7 +239,7 @@ def _update_clusters(self) -> None:
             box_children.append(line)
 
         # no clusters to display
-        if len(box_children) == 1:
+        if len(cluster_page) == 0:
             box_children = [
                 Label(
                     "No clusters, try a different clustering method",
@@ -404,9 +403,7 @@ def _execute_merge(self, _: Dict[str, Any]) -> None:
 
         do_merge = [check.value for check in self._checks]
         new_values = [text.value for text in self._reprs]
-        cluster_page = self._clusterer.clusters.iloc[
-            self._page_pos : self._page_pos + self._page_size
-        ]
+        cluster_page = self._clusterer.get_page(self._page_pos, self._page_pos + self._page_size)
 
         if self._export_code.value:
             self._clusterer.live_export_code(cluster_page, do_merge, new_values)
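With the widened signature above, the widget accepts either backend. A usage sketch, assuming a Jupyter session where the ipywidgets UI can render and that clean_duplication is exported from dataprep.clean as in this repo (the data is invented):

import dask.dataframe as dd
import pandas as pd
from dataprep.clean import clean_duplication

df = pd.DataFrame({"city": ["New York", "new york", "NY", "Boston"]})

# pandas input, as before
clean_duplication(df, "city")

# a Dask DataFrame is now accepted directly
clean_duplication(dd.from_pandas(df, npartitions=2), "city", df_var_name="df")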
96 changes: 55 additions & 41 deletions dataprep/clean/clean_duplication_utils.py
@@ -7,14 +7,18 @@
 from base64 import b64encode
 from collections import defaultdict
 from operator import itemgetter
-from typing import List, Set, DefaultDict
+from typing import List, Set, Union, DefaultDict
 from itertools import permutations
 
 import pandas as pd
+import dask.dataframe as dd
+import dask
 from IPython.display import Javascript, display
 from metaphone import doublemetaphone
 from Levenshtein import distance
 
+from .utils import to_dask
+
 DECODE_FUNC = """
 function b64DecodeUnicode(str) {
     // Going backwards: from bytestream, to percent-encoding, to original string.
@@ -30,19 +34,23 @@ class Clusterer:
     Performs clustering methods on data.
     """
 
+    # pylint: disable=too-many-instance-attributes
+
     clusters: pd.Series
-    _df: pd.DataFrame
+    _df: dd.DataFrame
+    _counts: pd.Series
     _df_name: str
     _col: str
    _ngram: int
     _radius: int
     _block_size: int
 
-    def __init__(self, df: pd.DataFrame, col_name: str, df_name: str):
-        self.clusters = pd.Series()
-        self._df = df.copy(deep=True)
+    def __init__(self, df: Union[pd.DataFrame, dd.DataFrame], col: str, df_name: str):
+        self.clusters = pd.Series(dtype=object)
+        self._df = to_dask(df)
+        self._counts = pd.Series(dtype=object)
         self._df_name = df_name
-        self._col = col_name
+        self._col = col
         self._ngram = 2
         self._radius = 2
         self._block_size = 6
@@ -52,6 +60,7 @@ def cluster(self, cluster_method: str) -> None:
         """
         Create clusters using the given clustering method.
         """
+
         if cluster_method == "levenshtein":
             self._nearest_neighbours_cluster()
         else:
@@ -60,30 +69,27 @@ def _key_collision_cluster(self, cluster_method: str) -> None:
     def _key_collision_cluster(self, cluster_method: str) -> None:
         """
         Create clusters using a key collision method.
-        Clusters are a Pandas Series of lists (each list represents a cluster),
-        each list contains tuples with the form (item, count).
+        Clusters are a Pandas Series of lists (each list represents a cluster).
         """
         key_funcs = {
             "fingerprint": self._finger_print_key,
             "ngram-fingerprint": self._ngram_finger_print_key,
             "phonetic-fingerprint": self._phonetic_fingerprint_key,
         }
         key_func = key_funcs[cluster_method]
-        col = self._col
-        # get the count of each item in the dataframe and remove duplicates
-        counts = self._df[col].value_counts(sort=False)
-        no_dups = self._df.drop_duplicates(subset=[col]).reset_index()
-        # create a column "vals" containing tuples of the form (item, count)
-        no_dups.loc[:, "vals"] = no_dups[col].map(lambda val: (val, counts.loc[val]))
+        counts = self._df[self._col].value_counts(sort=False)
+        # create dataframe containing unique values
+        df = counts.index.to_frame(name=self._col)
         # create a column "key" containing keys created by the given key collision method
-        no_dups.loc[:, "key"] = no_dups[col].map(key_func)
+        df["key"] = df[self._col].map(key_func)
         # put items with the same key into the same list
-        clusters = no_dups.groupby("key")["vals"].agg(list)
+        clusters = df.groupby("key")[self._col].apply(list, meta=(self._col, "object"))
         clusters = clusters.loc[clusters.map(len) > 1]
+        clusters, self._counts = dask.compute(clusters, counts)
         # sort by the size of each cluster, so that larger clusters appear first
-        clusters = clusters.sort_values(key=lambda x: x.map(len), ascending=False)
-        # values with greater counts appear first in the cluster
-        self.clusters = clusters.map(lambda x: sorted(x, key=itemgetter(1), reverse=True))
+        self.clusters = clusters.sort_values(key=lambda x: x.map(len), ascending=False).reset_index(
+            drop=True
+        )
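In pandas terms, the rewritten method derives the unique values from the index of value_counts (avoiding a separate drop_duplicates pass), keys them, and groups equal keys into clusters. A simplified pandas-only sketch with a stand-in fingerprint function (the commit's real key functions are defined elsewhere in this file):

import pandas as pd

def fingerprint(val: str) -> str:
    # stand-in key function: lowercase, split into tokens, sort, rejoin
    return " ".join(sorted(val.lower().split()))

s = pd.Series(["New York", "york new", "New  York", "Boston"])
counts = s.value_counts(sort=False)
uniques = counts.index.to_frame(name="city")
uniques["key"] = uniques["city"].map(fingerprint)
clusters = uniques.groupby("key")["city"].apply(list)
clusters = clusters[clusters.map(len) > 1]
print(clusters.tolist())  # [['New York', 'york new', 'New  York']]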

     def _nearest_neighbours_cluster(self) -> None:
         """
@@ -95,15 +101,18 @@ def _nearest_neighbours_cluster(self) -> None:
         Method from OpenRefine: https://github.com/OpenRefine/OpenRefine/wiki/Clustering-In-Depth
         and simile-vicino: https://code.google.com/archive/p/simile-vicino/
         """
-        col = self._col
         blocks: DefaultDict[str, Set[str]] = defaultdict(set)
-        counts = self._df[col].value_counts(sort=False)
-        no_dups = self._df.drop_duplicates(subset=[col]).reset_index()
+
+        counts = self._df[self._col].value_counts(sort=False)
+        # create dataframe containing unique values
+        df = counts.index.to_frame(name=self._col)
         # put strings in blocks
-        no_dups[col].apply(self._populate_blocks, args=(blocks, self._block_size))
+        populate_blocks = df[self._col].apply(
+            self._populate_blocks, args=(blocks, self._block_size), meta=(self._col, "object")
+        )
+        _, self._counts = dask.compute(populate_blocks, counts)
+
         # compare strings in the same block and create clusters
-        self.clusters = self._get_nearest_neighbour_clusters(blocks, counts, self._radius)
+        self.clusters = self._get_nearest_neighbour_clusters(blocks, self._radius)
 
     @staticmethod
     def _populate_blocks(val: str, blocks: DefaultDict[str, Set[str]], block_size: int) -> None:
@@ -117,7 +126,7 @@ def _populate_blocks(val: str, blocks: DefaultDict[str, Set[str]], block_size: int) -> None:
 
     @staticmethod
     def _get_nearest_neighbour_clusters(
-        blocks: DefaultDict[str, Set[str]], counts: pd.Series, radius: int
+        blocks: DefaultDict[str, Set[str]], radius: int
     ) -> pd.Series:
         """
         Compare every pair of strings in each block and add to cluster if
@@ -134,17 +143,13 @@ def _get_nearest_neighbour_clusters(
                 if dist <= radius or radius < 0:
                     cluster_map[center].add(val)
 
-        # remove duplicate clusters and sort so that values with greater counts
-        # appear first in the cluster.
+        # remove duplicate clusters and clusters of length 1
         unique_clusters = set(
-            tuple(sorted(cluster, key=lambda x: counts.loc[x], reverse=True))
-            for cluster in cluster_map.values()
+            frozenset(cluster) for cluster in cluster_map.values() if len(cluster) > 1
         )
 
-        clusters = [
-            [(x, counts.loc[x]) for x in cluster] for cluster in unique_clusters if len(cluster) > 1
-        ]
-
+        # convert to list of lists
+        clusters = [list(cluster) for cluster in unique_clusters]
         # sort by the size of each cluster, so that larger clusters appear first
         return pd.Series(sorted(clusters, key=len, reverse=True))
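For intuition on the blocking scheme above: only strings that share a substring block pay a Levenshtein comparison, and frozensets now deduplicate clusters without consulting counts. A toy sketch of the idea (block size shortened from 6 to 3 so the invented values share blocks):

from collections import defaultdict
from itertools import permutations
from typing import DefaultDict, Set

from Levenshtein import distance

vals = ["michael", "michel", "micheal", "tom"]
blocks: DefaultDict[str, Set[str]] = defaultdict(set)
for val in vals:
    for i in range(len(val) - 3 + 1):  # 3-grams as blocking keys
        blocks[val[i : i + 3]].add(val)

cluster_map: DefaultDict[str, Set[str]] = defaultdict(set)
for block in blocks.values():
    for center, val in permutations(block, 2):
        if distance(center, val) <= 2:  # radius = 2
            cluster_map[center].update((center, val))

clusters = {frozenset(c) for c in cluster_map.values() if len(c) > 1}
print([sorted(c) for c in clusters])  # [['michael', 'micheal', 'michel']]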

     @staticmethod
Expand Down Expand Up @@ -258,7 +263,7 @@ def execute_merge_code(
for idx, cluster in enumerate(cluster_page):
cluster_repr = new_values[idx]
if do_merge[idx]:
self._df.loc[:, self._col] = self._df[self._col].replace(
self._df[self._col] = self._df[self._col].replace(
[cluster_val for cluster_val, _ in cluster], cluster_repr
)

@@ -268,7 +273,8 @@ def final_df(self) -> None:
         """
         code = "# dataframe with cleaned string values\ndf_clean"
         encoded_code = (b64encode(str.encode(code))).decode()
-        json = self._df.to_json(force_ascii=False)
+        final_df = self._df.compute()
+        json = final_df.to_json(force_ascii=False)
         execute_code = f"import pandas as pd\ndf_clean = pd.read_json('{json}')"
         encoded_execute = (b64encode(str.encode(execute_code))).decode()
         code = """
@@ -282,6 +288,17 @@ def final_df(self) -> None:
         )
         display(Javascript(code))
 
+    def get_page(self, start: int, end: int) -> pd.Series:
+        """
+        Returns a page of clusters from start to end. Adds the counts to
+        each cluster entry and sorts each cluster by the counts, so that values
+        with a greater count appear first in the cluster.
+        """
+        page = self.clusters.iloc[start:end]
+        page = page.map(lambda cluster: [(val, self._counts.loc[val]) for val in cluster])
+        page = page.map(lambda x: sorted(x, key=itemgetter(1), reverse=True))
+        return page
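Because counts now live in self._counts instead of inside every cluster, only the page currently displayed pays for the count lookups and the per-cluster sort. A standalone sketch of the same idea, with made-up clusters and counts:

from operator import itemgetter

import pandas as pd

clusters = pd.Series([["NY", "ny", "N.Y."], ["LA", "la"]])
counts = pd.Series({"NY": 5, "ny": 2, "N.Y.": 7, "LA": 1, "la": 4})

def get_page(start: int, end: int) -> pd.Series:
    page = clusters.iloc[start:end]
    page = page.map(lambda cluster: [(val, counts.loc[val]) for val in cluster])
    # highest count first: that value becomes the suggested representative
    return page.map(lambda c: sorted(c, key=itemgetter(1), reverse=True))

print(get_page(0, 1).iloc[0])  # [('N.Y.', 7), ('NY', 5), ('ny', 2)]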

     def set_cluster_params(self, ngram: int, radius: int, block_size: int) -> None:
         """
         Set clustering parameters.
@@ -302,10 +319,7 @@ def _ngram_tokens(val: str, n: int) -> List[str]:
     # remove control characters
     val = "".join(ch for ch in val if category(ch)[0] != "C")
     val = normalize_non_ascii(val)
-    n_grams = []
-    for i in range(len(val) - n + 1):
-        n_grams.append(val[i : i + n])
-    return n_grams
+    return [val[i : i + n] for i in range(len(val) - n + 1)]
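The comprehension above yields overlapping character n-grams; a standalone equivalent for illustration:

def ngram_tokens(val: str, n: int) -> list:
    return [val[i : i + n] for i in range(len(val) - n + 1)]

print(ngram_tokens("paris", 2))  # ['pa', 'ar', 'ri', 'is']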


 def normalize_non_ascii(val: str) -> str:
