Skip to content

Commit

Permalink
test(dev-tools): move flink imports to call site to avoid missing imp…
Browse files Browse the repository at this point in the history
…ort errors when not installed
  • Loading branch information
cpcloud authored and gforsyth committed Sep 28, 2023
1 parent 1dea9ca commit be7e53a
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 6 deletions.
6 changes: 4 additions & 2 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
from functools import lru_cache
from typing import TYPE_CHECKING, Any

import pyflink.version
import sqlglot as sg
from pyflink.table.types import create_arrow_schema

import ibis.common.exceptions as exc
import ibis.expr.operations as ops
Expand Down Expand Up @@ -217,6 +215,8 @@ def get_schema(
sch.Schema
Ibis schema
"""
from pyflink.table.types import create_arrow_schema

qualified_name = self._fully_qualified_name(table_name, catalog, database)
table = self._table_env.from_path(qualified_name)
schema = table.get_schema()
Expand All @@ -226,6 +226,8 @@ def get_schema(

@property
def version(self) -> str:
import pyflink.version

return pyflink.version.__version__

def compile(
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/flink/compiler/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
TableSetFormatter,
)
from ibis.backends.flink.translator import FlinkExprTranslator
from ibis.backends.flink.utils import translate_literal


class FlinkTableSetFormatter(TableSetFormatter):
Expand Down Expand Up @@ -90,6 +89,8 @@ def translate_op(op: ops.TableNode) -> str:

@translate_op.register(ops.Literal)
def _literal(op: ops.Literal) -> str:
from ibis.backends.flink.utils import translate_literal

return translate_literal(op)


Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/flink/ddl.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
is_fully_qualified,
)
from ibis.backends.base.sql.registry import quote_identifier, type_to_sql_string
from ibis.backends.flink.utils import translate_literal

if TYPE_CHECKING:
from ibis.expr.streaming import Watermark
Expand All @@ -39,6 +38,8 @@ def type_to_flink_sql_string(tval):


def _format_watermark_strategy(watermark: Watermark) -> str:
from ibis.backends.flink.utils import translate_literal

if watermark.allowed_delay is None:
return watermark.time_col
return f"{watermark.time_col} - {translate_literal(watermark.allowed_delay.op())}"
Expand Down
9 changes: 8 additions & 1 deletion ibis/backends/flink/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from ibis.backends.base.sql.registry import (
operation_registry as base_operation_registry,
)
from ibis.backends.flink.utils import _to_pyflink_types, translate_literal
from ibis.common.temporal import TimestampUnit

if TYPE_CHECKING:
Expand All @@ -18,11 +17,15 @@


def _zeroifnull(translator: ExprTranslator, op: ops.Literal) -> str:
from ibis.backends.flink.utils import translate_literal

casted = translate_literal(ops.Literal("0", dtype=op.dtype))
return f"COALESCE({translator.translate(op.arg)}, {casted})"


def _nullifzero(translator: ExprTranslator, op: ops.Literal) -> str:
from ibis.backends.flink.utils import translate_literal

casted = translate_literal(ops.Literal("0", dtype=op.dtype))
return f"NULLIF({translator.translate(op.arg)}, {casted})"

Expand All @@ -45,6 +48,8 @@ def extract_field_formatter(translator: ExprTranslator, op: ops.Node) -> str:


def _literal(translator: ExprTranslator, op: ops.Literal) -> str:
from ibis.backends.flink.utils import translate_literal

return translate_literal(op)


Expand Down Expand Up @@ -194,6 +199,8 @@ def _window(translator: ExprTranslator, op: ops.Node) -> str:


def _clip(translator: ExprTranslator, op: ops.Node) -> str:
from ibis.backends.flink.utils import _to_pyflink_types

arg = translator.translate(op.arg)

if op.upper is not None:
Expand Down
2 changes: 1 addition & 1 deletion ibis/tests/benchmarks/test_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def test_compile(benchmark, module, expr_fn, t, base, large_expr):
expr = expr_fn(t, base, large_expr)
try:
benchmark(mod.compile, expr)
except sa.exc.NoSuchModuleError as e:
except (sa.exc.NoSuchModuleError, ImportError) as e: # delayed imports
pytest.skip(str(e))


Expand Down

0 comments on commit be7e53a

Please sign in to comment.