Skip to content

Commit

Permalink
feat(bigquery): implement create and drop table method
Browse files Browse the repository at this point in the history
  • Loading branch information
krzysztof-kwitt authored and cpcloud committed Jan 25, 2023
1 parent e9c1579 commit 5f3c22c
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 5 deletions.
28 changes: 27 additions & 1 deletion ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import google.auth.credentials
import google.cloud.bigquery as bq
import pandas as pd
import pydata_google_auth
from google.api_core.exceptions import NotFound
from pydata_google_auth import cache
Expand All @@ -24,6 +25,7 @@
BigQueryTable,
bigquery_field_to_ibis_dtype,
bigquery_param,
ibis_schema_to_bigquery_schema,
parse_project_and_dataset,
rename_partitioned_column,
)
Expand Down Expand Up @@ -435,6 +437,31 @@ def set_database(self, name):
def version(self):
return bq.__version__

def create_table(
self,
name: str,
obj: pd.DataFrame | ir.Table | None = None,
schema: ibis.Schema | None = None,
database: str | None = None,
) -> None:
if obj is not None:
raise NotImplementedError(
"Parameter obj is not supported for create_table method in BigQuery backend"
)
if schema is None:
raise ValueError("Schema is required")

table_id = self._fully_qualified_name(name, database)
bigquery_schema = ibis_schema_to_bigquery_schema(schema)
table = bq.Table(table_id, schema=bigquery_schema)
self.client.create_table(table)

def drop_table(
self, name: str, database: str | None = None, force: bool = False
) -> None:
table_id = self._fully_qualified_name(name, database)
self.client.delete_table(table_id, not_found_ok=not force)


def compile(expr, params=None, **kwargs):
"""Compile an expression for BigQuery."""
Expand Down Expand Up @@ -514,7 +541,6 @@ def connect(


__all__ = [
"__version__",
"Backend",
"compile",
"connect",
Expand Down
17 changes: 17 additions & 0 deletions ibis/backends/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ def bigquery_schema(table):
return sch.schema(fields)


def ibis_schema_to_bigquery_schema(schema: sch.Schema):
return [
(
bq.SchemaField(
name,
ibis_type_to_bigquery_type(type_),
mode='NULLABLE' if type_.nullable else 'REQUIRED',
)
if not type_.is_array()
else bq.SchemaField(
name, ibis_type_to_bigquery_type(type_.value_type), mode='REPEATED'
)
)
for name, type_ in schema.items()
]


class BigQueryCursor:
"""BigQuery cursor.
Expand Down
20 changes: 16 additions & 4 deletions ibis/backends/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,26 @@ def test_sql(backend, con):
assert len(result) == 10


@mark.notimpl(["bigquery", "clickhouse", "datafusion", "polars"])
backend_type_mapping = {
"bigquery": {
# backend only implements int64
dt.int32: dt.int64
}
}


@mark.notimpl(["clickhouse", "datafusion", "polars"])
def test_create_table_from_schema(con, new_schema, temp_table):
con.create_table(temp_table, schema=new_schema)

t = con.table(temp_table)
new_table = con.table(temp_table)
backend_mapping = backend_type_mapping.get(con.name, dict())

for k, i_type in t.schema().items():
assert new_schema[k] == i_type
for column_name, column_type in new_table.schema().items():
assert (
backend_mapping.get(new_schema[column_name], new_schema[column_name])
== column_type
)


@mark.notimpl(
Expand Down

0 comments on commit 5f3c22c

Please sign in to comment.