Skip to content

Commit

Permalink
refactor(flink): expose raw_sql over _exec_sql
Browse files Browse the repository at this point in the history
  • Loading branch information
deepyaman authored and cpcloud committed Jan 26, 2024
1 parent 17f636e commit 0b66b94
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def do_connect(self, table_env: TableEnvironment) -> None:
"""
self._table_env = table_env

def _exec_sql(self, query: str) -> TableResult:
def raw_sql(self, query: str) -> TableResult:
return self._table_env.execute_sql(query)

def list_databases(self, like: str | None = None) -> list[str]:
Expand Down Expand Up @@ -99,7 +99,7 @@ def create_database(
statement = CreateDatabase(
name=name, db_properties=db_properties, catalog=catalog, can_exist=force
)
self._exec_sql(statement.compile())
self.raw_sql(statement.compile())

def drop_database(
self, name: str, catalog: str | None = None, force: bool = False
Expand All @@ -116,7 +116,7 @@ def drop_database(
If `False`, an exception is raised if the database does not exist.
"""
statement = DropDatabase(name=name, catalog=catalog, must_exist=not force)
self._exec_sql(statement.compile())
self.raw_sql(statement.compile())

def list_tables(
self,
Expand Down Expand Up @@ -456,7 +456,7 @@ def create_table(
catalog=catalog,
)
sql = statement.compile()
self._exec_sql(sql)
self.raw_sql(sql)

return self.table(name, database=database, catalog=catalog)

Expand Down Expand Up @@ -491,7 +491,7 @@ def drop_table(
must_exist=not force,
temporary=temp,
)
self._exec_sql(statement.compile())
self.raw_sql(statement.compile())

def rename_table(
self,
Expand All @@ -516,7 +516,7 @@ def rename_table(
must_exist=not force,
)
sql = statement.compile()
self._exec_sql(sql)
self.raw_sql(sql)

def create_view(
self,
Expand Down Expand Up @@ -602,7 +602,7 @@ def create_view(
temporary=temp,
)
sql = statement.compile()
self._exec_sql(sql)
self.raw_sql(sql)

else:
raise exc.IbisError(f"Unsupported `obj` type: {type(obj)}")
Expand Down Expand Up @@ -643,7 +643,7 @@ def drop_view(
temporary=temp,
)
sql = statement.compile()
self._exec_sql(sql)
self.raw_sql(sql)

def _read_file(
self,
Expand Down Expand Up @@ -833,7 +833,7 @@ def insert(
catalog=catalog,
overwrite=overwrite,
)
return self._exec_sql(statement.compile())
return self.raw_sql(statement.compile())

if isinstance(obj, pa.Table):
obj = obj.to_pandas()
Expand Down

0 comments on commit 0b66b94

Please sign in to comment.