refactor: generate uuid-based names for temp tables
cpcloud committed Apr 1, 2023
1 parent dd55beb commit a1164df
Showing 9 changed files with 36 additions and 71 deletions.
2 changes: 1 addition & 1 deletion ibis/backends/base/__init__.py
@@ -462,7 +462,7 @@ def __init__(self, *args, **kwargs):
populate=self._load_into_cache,
lookup=lambda name: self.table(name).op(),
finalize=self._clean_up_cached_table,
-generate_name=functools.partial(util.generate_unique_table_name, "cache"),
+generate_name=functools.partial(util.gen_name, "cache"),
key=lambda expr: expr.op(),
)
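
As a hedged aside, here is a minimal standalone sketch of what this wiring produces (gen_name is copied from the ibis/util.py hunk at the end of this commit; the suffix shown in the comment is illustrative):

import functools
import uuid

import numpy as np


def gen_name(namespace: str) -> str:
    """Create a case-insensitive uuid4 unique table name."""
    return f"_ibis_{namespace}_{np.base_repr(uuid.uuid4().int, 36)}".lower()


# Mirrors generate_name=functools.partial(util.gen_name, "cache"): a
# zero-argument factory that returns a fresh cache-table name on each call.
generate_name = functools.partial(gen_name, "cache")
print(generate_name())  # e.g. '_ibis_cache_4kkpzjxvfdyxs0aowg1hl94mk'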

12 changes: 3 additions & 9 deletions ibis/backends/datafusion/__init__.py
@@ -1,6 +1,5 @@
from __future__ import annotations

-import itertools
import re
from functools import lru_cache
from pathlib import Path
@@ -15,7 +14,7 @@
import ibis.expr.types as ir
from ibis.backends.base import BaseBackend
from ibis.backends.datafusion.compiler import translate
-from ibis.util import normalize_filename
+from ibis.util import gen_name, normalize_filename

try:
from datafusion import ExecutionContext as SessionContext
@@ -24,11 +23,6 @@

import datafusion

-# counters for in-memory, parquet, and csv reads
-# used if no table name is specified
-pa_n = itertools.count(0)
-csv_n = itertools.count(0)


class Backend(BaseBackend):
name = 'datafusion'
@@ -169,7 +163,7 @@ def read_csv(
The just-registered table
"""
path = normalize_filename(path)
table_name = table_name or f"ibis_read_csv_{next(csv_n)}"
table_name = table_name or gen_name("read_csv")
# Our other backends support overwriting views / tables when reregistering
self._context.deregister_table(table_name)
self._context.register_csv(table_name, path, **kwargs)
@@ -196,7 +190,7 @@ def read_parquet(
The just-registered table
"""
path = normalize_filename(path)
table_name = table_name or f"ibis_read_parquet_{next(pa_n)}"
table_name = table_name or gen_name("read_parquet")
# Our other backends support overwriting views / tables when reregistering
self._context.deregister_table(table_name)
self._context.register_parquet(table_name, path, **kwargs)
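
For context, a brief sketch contrasting the old counter-based names with the new uuid-based ones (values are illustrative). The counters were process-local, so a name like ibis_read_csv_0 could recur across separate sessions; the uuid suffix does not:

import itertools
import uuid

import numpy as np

# Before: a module-level counter, unique only within a single process.
csv_n = itertools.count(0)
old_name = f"ibis_read_csv_{next(csv_n)}"  # 'ibis_read_csv_0', then _1, ...

# After: a uuid4-based suffix, collision-resistant across sessions.
new_name = f"_ibis_read_csv_{np.base_repr(uuid.uuid4().int, 36)}".lower()
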
16 changes: 4 additions & 12 deletions ibis/backends/duckdb/__init__.py
@@ -3,7 +3,6 @@
from __future__ import annotations

import ast
-import itertools
import os
import warnings
import weakref
@@ -30,13 +29,6 @@

import ibis.expr.operations as ops

-# counters for in-memory, parquet, csv, and json reads
-# used if no table name is specified
-pd_n = itertools.count(0)
-pa_n = itertools.count(0)
-csv_n = itertools.count(0)
-json_n = itertools.count(0)


def normalize_filenames(source_list):
# Promote to list
@@ -276,7 +268,7 @@ def read_json(
f"`read_json` requires duckdb >= 0.7.0, duckdb {version} is installed"
)
if not table_name:
table_name = f"ibis_read_json_{next(json_n)}"
table_name = util.gen_name("read_json")

source = sa.select(sa.literal_column("*")).select_from(
sa.func.read_json_auto(
@@ -318,7 +310,7 @@ def read_csv(
source_list = normalize_filenames(source_list)

if not table_name:
table_name = f"ibis_read_csv_{next(csv_n)}"
table_name = util.gen_name("read_csv")

# auto_detect and columns collide, so we set auto_detect=True
# unless COLUMNS has been specified
@@ -362,7 +354,7 @@ def read_parquet(
"""
source_list = normalize_filenames(source_list)

table_name = table_name or f"ibis_read_parquet_{next(pa_n)}"
table_name = table_name or util.gen_name("read_parquet")

# Default to using the native duckdb parquet reader
# If that fails because of auth issues, fall back to ingesting via
@@ -457,7 +449,7 @@ def _clean_up_string_columns(
}
)

table_name = table_name or f"ibis_read_in_memory_{next(pd_n)}"
table_name = table_name or util.gen_name("read_in_memory")
with self.begin() as con:
con.connection.register(table_name, _clean_up_string_columns(dataframe))

12 changes: 4 additions & 8 deletions ibis/backends/duckdb/tests/conftest.py
@@ -14,12 +14,8 @@


class TestConf(BackendTest, RoundAwayFromZero):
-def __init__(
-    self, data_directory: Path, extension_directory: Path | None = None
-) -> None:
-    self.connection = self.connect(
-        data_directory, extension_directory=str(extension_directory)
-    )
+def __init__(self, data_directory: Path, **kwargs: Any) -> None:
+    self.connection = self.connect(data_directory, **kwargs)

script_dir = data_directory.parent

@@ -52,11 +48,11 @@ def _load_data(data_dir, script_dir, **_: Any) -> None:
return TestConf(data_directory=data_dir)

@staticmethod
-def connect(data_directory: Path, **kwargs) -> BaseBackend:
+def connect(data_directory: Path, **kwargs: Any) -> BaseBackend:
pytest.importorskip("duckdb")
return ibis.duckdb.connect(**kwargs) # type: ignore


@pytest.fixture
def con(data_directory, tmp_path: Path):
-return TestConf(data_directory, extension_directory=tmp_path).connection
+return TestConf(data_directory, extension_directory=str(tmp_path)).connection
15 changes: 4 additions & 11 deletions ibis/backends/polars/__init__.py
@@ -1,6 +1,5 @@
from __future__ import annotations

-import itertools
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, Mapping, MutableMapping
@@ -15,17 +14,11 @@
import ibis.expr.types as ir
from ibis.backends.base import BaseBackend
from ibis.backends.polars.compiler import translate
-from ibis.util import deprecated, normalize_filename
+from ibis.util import deprecated, gen_name, normalize_filename

if TYPE_CHECKING:
import pandas as pd

-# counters for in-memory, parquet, and csv reads
-# used if no table name is specified
-pd_n = itertools.count(0)
-pa_n = itertools.count(0)
-csv_n = itertools.count(0)


class Backend(BaseBackend):
name = "polars"
@@ -154,7 +147,7 @@ def read_csv(
The just-registered table
"""
path = normalize_filename(path)
table_name = table_name or f"ibis_read_csv_{next(csv_n)}"
table_name = table_name or gen_name("read_csv")
try:
self._tables[table_name] = pl.scan_csv(path, **kwargs)
except pl.exceptions.ComputeError:
@@ -184,7 +177,7 @@ def read_pandas(
ir.Table
The just-registered table
"""
table_name = table_name or f"ibis_read_in_memory_{next(pd_n)}"
table_name = table_name or gen_name("read_in_memory")
self._tables[table_name] = pl.from_pandas(source, **kwargs).lazy()
return self.table(table_name)

@@ -211,7 +204,7 @@ def read_parquet(
The just-registered table
"""
path = normalize_filename(path)
table_name = table_name or f"ibis_read_parquet_{next(pa_n)}"
table_name = table_name or gen_name("read_parquet")
self._tables[table_name] = pl.scan_parquet(path, **kwargs)
return self.table(table_name)

8 changes: 2 additions & 6 deletions ibis/backends/pyspark/__init__.py
@@ -1,6 +1,5 @@
from __future__ import annotations

-import itertools
from pathlib import Path
from typing import TYPE_CHECKING, Any

@@ -39,9 +38,6 @@
'escape': '"',
}

-pa_n = itertools.count(0)
-csv_n = itertools.count(0)


def normalize_filenames(source_list):
# Promote to list
@@ -613,7 +609,7 @@ def read_parquet(
"""
source = util.normalize_filename(source)
spark_df = self._session.read.parquet(source, **kwargs)
table_name = table_name or f"ibis_read_parquet_{next(pa_n)}"
table_name = table_name or util.gen_name("read_parquet")

spark_df.createOrReplaceTempView(table_name)
return self.table(table_name)
@@ -645,7 +641,7 @@ def read_csv(
"""
source_list = normalize_filenames(source_list)
spark_df = self._session.read.csv(source_list, **kwargs)
table_name = table_name or f"ibis_read_csv_{next(csv_n)}"
table_name = table_name or util.gen_name("read_csv")

spark_df.createOrReplaceTempView(table_name)
return self.table(table_name)
26 changes: 14 additions & 12 deletions ibis/backends/tests/test_register.py
@@ -96,7 +96,7 @@ def test_register_csv(con, data_directory, fname, in_table_name, out_table_name)
with pushd(data_directory):
table = con.register(fname, table_name=in_table_name)

-assert any(t.startswith(out_table_name) for t in con.list_tables())
+assert any(out_table_name in t for t in con.list_tables())
if con.name != "datafusion":
table.count().execute()
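
Why these assertions move from startswith to substring checks: gen_name prepends _ibis_ to the namespace, so an expected value such as 'ibis_read_csv' is no longer a prefix of the generated name, only a substring. A small illustrative check (the uuid suffix is made up):

# Old scheme: 'ibis_read_csv_0' satisfied the prefix check.
# New scheme: '_ibis_read_csv_<base36-uuid>' does not, hence `in`.
name = "_ibis_read_csv_4kkpzjxvfdyxs0aowg1hl94mk"  # illustrative value
assert not name.startswith("ibis_read_csv")
assert "ibis_read_csv" in name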

@@ -167,14 +167,16 @@ def read_table(path: Path) -> Iterator[tuple[str, pa.Table]]:
@pytest.mark.parametrize(
("fname", "in_table_name", "out_table_name"),
[
-    pytest.param(
-        "parquet://functional_alltypes.parquet",
-        None,
-        "ibis_read_parquet",
+    param(
+        "parquet://functional_alltypes.parquet", None, "ibis_read_parquet", id="url"
    ),
+    param("functional_alltypes.parquet", "funk_all", "funk_all", id="basename"),
+    param(
+        "parquet://functional_alltypes.parq", "funk_all", "funk_all", id="url_parq"
+    ),
+    param(
+        "parquet://functional_alltypes", None, "ibis_read_parquet", id="url_no_ext"
+    ),
-    ("functional_alltypes.parquet", "funk_all", "funk_all"),
-    pytest.param("parquet://functional_alltypes.parq", "funk_all", "funk_all"),
-    ("parquet://functional_alltypes", None, "ibis_read_parquet"),
],
)
@pytest.mark.notyet(
@@ -205,7 +207,7 @@ def test_register_parquet(
with pushd(tmp_path):
table = con.register(f"parquet://{fname.name}", table_name=in_table_name)

-assert any(t.startswith(out_table_name) for t in con.list_tables())
+assert any(out_table_name in t for t in con.list_tables())

if con.name != "datafusion":
table.count().execute()
@@ -246,7 +248,7 @@ def test_register_iterator_parquet(
table_name=None,
)

assert any(t.startswith("ibis_read_parquet") for t in con.list_tables())
assert any("ibis_read_parquet" in t for t in con.list_tables())

assert table.count().execute()

@@ -432,7 +434,7 @@ def test_read_parquet(
fname = str(Path(fname).absolute())
table = con.read_parquet(fname, table_name=in_table_name)

-assert any(t.startswith(out_table_name) for t in con.list_tables())
+assert any(out_table_name in t for t in con.list_tables())

if con.name != "datafusion":
table.count().execute()
@@ -472,6 +474,6 @@ def test_read_csv(con, data_directory, fname, in_table_name, out_table_name):
fname = str(Path(fname).absolute())
table = con.read_csv(fname, table_name=in_table_name)

-assert any(t.startswith(out_table_name) for t in con.list_tables())
+assert any(out_table_name in t for t in con.list_tables())
if con.name != "datafusion":
table.count().execute()
12 changes: 2 additions & 10 deletions ibis/expr/api.py
@@ -419,11 +419,7 @@ def _memtable_from_pyarrow_table(
assert schema is None, "if `columns` is not `None` then `schema` must be `None`"
schema = sch.Schema(dict(zip(columns, sch.infer(data).values())))
return ops.InMemoryTable(
-name=(
-    name
-    if name is not None
-    else util.generate_unique_table_name("pyarrow_memtable")
-),
+name=name if name is not None else util.gen_name("pyarrow_memtable"),
schema=sch.infer(data) if schema is None else schema,
data=PyArrowTableProxy(data),
).to_expr()
@@ -451,11 +447,7 @@ def _memtable_from_dataframe(
)
df = df.rename(columns=dict(zip(cols, newcols)))
op = ops.InMemoryTable(
-name=(
-    name
-    if name is not None
-    else util.generate_unique_table_name("pandas_memtable")
-),
+name=name if name is not None else util.gen_name("pandas_memtable"),
schema=sch.infer(df) if schema is None else schema,
data=DataFrameProxy(df),
)
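
For reference, a hedged sketch of the default naming path after this change, assuming ibis.memtable and the op's name attribute behave as in this revision (the change here is a pure rename, since the old helper was already uuid-based):

import pandas as pd

import ibis

# With no explicit name, _memtable_from_dataframe falls back to
# util.gen_name("pandas_memtable"), producing something like
# '_ibis_pandas_memtable_<base36-uuid>' (suffix varies per call).
t = ibis.memtable(pd.DataFrame({"a": [1, 2, 3]}))
print(t.op().name)
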
4 changes: 2 additions & 2 deletions ibis/util.py
@@ -538,6 +538,6 @@ def _absolufy_paths(name):
return source


-def generate_unique_table_name(namespace: str) -> str:
-    """Creates case-insensitive uuid4 unique table name."""
+def gen_name(namespace: str) -> str:
+    """Create a case-insensitive uuid4 unique table name."""
return f"_ibis_{namespace}_{np.base_repr(uuid.uuid4().int, 36)}".lower()
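
The helper is small enough to sanity-check in isolation. A minimal sketch of why the trailing .lower() matters: np.base_repr emits uppercase digits for bases above 10, so lowercasing keeps the generated name stable under case-insensitive catalogs (hence "case-insensitive" in the docstring):

import uuid

import numpy as np

raw = np.base_repr(uuid.uuid4().int, 36)
print(raw)  # uppercase base-36 digits, e.g. '4KKPZJXVFDYXS0AOWG1HL94MK'
print(f"_ibis_cache_{raw}".lower())  # the final, case-insensitive-safe name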
