Skip to content

Commit

Permalink
fix(flink): implement an in-memory table formatter
Browse files Browse the repository at this point in the history
  • Loading branch information
deepyaman authored and cpcloud committed Sep 25, 2023
1 parent c12dfa4 commit 217a14b
Showing 1 changed file with 36 additions and 2 deletions.
38 changes: 36 additions & 2 deletions ibis/backends/flink/compiler/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,43 @@

import ibis.common.exceptions as com
import ibis.expr.operations as ops
from ibis.backends.base.sql.compiler import Compiler, Select, SelectBuilder
from ibis.backends.base.sql.compiler import (
Compiler,
Select,
SelectBuilder,
TableSetFormatter,
)
from ibis.backends.flink.translator import FlinkExprTranslator
from ibis.backends.flink.utils import translate_literal


class FlinkTableSetFormatter(TableSetFormatter):
def _format_in_memory_table(self, op):
names = op.schema.names
raw_rows = []
for row in op.data.to_frame().itertuples(index=False):
raw_row = []
for val, name in zip(row, names):
lit = ops.Literal(val, dtype=op.schema[name])
raw_row.append(self._translate(lit))
raw_rows.append(", ".join(raw_row))
rows = ", ".join(f"({raw_row})" for raw_row in raw_rows)
return f"(VALUES {rows})"

def _format_table(self, op) -> str:
result = super()._format_table(op)

ref_op = op
if isinstance(op, ops.SelfReference):
ref_op = op.table

if isinstance(ref_op, ops.InMemoryTable):
names = op.schema.names
result += f"({', '.join(self._quote_identifier(name) for name in names)})"

return result


class FlinkSelectBuilder(SelectBuilder):
def _convert_group_by(self, exprs):
return exprs
Expand Down Expand Up @@ -39,10 +71,12 @@ def format_group_by(self) -> str:


class FlinkCompiler(Compiler):
translator_class = FlinkExprTranslator
table_set_formatter_class = FlinkTableSetFormatter
select_builder_class = FlinkSelectBuilder
select_class = FlinkSelect

cheap_in_memory_tables = True
translator_class = FlinkExprTranslator


def translate(op: ops.TableNode) -> str:
Expand Down

1 comment on commit 217a14b

@ibis-squawk-bot
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 3.

Benchmark suite Current: 217a14b Previous: b37804a Ratio
ibis/tests/benchmarks/test_benchmarks.py::test_compile[large-bigquery] 26.44461332496885 iter/sec (stddev: 0.000539286480283364) 146.4406797920701 iter/sec (stddev: 0.00006932716190392247) 5.54
ibis/tests/benchmarks/test_benchmarks.py::test_compile[medium-bigquery] 54.502609978313394 iter/sec (stddev: 0.01750155161780644) 603.5295570002766 iter/sec (stddev: 0.000035398257191080955) 11.07
ibis/tests/benchmarks/test_benchmarks.py::test_compile[small-bigquery] 1432.2640785388771 iter/sec (stddev: 0.00005888029289146269) 10602.847663506958 iter/sec (stddev: 0.00005834248372791223) 7.40
ibis/tests/benchmarks/test_benchmarks.py::test_compile_with_drops[bigquery] 35.8898753368565 iter/sec (stddev: 0.0003842688639438991) 130.02477424437095 iter/sec (stddev: 0.000247510166318326) 3.62

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.