Skip to content

Commit

Permalink
Add option to enable current stats updates. Updated code to allow
Browse files Browse the repository at this point in the history
multiple init values by specifying them in the raw data separated by
'|||'.
  • Loading branch information
richardwu committed Nov 24, 2018
1 parent d0fc0da commit 39088f4
Show file tree
Hide file tree
Showing 16 changed files with 272 additions and 146 deletions.
144 changes: 81 additions & 63 deletions dataset/dataset.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import logging
import time
from enum import Enum
import logging
import os
import time
import pandas as pd
import time

from .dbengine import DBengine
from .table import Table, Source
Expand Down Expand Up @@ -117,7 +116,6 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None):
df.fillna('_nan_', inplace=True)

# Call to store to database

self.raw_data.store_to_db(self.engine.engine)
status = 'DONE Loading {fname}'.format(fname=os.path.basename(fpath))

Expand Down Expand Up @@ -242,14 +240,14 @@ def get_statistics(self):
<count>: frequency (# of entities) where attr1: val1 AND attr2: val2
"""
if not self.stats_ready:
self.collect_stats()
self.collect_init_stats()
stats = (self.total_tuples, self.single_attr_stats, self.pair_attr_stats)
self.stats_ready = True
return stats

def collect_stats(self):
def collect_init_stats(self):
"""
collect_stats calculates and memoizes: (based on current statistics)
collect_init_stats calculates and memoizes: (based on RAW/INITIAL data)
1. self.single_attr_stats ({ attribute -> { value -> count } })
the frequency (# of entities) of a given attribute-value
2. self.pair_attr_stats ({ attr1 -> { attr2 -> {val1 -> {val2 -> count } } } })
Expand All @@ -263,77 +261,102 @@ def collect_stats(self):
self.total_tuples = self.get_raw_data()['_tid_'].nunique()
# Single attribute-value frequency
for attr in self.get_attributes():
self.single_attr_stats[attr] = self._get_stats_single(attr)
self.single_attr_stats[attr] = self._get_init_stats_single(attr)
# Co-occurrence frequency
for cond_attr in self.get_attributes():
self.pair_attr_stats[cond_attr] = {}
for trg_attr in self.get_attributes():
if trg_attr != cond_attr:
self.pair_attr_stats[cond_attr][trg_attr] = self._get_stats_pair(cond_attr,trg_attr)
for first_attr in self.get_attributes():
self.pair_attr_stats[first_attr] = {}
for second_attr in self.get_attributes():
if second_attr != first_attr:
self.pair_attr_stats[first_attr][second_attr] = self._get_init_stats_pair(first_attr,second_attr)

def _get_stats_single(self, attr):
def collect_current_stats(self, attr):
"""
Returns a dictionary where the keys are possible values for :param attr: and
the values are the frequency counts of those values for this attribute.
"""
<<<<<<< HEAD
# need to decode values into unicode strings since we do lookups via
# unicode strings from Postgres
return self.get_raw_data()[[attr]].groupby([attr]).size().to_dict()
=======
collect_current_stats calculates and memoizes frequency and co-occurrence
statistics based on the CURRENT values/data.
# If cell_domain has not been initialized yet, retrieve statistics
# from raw data (this happens when the domain is just being setup)
if not self.aux_table_exists(AuxTables.cell_domain):
return self.get_raw_data()[[attr]].groupby([attr]).size()
See collect_init_stats for which member variables are memoized/overwritten.
Does NOT overwrite self.total_tuples.
"""
# Single attribute-value frequency
for attr in self.get_attributes():
self.single_attr_stats[attr] = self._get_current_stats_single(attr)
# Co-occurrence frequency
for first_attr in self.get_attributes():
self.pair_attr_stats[first_attr] = {}
for second_attr in self.get_attributes():
if second_attr != first_attr:
self.pair_attr_stats[first_attr][second_attr] = self._get_current_stats_pair(first_attr,second_attr)

def _get_init_stats_single(self, attr):
"""
_get_init_stats_single returns a dictionary where the keys are possible
values for :param attr: and the values are the frequency counts in
the RAW/INITIAL data of those values for this attribute.
"""
# We need to iterate through this in a for loop instead of groupby & size
# since our values may be '|||' separated
freq_count = {}
for (vals,) in self.get_raw_data()[[attr]].itertuples(index=False):
for val in vals.split('|||'):
freq_count[val] = freq_count.get(val, 0) + 1
return freq_count

def _get_current_stats_single(self, attr):
"""
_get_current_stats_single returns a dictionary where the keys are possible
values for :param attr: and the values are the frequency counts in the
CURRENT data of those values for this attribute.
"""
# Retrieve statistics on current value from cell_domain

df_domain = self.get_aux_table(AuxTables.cell_domain).df
df_count = df_domain.loc[df_domain['attribute'] == attr, 'current_value'].value_counts()
# We do not store attributes with only NULL values in cell_domain;
# however, we still require '_nan_' entries in our single-attribute stats
if df_count.empty:
return pd.Series(self.total_tuples, index=['_nan_'])
return df_count
>>>>>>> Re-compute single and co-occur stats after every EM iteration.
return {'_nan_': self.total_tuples}
return df_count.to_dict()

<<<<<<< HEAD
def get_stats_pair(self, first_attr, second_attr):
=======
def _get_stats_pair(self, cond_attr, trg_attr):
>>>>>>> Cleaned up some private functions and accesses to aux_tables.
def _get_init_stats_pair(self, first_attr, second_attr):
"""
Returns a dictionary {first_val -> {second_val -> count } } where:
_get_init_stats_pair returns a dictionary {first_val -> {second_val ->
count } } where (based on RAW/INITIAL dataset):
<first_val>: all possible values for first_attr
<second_val>: all values for second_attr that appeared at least once with <first_val>
<count>: frequency (# of entities) where first_attr: <first_val> AND second_attr: <second_val>
"""
<<<<<<< HEAD
tmp_df = self.get_raw_data()[[first_attr,second_attr]].groupby([first_attr,second_attr]).size().reset_index(name="count")
return _dictify(tmp_df)
=======
# If cell_domain has not been initialized yet, retrieve statistics
# from raw data (this happens when the domain is just being setup)
if not self.aux_table_exists(AuxTables.cell_domain):
return self.get_raw_data()[[cond_attr,trg_attr]].groupby([cond_attr,trg_attr]).size().reset_index(name="count")

# We need to iterate through this in a for loop instead of groupby & size
# since our values may be '|||' separated
cooccur_freq_count = {}
for vals1, vals2 in self.get_raw_data()[[first_attr,second_attr]].itertuples(index=False):
for val1 in vals1.split('|||'):
cooccur_freq_count[val1] = cooccur_freq_count.get(val1, {})
for val2 in vals2.split('|||'):
cooccur_freq_count[val1][val2] = cooccur_freq_count[val1].get(val2, 0) + 1
return cooccur_freq_count

def _get_current_stats_pair(self, first_attr, second_attr):
"""
_get_current_stats_pair returns a dictionary {first_val -> {second_val ->
count } } where (based on CURRENT dataset):
<first_val>: all possible values for first_attr
<second_val>: all values for second_attr that appeared at least once with <first_val>
<count>: frequency (# of entities) where first_attr: <first_val> AND second_attr: <second_val>
"""
# Retrieve pairwise statistics on current value from cell_domain

df_domain = self.get_aux_table(AuxTables.cell_domain).df
# Filter cell_domain for only the attributes we care about
df_domain = df_domain[df_domain['attribute'].isin([cond_attr, trg_attr])]
# Convert to wide form so we have our :param cond_attr:
# and :trg_attr: as columns along with the _tid_ column
df_domain = df_domain[df_domain['attribute'].isin([first_attr, second_attr])]
# Convert to wide form so we have our :param first_attr:
# and :second_attr: as columns along with the _tid_ column
df_domain = df_domain[['_tid_', 'attribute', 'current_value']].pivot(index='_tid_', columns='attribute', values='current_value')
# We do not store cells for attributes consisting of only NULL values in cell_domain.
# We require this for pair stats though.
if cond_attr not in df_domain.columns:
df_domain[cond_attr] = '_nan_'
if trg_attr not in df_domain.columns:
df_domain[trg_attr] = '_nan_'
return df_domain.groupby([cond_attr, trg_attr]).size().reset_index(name="count")
>>>>>>> Re-compute single and co-occur stats after every EM iteration.
if first_attr not in df_domain.columns:
df_domain[first_attr] = '_nan_'
if second_attr not in df_domain.columns:
df_domain[second_attr] = '_nan_'
return _dictify(df_domain.groupby([first_attr, second_attr]).size().reset_index(name="count"))

def get_domain_info(self):
"""
Expand Down Expand Up @@ -372,18 +395,13 @@ def generate_inferred_values(self):

def generate_repaired_dataset(self):
tic = time.clock()
init_records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
<<<<<<< HEAD
t = self.aux_table[AuxTables.inf_values_dom]
repaired_vals = _dictify(t.df.reset_index())
=======
records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
t = self.aux_tables[AuxTables.inf_values_dom]
repaired_vals = dictify(t.df.reset_index())
>>>>>>> Cleaned up some private functions and accesses to aux_tables.
repaired_vals = _dictify(t.df.reset_index())
for tid in repaired_vals:
for attr in repaired_vals[tid]:
init_records[tid][attr] = repaired_vals[tid][attr]
repaired_df = pd.DataFrame.from_records(init_records)
records[tid][attr] = repaired_vals[tid][attr]
repaired_df = pd.DataFrame.from_records(records)
name = self.raw_data.name+'_repaired'
self.repaired_data = Table(name, Source.DF, df=repaired_df)
self.repaired_data.store_to_db(self.engine.engine)
Expand Down
2 changes: 1 addition & 1 deletion dataset/table.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum
import os
import pandas as pd
from enum import Enum

class Source(Enum):
FILE = 1
Expand Down
3 changes: 2 additions & 1 deletion detect/nulldetector.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def detect_noisy_cells(self):
attributes = self.ds.get_attributes()
errors = []
for attr in attributes:
tmp_df = self.df[self.df[attr].isnull()]['_tid_'].to_frame()
# self.df i.e. raw_data has all NULL values converted to '_nan_'
tmp_df = self.df[self.df[attr] == '_nan_']['_tid_'].to_frame()
tmp_df.insert(1, "attribute", attr)
errors.append(tmp_df)
errors_df = pd.concat(errors, ignore_index=True)
Expand Down
34 changes: 24 additions & 10 deletions detect/violationdetector.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,33 @@ def setup(self, dataset, env):
self.constraints = dataset.constraints

def detect_noisy_cells(self):
# Convert Constraints to SQL queries
"""
detect_noisy_cells returns all cells that are involved in a DC violation.
:return: pandas.DataFrame with two columns:
'_tid_': TID of tuple
'attribute': attribute corresponding to cell involved in DC violation
"""

# Convert Constraints to SQL queries

tbl = self.ds.raw_data.name
queries = []
# attributes involved in a DC violation (indexed by corresponding query)
attrs = []
for c_key in self.constraints:
c = self.constraints[c_key]
q = self.to_sql(tbl, c)
for constraint in self.constraints.values():
# SQL query to query for TIDs involved in a DC violation for this constraint
q = self.to_sql(tbl, constraint)
queries.append(q)
attrs.append(c.components)
attrs.append(constraint.components)
# Execute Queries over the DBEngine of Dataset
results = self.ds.engine.execute_queries(queries)

# Generate final output
errors = []
for i in range(len(attrs)):
res = results[i]
attr_list = attrs[i]
tmp_df = self.gen_tid_attr_output(res, attr_list)
for attr_list, res in zip(attrs, results):
# DataFrame with TID and attribute pairs from DC violation queries
tmp_df = self._gen_tid_attr_output(res, attr_list)
errors.append(tmp_df)
errors_df = pd.concat(errors, ignore_index=True).drop_duplicates().reset_index(drop=True)
return errors_df
Expand Down Expand Up @@ -79,7 +88,12 @@ def gen_mult_query(self, tbl, c):
query = mult_template.substitute(table=tbl, cond1=cond1, c='', cond2=cond2)
return query

def gen_tid_attr_output(self, res, attr_list):
def _gen_tid_attr_output(self, res, attr_list):
"""
_gen_tid_attr_output creates a DataFrame containing the TIDs from
the DC violation query results in :param res: with the attributes
that were involved in the violation in :param attr_list:.
"""
errors = []
for tuple in res:
tid = int(tuple[0])
Expand Down
Loading

0 comments on commit 39088f4

Please sign in to comment.