Skip to content

Commit

Permalink
feat(api): add TableUnnest operation to support cross-join unnest s…
Browse files Browse the repository at this point in the history
…emantics as well as `offset` (#9423)
  • Loading branch information
cpcloud authored Jul 1, 2024
1 parent c73bcf0 commit 3352a84
Show file tree
Hide file tree
Showing 12 changed files with 686 additions and 8 deletions.
44 changes: 44 additions & 0 deletions ibis/backends/bigquery/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,3 +690,47 @@ def visit_DropColumns(self, op, *, parent, columns_to_drop):
table = sg.to_identifier(parent.alias_or_name, quoted=quoted)
column = sge.Column(this=star, table=table)
return sg.select(column).from_(parent)

def visit_TableUnnest(
self, op, *, parent, column, offset: str | None, keep_empty: bool
):
quoted = self.quoted

column_alias = sg.to_identifier(
util.gen_name("table_unnest_column"), quoted=quoted
)

selcols = []

table = sg.to_identifier(parent.alias_or_name, quoted=quoted)

opname = op.column.name
overlaps_with_parent = opname in op.parent.schema
computed_column = column_alias.as_(opname, quoted=quoted)

# replace the existing column if the unnested column hasn't been
# renamed
#
# e.g., table.unnest("x")
if overlaps_with_parent:
selcols.append(
sge.Column(this=sge.Star(replace=[computed_column]), table=table)
)
else:
selcols.append(sge.Column(this=STAR, table=table))
selcols.append(computed_column)

if offset is not None:
offset = sg.to_identifier(offset, quoted=quoted)
selcols.append(offset)

unnest = sge.Unnest(
expressions=[column],
alias=sge.TableAlias(columns=[column_alias]),
offset=offset,
)
return (
sg.select(*selcols)
.from_(parent)
.join(unnest, join_type="CROSS" if not keep_empty else "LEFT")
)
57 changes: 57 additions & 0 deletions ibis/backends/clickhouse/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,3 +648,60 @@ def visit_DropColumns(self, op, *, parent, columns_to_drop):
table = sg.to_identifier(parent.alias_or_name, quoted=quoted)
column = sge.Column(this=star, table=table)
return sg.select(column).from_(parent)

def visit_TableUnnest(
self, op, *, parent, column, offset: str | None, keep_empty: bool
):
quoted = self.quoted

column_alias = sg.to_identifier(
util.gen_name("table_unnest_column"), quoted=quoted
)

table = sg.to_identifier(parent.alias_or_name, quoted=quoted)

selcols = []

opname = op.column.name
overlaps_with_parent = opname in op.parent.schema
computed_column = column_alias.as_(opname, quoted=quoted)

if offset is not None:
if overlaps_with_parent:
selcols.append(
sge.Column(this=sge.Star(replace=[computed_column]), table=table)
)
else:
selcols.append(sge.Column(this=STAR, table=table))
selcols.append(computed_column)

offset = sg.to_identifier(offset, quoted=quoted)
selcols.append(offset)
elif overlaps_with_parent:
selcols.append(
sge.Column(this=sge.Star(replace=[computed_column]), table=table)
)
else:
selcols.append(sge.Column(this=STAR, table=table))
selcols.append(computed_column)

select = (
sg.select(*selcols)
.from_(parent)
.join(
sge.Join(
this=column.as_(column_alias, quoted=quoted),
kind="ARRAY",
side=None if not keep_empty else "LEFT",
)
)
)

if offset is not None:
param = sg.to_identifier(util.gen_name("arr_enum"))
func = sge.Lambda(this=param - 1, expressions=[param])
return select.join(
self.f.arrayMap(func, self.f.arrayEnumerate(column_alias)).as_(offset)
)

return select
53 changes: 53 additions & 0 deletions ibis/backends/duckdb/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ibis.backends.sql.compiler import NULL, STAR, AggGen, SQLGlotCompiler
from ibis.backends.sql.datatypes import DuckDBType
from ibis.backends.sql.rewrites import exclude_nulls_from_array_collect
from ibis.util import gen_name

_INTERVAL_SUFFIXES = {
"ms": "milliseconds",
Expand Down Expand Up @@ -547,3 +548,55 @@ def visit_DropColumns(self, op, *, parent, columns_to_drop):
table = sg.to_identifier(parent.alias_or_name, quoted=quoted)
column = sge.Column(this=star, table=table)
return sg.select(column).from_(parent)

def visit_TableUnnest(
self, op, *, parent, column, offset: str | None, keep_empty: bool
):
quoted = self.quoted

column_alias = sg.to_identifier(gen_name("table_unnest_column"), quoted=quoted)

opname = op.column.name
overlaps_with_parent = opname in op.parent.schema
computed_column = column_alias.as_(opname, quoted=quoted)

selcols = []

table = sg.to_identifier(parent.alias_or_name, quoted=quoted)

if offset is not None:
# TODO: clean this up once WITH ORDINALITY is supported in DuckDB
# no need for struct_extract once that's upstream
column = self.f.list_zip(column, self.f.range(self.f.len(column)))
extract = self.f.struct_extract(column_alias, 1).as_(opname, quoted=quoted)

if overlaps_with_parent:
replace = sge.Column(this=sge.Star(replace=[extract]), table=table)
selcols.append(replace)
else:
selcols.append(sge.Column(this=STAR, table=table))
selcols.append(extract)

selcols.append(
self.f.struct_extract(column_alias, 2).as_(offset, quoted=quoted)
)
elif overlaps_with_parent:
selcols.append(
sge.Column(this=sge.Star(replace=[computed_column]), table=table)
)
else:
selcols.append(sge.Column(this=STAR, table=table))
selcols.append(computed_column)

unnest = sge.Unnest(
expressions=[column],
alias=sge.TableAlias(
this=sg.to_identifier(gen_name("table_unnest"), quoted=quoted),
columns=[column_alias],
),
)
return (
sg.select(*selcols)
.from_(parent)
.join(unnest, join_type="CROSS" if not keep_empty else "LEFT")
)
57 changes: 57 additions & 0 deletions ibis/backends/postgres/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ibis.backends.sql.datatypes import PostgresType
from ibis.backends.sql.dialects import Postgres
from ibis.backends.sql.rewrites import exclude_nulls_from_array_collect
from ibis.util import gen_name


class PostgresUDFNode(ops.Value):
Expand Down Expand Up @@ -611,3 +612,59 @@ def visit_Hash(self, op, *, arg):
f"Hash({arg_dtype!r}) operation is not supported in the "
f"{self.dialect} backend"
)

def visit_TableUnnest(
self, op, *, parent, column, offset: str | None, keep_empty: bool
):
quoted = self.quoted

column_alias = sg.to_identifier(gen_name("table_unnest_column"), quoted=quoted)

parent_alias = parent.alias_or_name

opname = op.column.name
parent_schema = op.parent.schema
overlaps_with_parent = opname in parent_schema
computed_column = column_alias.as_(opname, quoted=quoted)

selcols = []

if overlaps_with_parent:
column_alias_or_name = column.alias_or_name
selcols.extend(
sg.column(col, table=parent_alias, quoted=quoted)
if col != column_alias_or_name
else computed_column
for col in parent_schema.names
)
else:
selcols.append(
sge.Column(
this=STAR, table=sg.to_identifier(parent_alias, quoted=quoted)
)
)
selcols.append(computed_column)

if offset is not None:
offset_name = offset
offset = sg.to_identifier(offset_name, quoted=quoted)
selcols.append((offset - 1).as_(offset_name, quoted=quoted))

unnest = sge.Unnest(
expressions=[column],
alias=sge.TableAlias(
this=sg.to_identifier(gen_name("table_unnest"), quoted=quoted),
columns=[column_alias],
),
offset=offset,
)

return (
sg.select(*selcols)
.from_(parent)
.join(
unnest,
on=None if not keep_empty else sge.convert(True),
join_type="CROSS" if not keep_empty else "LEFT",
)
)
65 changes: 65 additions & 0 deletions ibis/backends/pyspark/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,3 +452,68 @@ def visit_HexDigest(self, op, *, arg, how):
return self.f.sha2(arg, int(how[-3:]))
else:
raise NotImplementedError(f"No available hashing function for {how}")

def visit_TableUnnest(
self, op, *, parent, column, offset: str | None, keep_empty: bool
):
quoted = self.quoted

column_alias = sg.to_identifier(gen_name("table_unnest_column"), quoted=quoted)

opname = op.column.name
parent_schema = op.parent.schema
overlaps_with_parent = opname in parent_schema
computed_column = column_alias.as_(opname, quoted=quoted)

parent_alias = parent.alias_or_name

selcols = []

if overlaps_with_parent:
column_alias_or_name = column.alias_or_name
selcols.extend(
sg.column(col, table=parent_alias, quoted=quoted)
if col != column_alias_or_name
else computed_column
for col in parent_schema.names
)
else:
selcols.append(
sge.Column(
this=STAR, table=sg.to_identifier(parent_alias, quoted=quoted)
)
)
selcols.append(computed_column)

alias_columns = []

if offset is not None:
offset = sg.column(offset, quoted=quoted)
selcols.append(offset)
alias_columns.append(offset)

alias_columns.append(column_alias)

# four possible functions
#
# explode: unnest
# explode_outer: unnest preserving empties and nulls
# posexplode: unnest with index
# posexplode_outer: unnest with index preserving empties and nulls
funcname = (
("pos" if offset is not None else "")
+ "explode"
+ ("_outer" if keep_empty else "")
)

return (
sg.select(*selcols)
.from_(parent)
.lateral(
sge.Lateral(
this=self.f[funcname](column),
view=True,
alias=sge.TableAlias(columns=alias_columns),
)
)
)
67 changes: 67 additions & 0 deletions ibis/backends/snowflake/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,3 +662,70 @@ def visit_DropColumns(self, op, *, parent, columns_to_drop):
table = sg.to_identifier(parent.alias_or_name, quoted=quoted)
column = sge.Column(this=star, table=table)
return sg.select(column).from_(parent)

def visit_TableUnnest(
self, op, *, parent, column, offset: str | None, keep_empty: bool
):
quoted = self.quoted

column_alias = sg.to_identifier(
util.gen_name("table_unnest_column"), quoted=quoted
)

sep = sge.convert(util.guid())
null_sentinel = sge.convert(util.guid())

table = sg.to_identifier(parent.alias_or_name, quoted=quoted)

selcols = []

opcol = op.column
opname = opcol.name
overlaps_with_parent = opname in op.parent.schema
computed_column = self.cast(
self.f.nullif(column_alias, null_sentinel), opcol.dtype.value_type
).as_(opname, quoted=quoted)

if overlaps_with_parent:
selcols.append(
sge.Column(this=sge.Star(replace=[computed_column]), table=table)
)
else:
selcols.append(sge.Column(this=STAR, table=table))
selcols.append(computed_column)

if offset is not None:
offset = sg.to_identifier(offset, quoted=quoted)
selcols.append(offset)

alias = sge.TableAlias(
this=sg.to_identifier(util.gen_name("table_unnest"), quoted=quoted),
columns=[column_alias],
)

# there has to be a better way
param = sg.to_identifier(util.gen_name("table_unnest_param"))
column = self.f.transform(
column,
sge.Lambda(
this=self.f.coalesce(self.cast(param, dt.string), null_sentinel),
expressions=[param],
),
)
empty_array = self.f.array()
split = self.f.coalesce(
self.f.nullif(
self.f.split(
self.f.array_to_string(self.f.nullif(column, empty_array), sep), sep
),
empty_array,
),
self.f.array(null_sentinel),
)

unnest = sge.Unnest(expressions=[split], alias=alias, offset=offset)
return (
sg.select(*selcols)
.from_(parent)
.join(unnest, join_type="CROSS" if not keep_empty else "LEFT")
)
Loading

0 comments on commit 3352a84

Please sign in to comment.