Scalability: incorporate early pruning optimizations #368

Merged · 7 commits · Apr 28, 2021
Changes from all commits
18 changes: 14 additions & 4 deletions lux/_config/config.py
@@ -25,16 +25,26 @@ def __init__(self):
# flags whether or not an action has been registered or removed and should be re-rendered by frame.py
self.update_actions: Dict[str, bool] = {}
self.update_actions["flag"] = False
self._sampling_start = 10000
self._sampling_cap = 30000
self._sampling_flag = True
self._heatmap_flag = True
self._plotting_backend = "vegalite"
self._topk = 15
self._sort = "descending"
self._pandas_fallback = True
self._interestingness_fallback = True
self.heatmap_bin_size = 40
#####################################
#### Optimization Configurations ####
#####################################
self._sampling_start = 100000
self._sampling_cap = 1000000
self._sampling_flag = True
self._heatmap_flag = True
self.lazy_maintain = True
self.early_pruning = True
self.early_pruning_sample_cap = 30000
# Apply sampling only if the dataset exceeds 150% of the sample cap
self.early_pruning_sample_start = self.early_pruning_sample_cap * 1.5
self.streaming = False
self.render_widget = True

@property
def topk(self):
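For context, the new optimization settings are plain attributes on `lux.config`, so they could presumably be tuned at runtime as in the hedged sketch below (attribute names are taken from the added lines; the CSV path is hypothetical):

```python
# Sketch only; assumes the attributes are exposed directly on lux.config as in the diff.
import lux
import pandas as pd

lux.config.lazy_maintain = True                   # defer metadata/recommendation upkeep
lux.config.early_pruning = True                   # prune low-value visualizations early
lux.config.early_pruning_sample_cap = 30000       # rows used for approximate early pruning
# Sampling for early pruning starts once the frame exceeds 150% of the cap (45,000 rows here).
lux.config.early_pruning_sample_start = lux.config.early_pruning_sample_cap * 1.5
lux.config.render_widget = True                   # set False to skip building the Jupyter widget

df = pd.read_csv("large_dataset.csv")             # hypothetical file
df                                                # displaying the frame triggers recommendations
```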
81 changes: 50 additions & 31 deletions lux/core/frame.py
@@ -79,6 +79,7 @@ def __init__(self, *args, **kw):
lux.config.executor = SQLExecutor()

self._sampled = None
self._approx_sample = None
self._toggle_pandas_display = True
self._message = Message()
self._pandas_only = False
@@ -115,47 +116,56 @@ def data_type(self):
self.maintain_metadata()
return self._data_type

def maintain_metadata(self):
def compute_metadata(self) -> None:
"""
Compute dataset metadata and statistics
"""
if len(self) > 0:
if lux.config.executor.name != "SQLExecutor":
lux.config.executor.compute_stats(self)
lux.config.executor.compute_dataset_metadata(self)
self._infer_structure()
self._metadata_fresh = True

def maintain_metadata(self) -> None:
"""
Maintain dataset metadata and statistics (Compute only if needed)
"""
is_sql_tbl = lux.config.executor.name == "SQLExecutor"
if lux.config.SQLconnection != "" and is_sql_tbl:
from lux.executor.SQLExecutor import SQLExecutor

lux.config.executor = SQLExecutor()
if lux.config.lazy_maintain:
# Check that metadata has not yet been computed
if not hasattr(self, "_metadata_fresh") or not self._metadata_fresh:
# only compute metadata information if the dataframe is non-empty
self.compute_metadata()
else:
self.compute_metadata()

# Check that metadata has not yet been computed
if not hasattr(self, "_metadata_fresh") or not self._metadata_fresh:
# only compute metadata information if the dataframe is non-empty
if is_sql_tbl:
lux.config.executor.compute_dataset_metadata(self)
self._infer_structure()
self._metadata_fresh = True
else:
if len(self) > 0:
lux.config.executor.compute_stats(self)
lux.config.executor.compute_dataset_metadata(self)
self._infer_structure()
self._metadata_fresh = True

def expire_recs(self):
def expire_recs(self) -> None:
"""
Expires and resets all recommendations
"""
self._recs_fresh = False
self._recommendation = {}
self._widget = None
self._rec_info = None
self._sampled = None
if lux.config.lazy_maintain:
self._recs_fresh = False
self._recommendation = {}
self._widget = None
self._rec_info = None
self._sampled = None

def expire_metadata(self):
def expire_metadata(self) -> None:
"""
Expire all saved metadata to trigger a recomputation the next time the data is required.
"""
self._metadata_fresh = False
self._data_type = None
self.unique_values = None
self.cardinality = None
self._min_max = None
self.pre_aggregated = None
if lux.config.lazy_maintain:
self._metadata_fresh = False
self._data_type = None
self.unique_values = None
self.cardinality = None
self._min_max = None
self.pre_aggregated = None

#####################
## Override Pandas ##
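To make the control flow above concrete, here is an illustrative sketch of the lazy-maintenance pattern: metadata is computed only when stale, and expiring it just clears the freshness flag. Class and field names are simplified stand-ins, not Lux's actual implementation.

```python
# Illustrative sketch of lazy metadata maintenance; names are stand-ins.
class LazyFrame:
    def __init__(self, data):
        self.data = data
        self._metadata_fresh = False
        self._metadata = None

    def compute_metadata(self):
        # stand-in for compute_stats / compute_dataset_metadata
        self._metadata = {"rows": len(self.data)}
        self._metadata_fresh = True

    def maintain_metadata(self, lazy_maintain=True):
        # with lazy maintenance, recompute only when nothing fresh is cached
        if not lazy_maintain or not self._metadata_fresh:
            self.compute_metadata()
        return self._metadata

    def expire_metadata(self):
        # mark stale; the next maintain_metadata() call recomputes
        self._metadata_fresh = False

frame = LazyFrame([1, 2, 3])
frame.maintain_metadata()   # computes once
frame.maintain_metadata()   # cached; no recomputation
frame.expire_metadata()
frame.maintain_metadata()   # recomputes after expiry
```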
@@ -357,6 +367,7 @@ def maintain_recs(self, is_series="DataFrame"):
if lux.config.update_actions["flag"] == True:
self._recs_fresh = False
show_prev = False # flag indicating whether rec_df is showing previous df or current self

if self._prev is not None:
rec_df = self._prev
rec_df._message = Message()
@@ -394,8 +405,14 @@ def maintain_recs(self, is_series="DataFrame"):

rec_df._prev = None # reset _prev

# If lazy, check that recs has not yet been computed
lazy_but_not_computed = lux.config.lazy_maintain and (
not hasattr(rec_df, "_recs_fresh") or not rec_df._recs_fresh
)
eager = not lux.config.lazy_maintain

# Check that recs has not yet been computed
if not hasattr(rec_df, "_recs_fresh") or not rec_df._recs_fresh:
if lazy_but_not_computed or eager:
is_sql_tbl = lux.config.executor.name == "SQLExecutor"
rec_infolist = []
from lux.action.row_group import row_group
@@ -426,11 +443,13 @@ def maintain_recs(self, is_series="DataFrame"):
rec_df._recommendation[action_type] = vlist
rec_df._rec_info = rec_infolist
rec_df.show_all_column_vis()
self._widget = rec_df.render_widget()
if lux.config.render_widget:
self._widget = rec_df.render_widget()
# re-render widget for the current dataframe if previous rec is not recomputed
elif show_prev:
rec_df.show_all_column_vis()
self._widget = rec_df.render_widget()
if lux.config.render_widget:
self._widget = rec_df.render_widget()
self._recs_fresh = True

#######################################################
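The recommendation path follows the same idea, with the added `render_widget` switch. A hypothetical, simplified sketch of that flow (not the real `maintain_recs`):

```python
# Hypothetical sketch: recs are rebuilt eagerly, or lazily only when stale,
# and widget construction can be switched off entirely.
class RecsMixin:
    def maintain_recs(self, lazy_maintain=True, render_widget=True):
        stale = not getattr(self, "_recs_fresh", False)
        if stale or not lazy_maintain:
            self._recommendation = self._compute_recs()   # expensive action pass
            if render_widget:
                self._widget = self._render_widget()      # skip in headless/batch runs
        self._recs_fresh = True

    def _compute_recs(self):
        # stand-in for running the registered Lux actions over the frame
        return {"Correlation": [], "Distribution": []}

    def _render_widget(self):
        # stand-in for building the Jupyter widget
        return "<widget>"

frame = RecsMixin()
frame.maintain_recs(render_widget=False)   # compute recommendations without a widget
```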
19 changes: 19 additions & 0 deletions lux/core/series.py
@@ -105,6 +105,25 @@ def to_pandas(self) -> pd.Series:

return lux.core.originalSeries(self, copy=False)

def unique(self):
"""
Overridden method for pd.Series.unique with cached results.
Return unique values of Series object.
Uniques are returned in order of appearance. Hash table-based unique,
therefore does NOT sort.
Returns
-------
ndarray or ExtensionArray
The unique values returned as a NumPy array.
See Also
--------
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.unique.html
"""
if self.unique_values and self.name in self.unique_values.keys():
return np.array(self.unique_values[self.name])
else:
return super(LuxSeries, self).unique()

def _ipython_display_(self):
from IPython.display import display
from IPython.display import clear_output
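The override above short-circuits `pd.Series.unique` when the frame's metadata already holds the unique values. A standalone sketch of that lookup, where the cache dictionary is hypothetical and stands in for what `compute_stats` populates:

```python
# Standalone sketch of the cached-unique lookup: reuse precomputed unique
# values when available, otherwise defer to pandas.
import numpy as np
import pandas as pd

unique_values = {"city": ["NYC", "SF", "LA"]}          # hypothetical metadata cache

def cached_unique(series: pd.Series, cache: dict):
    if cache and series.name in cache:
        return np.array(cache[series.name])            # no hashing pass needed
    return series.unique()                             # fall back to pd.Series.unique

s = pd.Series(["NYC", "SF", "NYC", "LA"], name="city")
print(cached_unique(s, unique_values))                 # served from the cache
print(cached_unique(s.rename("state"), unique_values)) # cache miss -> pandas
```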
34 changes: 25 additions & 9 deletions lux/executor/Executor.py
@@ -11,8 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from lux.core.frame import LuxDataFrame
from lux.vis.VisList import VisList
from lux.vis.Vis import Vis
from lux.utils import utils


@@ -28,19 +29,38 @@ def __repr__(self):
return f"<Executor>"

@staticmethod
def execute(vis_collection: VisList, ldf):
def execute(vislist: VisList, ldf: LuxDataFrame, approx: bool = False):
"""
Given a VisList, fetch the data required to render the vis.
"""
return NotImplemented

@staticmethod
def execute_aggregate(vis, ldf):
def execute_aggregate(vis: Vis, ldf: LuxDataFrame):
"""
Aggregate data points on an axis for bar or line charts
"""
return NotImplemented

@staticmethod
def execute_binning(vis, ldf):
def execute_binning(ldf: LuxDataFrame, vis: Vis):
"""
Binning of data points for generating histograms
"""
return NotImplemented

@staticmethod
def execute_filter(vis, ldf):
def execute_filter(vis: Vis):
"""
Apply a Vis's filter to vis.data
"""
return NotImplemented

@staticmethod
def execute_2D_binning(vis: Vis):
"""
Apply 2D binning (heatmap) to vis.data
"""
return NotImplemented

@staticmethod
Expand All @@ -51,10 +71,6 @@ def compute_stats(self):
def compute_data_type(self):
return NotImplemented

# @staticmethod
# def compute_data_model(self):
# return NotImplemented

def mapping(self, rmap):
group_map = {}
for val in ["quantitative", "id", "nominal", "temporal", "geographical"]:
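The base class only declares the contract; a concrete backend fills in each method. Below is a hedged toy sketch of that contract (simplified, not Lux's PandasExecutor), showing how the new `approx` flag could route execution to a sample instead of the full frame:

```python
# Toy executor sketch; method names mirror the abstract base, internals are invented.
import pandas as pd

class ToyExecutor:
    name = "ToyExecutor"

    @staticmethod
    def execute(vislist, ldf, approx: bool = False):
        # With approx=True, work on a bounded sample (cf. _approx_sample on LuxDataFrame).
        data = ldf.sample(n=min(len(ldf), 1000)) if approx else ldf
        return [ToyExecutor.execute_binning(data, vis) for vis in vislist]

    @staticmethod
    def execute_binning(ldf, vis):
        # Bin one numeric column into a histogram-style count table.
        return pd.cut(ldf[vis["column"]], bins=10).value_counts().sort_index()

toy_vislist = [{"column": "value"}]                    # stand-in for a VisList
df = pd.DataFrame({"value": range(10_000)})
print(ToyExecutor.execute(toy_vislist, df, approx=True)[0])
```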