Skip to content

Commit

Permalink
feat(api): make cache evaluate only once per session per expression
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Mar 9, 2023
1 parent 2578334 commit 5a8ffe9
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 84 deletions.
51 changes: 40 additions & 11 deletions ibis/backends/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import re
import sys
import urllib.parse
from collections import Counter
from pathlib import Path
from typing import (
TYPE_CHECKING,
Expand All @@ -20,6 +21,8 @@
MutableMapping,
)

from bidict import MutableBidirectionalMapping, bidict

import ibis
import ibis.common.exceptions as exc
import ibis.config
Expand Down Expand Up @@ -456,6 +459,12 @@ class BaseBackend(abc.ABC, _FileIOHandler):
def __init__(self, *args, **kwargs):
    """Store the connection arguments and initialize the per-session expression cache."""
    # Keep the raw connection arguments around (used e.g. for pickling support).
    self._con_args: tuple[Any] = args
    self._con_kwargs: dict[str, Any] = kwargs
    # Bidirectional map from a table expression's node to the physical
    # table that materializes its cached result.  A bidict lets us look
    # the mapping up from either direction in `_cached`/`_release_cached`.
    self._query_cache: MutableBidirectionalMapping[
        ops.TableNode, ops.PhysicalTable
    ] = bidict()
    # Reference count per cached node, so the backing table is only
    # dropped when the last reference is released.
    self._refs = Counter()

def __getstate__(self):
return dict(
Expand Down Expand Up @@ -889,8 +898,10 @@ def has_operation(cls, operation: type[ops.Value]) -> bool:
f"{cls.name} backend has not implemented `has_operation` API"
)

def _cached(self, expr):
    """Cache the provided expression.

    All subsequent operations on the returned expression will be
    performed on the cached data.  The expression is evaluated at most
    once per session: repeated calls with the same expression return a
    handle to the already-materialized table and only bump a reference
    count.

    Parameters
    ----------
    expr
        Table expression to cache

    Returns
    -------
    Expr
        Cached table
    """
    op = expr.op()
    if (result := self._query_cache.get(op)) is None:
        # First time this expression is cached in the session:
        # materialize it once under a fresh unique table name and
        # remember the op -> physical-table mapping.
        name = util.generate_unique_table_name("cache")
        self._load_into_cache(name, expr)
        self._query_cache[op] = result = self.table(name, expr.schema()).op()
    # Count every handout so `_release_cached` only drops the backing
    # table when the final reference is released.
    self._refs[op] += 1
    return ir.CachedTable(result)

def _release_cached(self, expr):
"""Releases the provided cached expression.
Parameters
----------
expr
Cached expression to release
"""
raise NotImplementedError(
f"{self.name} backend has not implemented `release_cache` API"
)
op = expr.op()
# we need to remove the expression representing the temp table as well
# as the expression that was used to create the temp table
#
# bidict automatically handles this for us; without it we'd have to
# do to the bookkeeping ourselves with two dicts
if (key := self._query_cache.inverse.get(op)) is None:
raise exc.IbisError(
"This expression has already been released. Did you call "
"`.release()` twice on the same expression?"
)

self._refs[key] -= 1

if not self._refs[key]:
del self._query_cache[key]
del self._refs[key]
self._clean_up_cached_table(op)


@functools.lru_cache(maxsize=None)
Expand Down
15 changes: 5 additions & 10 deletions ibis/backends/base/sql/alchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,13 +672,8 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
"""Return an ibis Schema from a backend-specific SQL string."""
return sch.Schema.from_tuples(self._metadata(query))

def _cache(self, expr):
persisted_table_name = util.generate_unique_table_name("cache")
self.create_table(persisted_table_name, expr, schema=expr.schema(), temp=True)
return self.table(persisted_table_name)

def _release_cache(self, expr):
if isinstance(expr._arg, AlchemyTable):
self.drop_table(expr._arg.name)
else:
raise NotImplementedError(f"{expr.arg} is not releaseable.")
def _load_into_cache(self, name, expr):
self.create_table(name, expr, schema=expr.schema(), temp=True)

def _clean_up_cached_table(self, op):
self.drop_table(op.name)
15 changes: 4 additions & 11 deletions ibis/backends/dask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import ibis.config
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis import util
from ibis.backends.dask.client import DaskDatabase, DaskTable, ibis_schema_to_dask
from ibis.backends.dask.core import execute_and_reset
from ibis.backends.pandas import BasePandasBackend
Expand Down Expand Up @@ -128,14 +127,8 @@ def _convert_schema(schema: sch.Schema):
def _convert_object(cls, obj: dd.DataFrame) -> dd.DataFrame:
return obj

def _cache(self, expr):
persisted_table_name = util.generate_unique_table_name("cache")
df = self.compile(expr).persist()
self.load_data(persisted_table_name, df)
return self.table(persisted_table_name)
def _load_into_cache(self, name, expr):
self.load_data(name, self.compile(expr).persist())

def _release_cache(self, expr):
if isinstance(expr._arg, DaskTable):
del self.dictionary[expr._arg.name]
else:
raise NotImplementedError(f"{expr.arg} is not releasable.")
def _clean_up_cached_table(self, op):
del self.dictionary[op.name]
9 changes: 5 additions & 4 deletions ibis/backends/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,9 @@ def execute(self, query, params=None, limit='default', **kwargs):

return execute_and_reset(node, params=params, **kwargs)

def _cache(self, expr):
return expr
def _cached(self, expr):
    """No-op: the pandas data already lives in memory, so the expression itself serves as the cache."""
    node = expr.op()
    return ir.CachedTable(node)

def _release_cache(self, expr):
return
def _release_cached(self, _):
"""No-op."""
16 changes: 4 additions & 12 deletions ibis/backends/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis import util
from ibis.backends.base import BaseBackend
from ibis.backends.polars.compiler import translate
from ibis.expr.operations import DatabaseTable
from ibis.util import normalize_filename

if TYPE_CHECKING:
Expand Down Expand Up @@ -353,14 +351,8 @@ def to_pyarrow_batches(
table = self._to_pyarrow_table(expr, params=params, limit=limit, **kwargs)
return table.to_reader(chunk_size)

def _cache(self, expr):
persisted_table_name = util.generate_unique_table_name("cache")
lf = self.compile(expr).cache()
self.load_data(persisted_table_name, lf)
return self.table(persisted_table_name, expr.schema)
def _load_into_cache(self, name, expr):
self.load_data(name, self.compile(expr).cache())

def _release_cache(self, expr):
if isinstance(expr._arg, DatabaseTable):
del self._tables[expr._arg.name]
else:
raise NotImplementedError(f"{expr._arg} is not releasable.")
def _clean_up_cached_table(self, op):
del self._tables[op.name]
Loading

0 comments on commit 5a8ffe9

Please sign in to comment.