diff --git a/dataset/dataset.py b/dataset/dataset.py
index 0d4001165..2c67cc6fb 100644
--- a/dataset/dataset.py
+++ b/dataset/dataset.py
@@ -1,9 +1,8 @@
-import logging
-import time
 from enum import Enum
+import logging
 import os
-import time
 import pandas as pd
+import time
 from .dbengine import DBengine
 from .table import Table, Source
@@ -117,7 +116,5 @@ def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None):
         df.fillna('_nan_', inplace=True)
 
-        # Call to store to database
-        self.raw_data.store_to_db(self.engine.engine)
 
         status = 'DONE Loading {fname}'.format(fname=os.path.basename(fpath))
@@ -242,14 +240,14 @@ def get_statistics(self):
             <count>: frequency (# of entities) where attr1: <val1> AND attr2: <val2>
         """
         if not self.stats_ready:
-            self.collect_stats()
+            self.collect_init_stats()
         stats = (self.total_tuples, self.single_attr_stats, self.pair_attr_stats)
         self.stats_ready = True
         return stats
 
-    def collect_stats(self):
+    def collect_init_stats(self):
         """
-        collect_stats calculates and memoizes: (based on current statistics)
+        collect_init_stats calculates and memoizes: (based on RAW/INITIAL data)
         1. self.single_attr_stats ({ attribute -> { value -> count } })
             the frequency (# of entities) of a given attribute-value
         2. self.pair_attr_stats ({ attr1 -> { attr2 -> {val1 -> {val2 -> count } } } })
@@ -263,77 +261,102 @@ def collect_stats(self):
         self.total_tuples = self.get_raw_data()['_tid_'].nunique()
         # Single attribute-value frequency
         for attr in self.get_attributes():
-            self.single_attr_stats[attr] = self._get_stats_single(attr)
+            self.single_attr_stats[attr] = self._get_init_stats_single(attr)
         # Co-occurence frequency
-        for cond_attr in self.get_attributes():
-            self.pair_attr_stats[cond_attr] = {}
-            for trg_attr in self.get_attributes():
-                if trg_attr != cond_attr:
-                    self.pair_attr_stats[cond_attr][trg_attr] = self._get_stats_pair(cond_attr,trg_attr)
+        for first_attr in self.get_attributes():
+            self.pair_attr_stats[first_attr] = {}
+            for second_attr in self.get_attributes():
+                if second_attr != first_attr:
+                    self.pair_attr_stats[first_attr][second_attr] = self._get_init_stats_pair(first_attr,second_attr)
 
-    def _get_stats_single(self, attr):
+    def collect_current_stats(self):
         """
-        Returns a dictionary where the keys possible values for :param attr: and
-        the values contain the frequency count of that value for this attribute.
-        """
-<<<<<<< HEAD
-        # need to decode values into unicode strings since we do lookups via
-        # unicode strings from Postgres
-        return self.get_raw_data()[[attr]].groupby([attr]).size().to_dict()
-=======
+        collect_current_stats calculates and memoizes frequency and co-occurrence
+        statistics based on the CURRENT values/data.
 
-        # If cell_domain has not been initialized yet, retrieve statistics
-        # from raw data (this happens when the domain is just being setup)
-        if not self.aux_table_exists(AuxTables.cell_domain):
-            return self.get_raw_data()[[attr]].groupby([attr]).size()
+        See collect_init_stats for which member variables are memoized/overwritten.
+        Does NOT overwrite self.total_tuples.
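+
+        e.g. after an EM iteration, self.single_attr_stats[attr][val] holds the
+        number of entities whose CURRENT value for attr is val.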
+        """
+        # Single attribute-value frequency
+        for attr in self.get_attributes():
+            self.single_attr_stats[attr] = self._get_current_stats_single(attr)
+        # Co-occurrence frequency
+        for first_attr in self.get_attributes():
+            self.pair_attr_stats[first_attr] = {}
+            for second_attr in self.get_attributes():
+                if second_attr != first_attr:
+                    self.pair_attr_stats[first_attr][second_attr] = self._get_current_stats_pair(first_attr,second_attr)
+
+    def _get_init_stats_single(self, attr):
+        """
+        _get_init_stats_single returns a dictionary where the keys are the
+        possible values for :param attr: and the values are the frequency
+        counts of those values in the RAW/INITIAL data.
+        """
+        # We need to iterate through this in a for loop instead of using
+        # groupby & size since our values may be '|||' separated
+        freq_count = {}
+        for (vals,) in self.get_raw_data()[[attr]].itertuples(index=False):
+            for val in vals.split('|||'):
+                freq_count[val] = freq_count.get(val, 0) + 1
+        return freq_count
+
+    def _get_current_stats_single(self, attr):
+        """
+        _get_current_stats_single returns a dictionary where the keys are the
+        possible values for :param attr: and the values are the frequency
+        counts of those values in the CURRENT data.
+        """
         # Retrieve statistics on current value from cell_domain
         df_domain = self.get_aux_table(AuxTables.cell_domain).df
         df_count = df_domain.loc[df_domain['attribute'] == attr, 'current_value'].value_counts()
         # We do not store attributes with only NULL values in cell_domain:
         # we require _nan_ in our single stats however
         if df_count.empty:
-            return pd.Series(self.total_tuples, index=['_nan_'])
-        return df_count
->>>>>>> Re-compute single and co-occur stats after every EM iteration.
+            return {'_nan_': self.total_tuples}
+        return df_count.to_dict()
 
-<<<<<<< HEAD
-    def get_stats_pair(self, first_attr, second_attr):
-=======
-    def _get_stats_pair(self, cond_attr, trg_attr):
->>>>>>> Cleaned up some private functions and accesses to aux_tables.
+    def _get_init_stats_pair(self, first_attr, second_attr):
         """
-        Returns a dictionary {first_val -> {second_val -> count } } where:
+        _get_init_stats_pair returns a dictionary {first_val -> {second_val ->
+        count } } where (based on the RAW/INITIAL dataset):
             <first_val>: all possible values for first_attr
             <second_val>: all values for second_attr that appeared at least once with <first_val>
             <count>: frequency (# of entities) where first_attr: <first_val> AND second_attr: <second_val>
         """
-<<<<<<< HEAD
-        tmp_df = self.get_raw_data()[[first_attr,second_attr]].groupby([first_attr,second_attr]).size().reset_index(name="count")
-        return _dictify(tmp_df)
-=======
-        # If cell_domain has not been initialized yet, retrieve statistics
-        # from raw data (this happens when the domain is just being setup)
-        if not self.aux_table_exists(AuxTables.cell_domain):
-            return self.get_raw_data()[[cond_attr,trg_attr]].groupby([cond_attr,trg_attr]).size().reset_index(name="count")
+        # We need to iterate through this in a for loop instead of using
+        # groupby & size since our values may be '|||' separated
+        cooccur_freq_count = {}
+        for vals1, vals2 in self.get_raw_data()[[first_attr,second_attr]].itertuples(index=False):
+            for val1 in vals1.split('|||'):
+                cooccur_freq_count[val1] = cooccur_freq_count.get(val1, {})
+                for val2 in vals2.split('|||'):
+                    cooccur_freq_count[val1][val2] = cooccur_freq_count[val1].get(val2, 0) + 1
+        return cooccur_freq_count
+
+    def _get_current_stats_pair(self, first_attr, second_attr):
+        """
+        _get_current_stats_pair returns a dictionary {first_val -> {second_val ->
+        count } } where (based on the CURRENT dataset):
+            <first_val>: all possible values for first_attr
+            <second_val>: all values for second_attr that appeared at least once with <first_val>
+            <count>: frequency (# of entities) where first_attr: <first_val> AND second_attr: <second_val>
+        """
         # Retrieve pairwise statistics on current value from cell_domain
         df_domain = self.get_aux_table(AuxTables.cell_domain).df
         # Filter cell_domain for only the attributes we care about
-        df_domain = df_domain[df_domain['attribute'].isin([cond_attr, trg_attr])]
-        # Convert to wide form so we have our :param cond_attr:
-        # and :trg_attr: as columns along with the _tid_ column
+        df_domain = df_domain[df_domain['attribute'].isin([first_attr, second_attr])]
+        # Convert to wide form so we have our :param first_attr:
+        # and :second_attr: as columns along with the _tid_ column
         df_domain = df_domain[['_tid_', 'attribute', 'current_value']].pivot(index='_tid_', columns='attribute', values='current_value')
         # We do not store cells for attributes consisting of only NULL values in cell_domain.
         # We require this for pair stats though.
-        if cond_attr not in df_domain.columns:
-            df_domain[cond_attr] = '_nan_'
-        if trg_attr not in df_domain.columns:
-            df_domain[trg_attr] = '_nan_'
-        return df_domain.groupby([cond_attr, trg_attr]).size().reset_index(name="count")
->>>>>>> Re-compute single and co-occur stats after every EM iteration.
+ if first_attr not in df_domain.columns: + df_domain[first_attr] = '_nan_' + if second_attr not in df_domain.columns: + df_domain[second_attr] = '_nan_' + return _dictify(df_domain.groupby([first_attr, second_attr]).size().reset_index(name="count")) def get_domain_info(self): """ @@ -372,18 +395,13 @@ def generate_inferred_values(self): def generate_repaired_dataset(self): tic = time.clock() - init_records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False) -<<<<<<< HEAD - t = self.aux_table[AuxTables.inf_values_dom] - repaired_vals = _dictify(t.df.reset_index()) -======= + records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False) t = self.aux_tables[AuxTables.inf_values_dom] - repaired_vals = dictify(t.df.reset_index()) ->>>>>>> Cleaned up some private functions and accesses to aux_tables. + repaired_vals = _dictify(t.df.reset_index()) for tid in repaired_vals: for attr in repaired_vals[tid]: - init_records[tid][attr] = repaired_vals[tid][attr] - repaired_df = pd.DataFrame.from_records(init_records) + records[tid][attr] = repaired_vals[tid][attr] + repaired_df = pd.DataFrame.from_records(records) name = self.raw_data.name+'_repaired' self.repaired_data = Table(name, Source.DF, df=repaired_df) self.repaired_data.store_to_db(self.engine.engine) diff --git a/dataset/table.py b/dataset/table.py index 0a1a683e2..99ffab808 100644 --- a/dataset/table.py +++ b/dataset/table.py @@ -1,6 +1,6 @@ +from enum import Enum import os import pandas as pd -from enum import Enum class Source(Enum): FILE = 1 diff --git a/detect/nulldetector.py b/detect/nulldetector.py index 412196766..0d4108f11 100644 --- a/detect/nulldetector.py +++ b/detect/nulldetector.py @@ -22,7 +22,8 @@ def detect_noisy_cells(self): attributes = self.ds.get_attributes() errors = [] for attr in attributes: - tmp_df = self.df[self.df[attr].isnull()]['_tid_'].to_frame() + # self.df i.e. raw_data has all NULL values converted to '_nan_' + tmp_df = self.df[self.df[attr] == '_nan_']['_tid_'].to_frame() tmp_df.insert(1, "attribute", attr) errors.append(tmp_df) errors_df = pd.concat(errors, ignore_index=True) diff --git a/detect/violationdetector.py b/detect/violationdetector.py index 575472869..5d586e8d0 100644 --- a/detect/violationdetector.py +++ b/detect/violationdetector.py @@ -17,24 +17,33 @@ def setup(self, dataset, env): self.constraints = dataset.constraints def detect_noisy_cells(self): - # Convert Constraints to SQL queries + """ + detect_noisy_cells returns all cells that are involved in a DC violation. 
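+        Cells involved in multiple violations are reported only once
+        (duplicates are dropped before the result is returned).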
+
+        :return: pandas.DataFrame with two columns:
+            '_tid_': TID of tuple
+            'attribute': attribute corresponding to cell involved in DC violation
+        """
+
+        # Convert Constraints to SQL queries
+        tbl = self.ds.raw_data.name
         queries = []
+        # attributes involved in a DC violation (indexed by corresponding query)
         attrs = []
-        for c_key in self.constraints:
-            c = self.constraints[c_key]
-            q = self.to_sql(tbl, c)
+        for constraint in self.constraints.values():
+            # SQL query for the TIDs involved in a violation of this constraint
+            q = self.to_sql(tbl, constraint)
             queries.append(q)
-            attrs.append(c.components)
+            attrs.append(constraint.components)
         # Execute Queries over the DBEngine of Dataset
         results = self.ds.engine.execute_queries(queries)
         # Generate final output
         errors = []
-        for i in range(len(attrs)):
-            res = results[i]
-            attr_list = attrs[i]
-            tmp_df = self.gen_tid_attr_output(res, attr_list)
+        for attr_list, res in zip(attrs, results):
+            # DataFrame with TID and attribute pairs from DC violation queries
+            tmp_df = self._gen_tid_attr_output(res, attr_list)
             errors.append(tmp_df)
         errors_df = pd.concat(errors, ignore_index=True).drop_duplicates().reset_index(drop=True)
         return errors_df
@@ -79,7 +88,12 @@ def gen_mult_query(self, tbl, c):
         query = mult_template.substitute(table=tbl, cond1=cond1, c='', cond2=cond2)
         return query
 
-    def gen_tid_attr_output(self, res, attr_list):
+    def _gen_tid_attr_output(self, res, attr_list):
+        """
+        _gen_tid_attr_output creates a DataFrame containing the TIDs from
+        the DC violation query results in :param res: with the attributes
+        that were involved in the violation in :param attr_list:.
+        """
         errors = []
         for tuple in res:
             tid = int(tuple[0])
diff --git a/domain/domain.py b/domain/domain.py
index 3b96371b3..405847b28 100644
--- a/domain/domain.py
+++ b/domain/domain.py
@@ -0,0 +1,3 @@
+import itertools
+import logging
+import time
@@ -31,7+31,6 @@ def __init__(self, env, dataset, cor_strength = 0.1, sampling_prob=0.3, max_samp
         self.max_sample = max_sample
         self.single_stats = {}
         self.pair_stats = {}
-        self.all_attrs = {}
 
     def setup(self):
         """
@@ -54,7 +53,9 @@ def find_correlations(self):
         the pairwise correlations between attributes (values are treated as
         discrete categories).
         """
-        df = self.ds.get_raw_data()[self.ds.get_attributes()].copy()
+        # use the expanded raw DataFrame to calculate correlations (since
+        # the raw data may contain '|||' separated values)
+        df = self._expand_raw_df()[self.ds.get_attributes()].copy()
         # convert dataset to categories/factors
         for attr in df.columns:
             df[attr] = df[attr].astype('category').cat.codes
@@ -64,6 +65,43 @@ def find_correlations(self):
         m_corr = df.corr()
         self.correlations = m_corr
 
+    def _expand_raw_df(self):
+        """
+        _expand_raw_df returns an expanded version of the raw DataFrame
+        where every row containing multi-valued cells (values separated by
+        '|||') is expanded into multiple rows forming the cross-product of
+        the multi-valued cells.
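+        Rows whose cells all contain a single value are passed through
+        unchanged.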
+
+        For example if a row contains
+
+            attr1 | attr2
+            A|||B|||C D|||E
+
+        this would be expanded into
+
+            attr1 | attr2
+            A D
+            A E
+            B D
+            B E
+            C D
+            C E
+        """
+        # Cells may contain values separated by '|||': we need to
+        # expand this into multiple rows
+        raw_df = self.ds.get_raw_data()
+
+        tic = time.clock()
+        expanded_rows = []
+        for tup in raw_df.itertuples():
+            expanded_tup = [val.split('|||') if hasattr(val, 'split') else (val,) for val in tup]
+            expanded_rows.extend([new_tup for new_tup in itertools.product(*expanded_tup)])
+        toc = time.clock()
+        logging.debug("Time to expand raw data: %.2f secs", toc-tic)
+        expanded_df = pd.DataFrame(expanded_rows, columns=raw_df.index.names + list(raw_df.columns))
+        expanded_df.set_index(raw_df.index.names, inplace=True)
+        return expanded_df
+
     def store_domains(self, domain):
         """
         store_domains stores the :param domain: DataFrame as the 'cell_domain'
@@ -75,7 +113,7 @@
             _cid_: cell ID
             _vid_: random variable ID (1-1 with _cid_)
             attribute: name of attribute
-            rv_val: cell value
+            rv_val: domain value
             val_id: domain index of rv_val
         """
         if domain.empty:
@@ -204,9 +242,8 @@ def generate_domain(self):
         # Iterate over dataset rows
         cells = []
         vid = 0
-        records = self.ds.get_raw_data().to_records()
-        self.all_attrs = list(records.dtype.names)
-        for row in tqdm(list(records)):
+        raw_records = self.ds.get_raw_data().to_records()
+        for row in tqdm(raw_records):
             tid = row['_tid_']
             app = []
@@ -260,6 +297,9 @@ def get_domain_cell(self, attr, row):
         This would produce [B,C,E] as domain values.
 
+        :param attr: (str) name of attribute to generate domain info for
+        :param row: (pandas.record) Pandas record (tuple) of the current TID's row
+
         :return: (list of initial values, current value, list of domain values).
         """
@@ -269,17 +309,17 @@
         # and take the top K co-occurrence values for 'attr' with the current
         # row's 'cond_attr' value.
         for cond_attr in correlated_attributes:
-            if cond_attr == attr or cond_attr == 'index' or cond_attr == '_tid_':
+            if cond_attr == attr:
                 continue
-            cond_val = row[cond_attr]
-            if not pd.isnull(cond_val):
-                if not self.pair_stats[cond_attr][attr]:
-                    break
+            # row[cond_attr] should always be a string (since it comes from self.raw_data)
+            for cond_val in row[cond_attr].split('|||'):
                 s = self.pair_stats[cond_attr][attr]
                 try:
                     candidates = s[cond_val]
                     domain.update(candidates)
                 except KeyError as missing_val:
+                    # KeyError is possible since we do not store stats for
+                    # attributes with only NULL values
                     if not pd.isnull(row[attr]):
                         # error since co-occurrence must be at least 1 (since
                         # the current row counts as one co-occurrence).
@@ -289,19 +329,52 @@ def get_domain_cell(self, attr, row):
         # Remove _nan_ if added due to correlated attributes
         domain.discard('_nan_')
 
-        # Add initial value in domain
-        init_values = ['_nan_']
-        if not pd.isnull(row[attr]):
-            # Assume value in raw dataset is given as ||| separate initial values
-            init_values = row[attr].split('|||')
-        domain.update(set(init_values))
-
-        # Take the first initial value as the current value
-        # TODO(richardwu): revisit how we should initialize 'current'
-        current_value = init_values[0]
+        init_values, current_value = self._init_and_current(attr, row)
+        domain.update(init_values)
 
         return init_values, current_value, list(domain)
 
+    def _init_and_current(self, attr, init_row):
+        """
+        _init_and_current returns the initial values for :param attr:
+        and the current value: the initial value that has the highest
+        cumulative co-occurrence probability with the other initial values in
+        this row.
+        """
+        # Assume the value in the raw dataset is given as '|||'-separated initial values
+        init_values = init_row[attr].split('|||')
+
+        # Only one initial value: current is the initial value
+        if len(init_values) == 1:
+            return init_values, init_values[0]
+
+        _, single_stats, pair_stats = self.ds.get_statistics()
+        attrs = self.ds.get_attributes()
+
+        # Determine current value by computing co-occurrence probability
+        best_val = None
+        best_score = None
+        for init_val in init_values:
+            # Compute the total sum of co-occurrence probabilities with all
+            # other initial values in this row, that is we calculate the sum of
+            #
+            #     P(other_init_val | initial) = P(initial, other_init_val) / P(initial)
+            #
+            # We subtract one for pair_stats since we do not want to include
+            # the co-occurrence of our current row (consider the extreme case
+            # where an erroneous initial value only occurs once: its co-occurrence
+            # probability will always be 1 but it does not tell us this value
+            # co-occurs most frequently with our other initial values).
+            cur_score = sum(float(pair_stats[attr][other_attr][init_val][other_val] - 1) / single_stats[attr][init_val]
+                    for other_attr in attrs
+                    if attr != other_attr
+                    for other_val in init_row[other_attr].split('|||'))
+            # Keep the best initial value only
+            if best_score is None or cur_score > best_score:
+                best_val = init_val
+                best_score = cur_score
+        return init_values, best_val
+
     def get_random_domain(self, attr, init_values):
         """
         get_random_domain returns a random sample of at most size
diff --git a/evaluate/eval.py b/evaluate/eval.py
index c142c0e68..d59dd7c8c 100644
--- a/evaluate/eval.py
+++ b/evaluate/eval.py
@@ -119,8 +119,9 @@ def compute_total_repairs(self):
         the number of cells where the initial value differs from the inferred
         value (i.e. the number of repairs) for the entities in the TRAINING data.
         """
-        # TODO(richardwu): how do we define a "repair" if we have multiple
-        # init values?
+        # This query works with init_values with multiple values ('|||' separated)
+        # since it is still considered a 'repair' if we convert multiple
+        # initial values into one value.
         query = """
         SELECT count(*)
@@ -144,8 +145,9 @@ def compute_total_repairs_clean(self):
         the number of cells where the initial value differs from the inferred
         value (i.e. the number of repairs) for the entities in the TEST (clean) data.
         """
-        # TODO(richardwu): how do we define a "repair" if we have multiple
-        # init values?
+        # This query works with init_values with multiple values ('|||' separated)
+        # since it is still considered a 'repair' if we convert multiple
+        # initial values into one value.
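+        # e.g. repairing an initial value 'A|||B' to the single inferred
+        # value 'A' counts as one repair.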
query = """ SELECT count(*) diff --git a/examples/holoclean_repair_example.py b/examples/holoclean_repair_example.py index 52df2bea4..b59ecdc6f 100644 --- a/examples/holoclean_repair_example.py +++ b/examples/holoclean_repair_example.py @@ -14,7 +14,7 @@ pruning_topk=0.1, epochs=30, weight_decay=0.01, - threads=20, + threads=4, batch_size=1, verbose=True, timeout=3*60000, @@ -40,8 +40,8 @@ LangModelFeat(), ConstraintFeat() ] -hc.repair_errors(featurizers) - # 5. Evaluate the correctness of the results. -hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val') +em_iter_func = lambda: hc.evaluate('../testdata/hospital_clean.csv', 'tid', 'attribute', 'correct_val') +hc.repair_errors(featurizers, em_iterations=1, em_iter_func=em_iter_func) + diff --git a/examples/start_example.sh b/examples/start_example.sh index 5ee266ef4..c0cb2508e 100755 --- a/examples/start_example.sh +++ b/examples/start_example.sh @@ -3,6 +3,6 @@ source ../set_env.sh echo "Launching example" -python holoclean_repair_example.py +python holoclean_food.py diff --git a/holoclean.py b/holoclean.py index 66f63a49d..aadc355a9 100644 --- a/holoclean.py +++ b/holoclean.py @@ -112,7 +112,12 @@ {'default': False, 'dest': 'print_fw', 'action': 'store_true', - 'help': 'print the weights of featurizers'}) + 'help': 'print the weights of featurizers'}), + (tuple(['--currentstats']), + {'default': False, + 'dest': 'current_stats', + 'action': 'store_true', + 'help': 're-compute frequency and co-occur stats after every EM iteration'}), ] @@ -286,8 +291,9 @@ def repair_errors(self, featurizers, em_iterations=1, em_iter_func=None): logging.debug('Time to retrieve featurizer weights: %.2f secs' % time) # Update current values with inferred values self.ds.update_current_values() - # Re-compute statistics with new current values - self.ds.collect_stats() + if self.env['current_stats']: + # Re-compute statistics with new current values + self.ds.collect_current_stats() # Call em_iter_func if provided at the end of every EM iteration if em_iter_func is not None: diff --git a/repair/featurize/constraintfeat.py b/repair/featurize/constraintfeat.py index c44ac8cf3..053e539d5 100644 --- a/repair/featurize/constraintfeat.py +++ b/repair/featurize/constraintfeat.py @@ -70,11 +70,11 @@ def execute_queries(self,queries): def relax_unary_predicate(self, predicate): """ - relax_binary_predicate returns the attribute, operation, and + relax_unary_predicate returns the attribute, operation, and tuple attribute reference. 
         :return: (attr, op, const), for example:
-            ("StateAvg", "<>", 't1."StateAvg"')
+            ("StateAvg", "<>", '"StateAvg"')
         """
         attr = predicate.components[0][1]
         op = predicate.operation
diff --git a/repair/featurize/featurize.py b/repair/featurize/featurize.py
index 98eedb168..69876b992 100644
--- a/repair/featurize/featurize.py
+++ b/repair/featurize/featurize.py
@@ -14,7 +14,7 @@ def __init__(self, dataset, env, featurizers):
         self.total_vars, self.classes = self.ds.get_domain_info()
         self.processes = self.env['threads']
         for f in featurizers:
-            f.setup_featurizer(self.ds, self.total_vars, self.classes, self.processes)
+            f.setup_featurizer(self.env, self.ds, self.total_vars, self.classes, self.processes)
         tensors = [f.create_tensor() for f in featurizers]
         self.featurizer_info = [FeatInfo(featurizers[i].name, t.size()[2], featurizers[i].learnable, featurizers[i].init_weight) for i, t in enumerate(tensors)]
         tensor = torch.cat(tensors,2)
diff --git a/repair/featurize/featurizer.py b/repair/featurize/featurizer.py
index 70068dc83..9538cd985 100644
--- a/repair/featurize/featurizer.py
+++ b/repair/featurize/featurizer.py
@@ -11,7 +11,8 @@ def __init__(self, learnable=True, init_weight=1.0):
         self.learnable = learnable
         self.init_weight = init_weight
 
-    def setup_featurizer(self, dataset, total_vars, classes, processes=20):
+    def setup_featurizer(self, env, dataset, total_vars, classes, processes=20):
+        self.env = env
         self.ds = dataset
         self.total_vars = total_vars
         self.classes = classes
diff --git a/repair/featurize/freqfeat.py b/repair/featurize/freqfeat.py
index d1a9b6b8c..b29b0a221 100644
--- a/repair/featurize/freqfeat.py
+++ b/repair/featurize/freqfeat.py
@@ -19,7 +19,15 @@ def gen_feat_tensor(self, input, classes):
         attr_idx = self.ds.attr_to_idx[attribute]
         tensor = torch.zeros(1, classes, self.attrs_number)
         for idx, val in enumerate(domain):
-            prob = float(self.single_stats[attribute][val])/float(self.total)
+            freq = 0.0
+            if self.env['current_stats']:
+                # In the case where we update statistics to current values after
+                # every EM iteration, the domain value may no longer appear amongst
+                # current values.
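+                # Missing domain values therefore default to a frequency of 0.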
+                freq = self.single_stats[attribute].get(val, 0)
+            else:
+                freq = self.single_stats[attribute][val]
+            prob = float(freq)/float(self.total)
             tensor[0][idx][attr_idx] = prob
         return tensor
diff --git a/repair/featurize/occurattrfeat.py b/repair/featurize/occurattrfeat.py
index 47042e1a1..8702029a2 100644
--- a/repair/featurize/occurattrfeat.py
+++ b/repair/featurize/occurattrfeat.py
@@ -32,8 +32,8 @@ def create_tensor(self):
         # Set tuple_id index on raw_data
         t = self.ds.get_aux_table(AuxTables.cell_domain)
         sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','_vid_','domain']]
-        records = sorted_domain.to_records()
-        for row in tqdm(list(records)):
+        # Keep to_records() here: the loop below indexes rows with row['_tid_'],
+        # which the namedtuples yielded by itertuples() do not support.
+        records = sorted_domain.to_records(index=False)
+        for row in tqdm(records):
             #Get tuple from raw_dataset
             tid = row['_tid_']
             tuple = self.raw_data_dict[tid]
diff --git a/repair/featurize/occurfeat.py b/repair/featurize/occurfeat.py
index d68de2f87..a195fbd6a 100644
--- a/repair/featurize/occurfeat.py
+++ b/repair/featurize/occurfeat.py
@@ -31,7 +31,7 @@ def setup_stats(self):
         # current_values_dict is a Dictionary mapping TID -> { attribute -> current value }
         self.current_values_dict = {}
 
-        for (tid, attr, cur_val) in self.ds.get_aux_table(AuxTables.cell_domain).df[['_tid_', 'attribute', 'current_value']].to_records(index=False):
+        for (tid, attr, cur_val) in self.ds.get_aux_table(AuxTables.cell_domain).df[['_tid_', 'attribute', 'current_value']].itertuples(index=False):
             self.current_values_dict[tid] = self.current_values_dict.get(tid, {})
             self.current_values_dict[tid][attr] = cur_val
@@ -56,10 +56,9 @@ def create_tensor(self):
         tensors = []
         # Set tuple_id index on raw_data
         t = self.ds.get_aux_table(AuxTables.cell_domain)
-        sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','_vid_','domain']]
+        sorted_domain = t.df.reset_index().sort_values(by=['_vid_'])[['_tid_','attribute','domain']]
         records = sorted_domain.to_records()
-        for row in tqdm(list(records)):
-            #Get current values for this TID
+        for row in tqdm(records):
             tid = row['_tid_']
             current_tuple = self.current_values_dict[tid]
             feat_tensor = self.gen_feat_tensor(row, current_tuple)
@@ -80,7 +79,7 @@ def gen_feat_tensor(self, row, current_tuple):
         """
         # tensor is a (1 X domain size X # of attributes) pytorch.Tensor
         # tensor[0][domain_idx][rv_idx] contains the co-occurrence probability
-        # between the current attribute (row['attribute']) and the domain values
+        # between the current attribute (row.attribute) and the domain values
         # a possible domain value for this entity
         tensor = torch.zeros(1, self.classes, self.attrs_number)
         rv_attr = row['attribute']
@@ -90,33 +89,36 @@
 
         # Iterate through every attribute (and current value for that
         # attribute) and set the co-occurrence probability for every
-        # domain value for our current row['attribute'].
-        for attr in self.all_attrs:
-            # Skip pairwise with current attribute or NULL value
-            # 'attr' may not be in 'current_tuple' since we do not store
-            # attributes with all NULL values in our current values
-            if attr == rv_attr or pd.isnull(current_tuple.get(attr, None)):
+        # domain value for our current row.attribute.
+        for cur_attr in self.all_attrs:
+            # Skip pairwise with current attribute or NULL value.
+            # 'cur_attr' may not be in 'current_tuple' since we do not
+            # store attributes with all NULL values in cell_domain.
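+            # (for such attributes current_tuple.get returns None, which
+            # pd.isnull treats as missing, so they are skipped here too)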
+ if cur_attr == rv_attr or pd.isnull(current_tuple.get(cur_attr, None)): + continue + + cur_val = current_tuple[cur_attr] + if cur_val not in self.pair_stats[cur_attr][rv_attr]: + # cur_val may not be in pairwise stats if cur_attr contains + # only NULL values + if not pd.isnull(current_tuple[cur_attr]): + # Actual error if not null + raise Exception('Something is wrong with the pairwise statistics. <{cur_val}> should be present in dictionary.'.format(cur_val=cur_val)) continue - attr_idx = self.ds.attr_to_idx[attr] - val = current_tuple[attr] - attr_freq = float(self.single_stats[attr][val]) # Get topK values - if val not in self.pair_stats[attr][rv_attr]: - if not pd.isnull(current_tuple[rv_attr]): - raise Exception('Something is wrong with the pairwise statistics. <{val}> should be present in dictionary.'.format(val)) + all_vals = self.pair_stats[cur_attr][rv_attr][cur_val] + if len(all_vals) <= len(rv_domain_idx): + candidates = list(all_vals.keys()) else: - # dict of { val -> co-occur count } - all_vals = self.pair_stats[attr][rv_attr][val] - if len(all_vals) <= len(rv_domain_idx): - candidates = list(all_vals.keys()) - else: - candidates = domain - - # iterate through all possible domain values of row['attribute'] - for rv_val in candidates: - cooccur_freq = float(all_vals.get(rv_val,0.0)) - prob = cooccur_freq/attr_freq - if rv_val in rv_domain_idx: - tensor[0][rv_domain_idx[rv_val]][attr_idx] = prob + candidates = domain + + # iterate through all possible domain values of row.attribute + for rv_val in candidates: + cooccur_freq = float(all_vals.get(rv_val,0.0)) + cur_attr_freq = float(self.single_stats[cur_attr][cur_val]) + prob = cooccur_freq/cur_attr_freq + if rv_val in rv_domain_idx: + cur_attr_idx = self.ds.attr_to_idx[cur_attr] + tensor[0][rv_domain_idx[rv_val]][cur_attr_idx] = prob return tensor diff --git a/tests/test_holoclean_repair.py b/tests/test_holoclean_repair.py index 654c6b5a0..6163f6e7d 100644 --- a/tests/test_holoclean_repair.py +++ b/tests/test_holoclean_repair.py @@ -14,7 +14,8 @@ class TestHolocleanRepair(unittest.TestCase): def test_hospital(self): # 1. Setup a HoloClean session. - hc = holoclean.HoloClean(pruning_topk=0.1, epochs=10, weight_decay=0.01, threads=20, batch_size=1, verbose=True, timeout=3*60000).session + hc = holoclean.HoloClean(pruning_topk=0.1, epochs=10, weight_decay=0.01, + threads=4, batch_size=1, verbose=True, timeout=3*60000).session # 2. Load training data and denial constraints. hc.load_data('hospital', '../testdata/hospital.csv')
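
Note: the stats code above calls a module-level helper `_dictify` (see
`_get_init_stats_pair` and `generate_repaired_dataset`) whose definition is
not part of this diff. For reference, a minimal sketch that is consistent
with those call sites; the exact helper in the repository may differ:

    import pandas as pd

    def _dictify(frame):
        # Nest each row of the frame into dicts keyed by all leading
        # columns, mapping the second-to-last column to the last column,
        # e.g. rows of [val1, val2, count] become { val1 -> { val2 -> count } }.
        ret = {}
        for row in frame.values.tolist():
            here = ret
            for elem in row[:-2]:
                here = here.setdefault(elem, {})
            here[row[-2]] = row[-1]
        return ret

    # e.g. _dictify(pd.DataFrame([['a', 'x', 2], ['a', 'y', 1]],
    #               columns=['first', 'second', 'count']))
    # returns {'a': {'x': 2, 'y': 1}}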