Skip to content

Commit

Permalink
fix(datafusion): support computed group by when the aggregation is co…
Browse files Browse the repository at this point in the history
…unt distinct
  • Loading branch information
cpcloud authored and gforsyth committed Dec 11, 2023
1 parent d07be85 commit 18bdb7e
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions ibis/backends/datafusion/compiler/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,37 @@ def _limit(op: ops.Limit, *, table, n, offset, **_):
def _aggregation(
op: ops.Aggregation, *, table, metrics, by, having, predicates, sort_keys, **_
):
selections = (by + metrics) or (STAR,)
if by:
# datafusion doesn't support count distinct aggregations alongside
# computed grouping keys so create a projection of the key and all
# existing columns first, followed by the usual group by
#
# analogous to a user calling mutate -> group_by
by_names = frozenset(b.alias_or_name for b in by)
cols = [
sg.column(
name,
table=sg.to_identifier(table.alias_or_name, quoted=True),
quoted=True,
)
for name in op.table.schema.keys() - by_names
]
table = sg.select(*cols, *by).from_(table).subquery()

# datafusion lower cases all column names internally unless quoted so
# quoted=True is required here for correctness
by_names_quoted = tuple(
sg.column(b.alias_or_name, table=getattr(b, "table", None), quoted=True)
for b in by
)
selections = by_names_quoted + metrics
else:
selections = metrics or (STAR,)

sel = sg.select(*selections).from_(table)

if by:
sel = sel.group_by(
*(key.this if isinstance(key, sg.exp.Alias) else key for key in by)
)
sel = sel.group_by(*by_names_quoted)

if predicates:
sel = sel.where(*predicates)
Expand Down

0 comments on commit 18bdb7e

Please sign in to comment.