feat(clickhouse): implement proper type serialization
cpcloud committed May 8, 2022
1 parent 1c51969 commit 80f4ab9
Showing 20 changed files with 557 additions and 324 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -1,7 +1,7 @@
 version: "3.4"
 services:
   clickhouse:
-    image: yandex/clickhouse-server:22-alpine
+    image: clickhouse/clickhouse-server:22-alpine
     ports:
       - 8123:8123
       - 9000:9000
26 changes: 8 additions & 18 deletions ibis/backends/clickhouse/__init__.py
@@ -11,12 +11,9 @@
 import ibis.config
 import ibis.expr.schema as sch
 from ibis.backends.base.sql import BaseSQLBackend
-from ibis.backends.clickhouse.client import (
-    ClickhouseDataType,
-    ClickhouseTable,
-    fully_qualified_re,
-)
+from ibis.backends.clickhouse.client import ClickhouseTable, fully_qualified_re
 from ibis.backends.clickhouse.compiler import ClickhouseCompiler
+from ibis.backends.clickhouse.datatypes import parse, serialize
 from ibis.config import options

 _default_compression: str | bool
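
Note: the parse/serialize pair imported here lives in the new ibis/backends/clickhouse/datatypes.py, which is among the 20 changed files but not shown in this excerpt. The following is a minimal sketch of what the pair plausibly does, reconstructed from the type table this commit deletes from client.py further down; the real module's contents are an assumption.

    import ibis.expr.datatypes as dt

    # ClickHouse type name -> ibis dtype class, mirroring the deleted
    # _clickhouse_dtypes table in client.py
    _ch_to_ibis = {
        'Null': dt.Null,
        'Nothing': dt.Null,
        'UInt8': dt.UInt8,
        'UInt16': dt.UInt16,
        'UInt32': dt.UInt32,
        'UInt64': dt.UInt64,
        'Int8': dt.Int8,
        'Int16': dt.Int16,
        'Int32': dt.Int32,
        'Int64': dt.Int64,
        'Float32': dt.Float32,
        'Float64': dt.Float64,
        'String': dt.String,
        'FixedString': dt.String,
        'Date': dt.Date,
        'DateTime': dt.Timestamp,
        'DateTime64': dt.Timestamp,
    }
    # invert the mapping the way the deleted _ibis_dtypes did,
    # pinning the ambiguous round-trips afterwards
    _ibis_to_ch = {v: k for k, v in _ch_to_ibis.items()}
    _ibis_to_ch[dt.String] = 'String'
    _ibis_to_ch[dt.Timestamp] = 'DateTime'

    def parse(spec: str, nullable: bool = False) -> dt.DataType:
        """Parse a ClickHouse type string such as 'Nullable(Array(Int64))'."""
        if spec.startswith('Nullable(') and spec.endswith(')'):
            return parse(spec[len('Nullable('):-1], nullable=True)
        if spec.startswith('Array(') and spec.endswith(')'):
            return dt.Array(parse(spec[len('Array('):-1]), nullable=nullable)
        return _ch_to_ibis[spec](nullable=nullable)

    def serialize(ty: dt.DataType) -> str:
        """Render an ibis dtype as a ClickHouse type string."""
        if isinstance(ty, dt.Array):
            name = f'Array({serialize(ty.value_type)})'
        else:
            name = _ibis_to_ch[type(ty)]
        return f'Nullable({name})' if ty.nullable else name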
@@ -109,12 +106,12 @@ def current_database(self):
         return self.con.connection.database

     def list_databases(self, like=None):
-        data, schema = self.raw_sql('SELECT name FROM system.databases')
+        data, _ = self.raw_sql('SELECT name FROM system.databases')
         databases = list(data[0])
         return self._filter_with_like(databases, like)

     def list_tables(self, like=None, database=None):
-        data, schema = self.raw_sql('SHOW TABLES')
+        data, _ = self.raw_sql('SHOW TABLES')
         databases = list(data[0])
         return self._filter_with_like(databases, like)

@@ -152,13 +149,7 @@ def raw_sql(
                     'name': name,
                     'data': df.to_dict('records'),
                     'structure': list(
-                        zip(
-                            schema.names,
-                            [
-                                str(ClickhouseDataType.from_ibis(t))
-                                for t in schema.types
-                            ],
-                        )
+                        zip(schema.names, map(serialize, schema.types))
                     ),
                 }
             )
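Note: for reference, here is what the rewritten 'structure' entry for an external table evaluates to under the serialize sketch above, using a made-up two-column schema (ibis dtypes are nullable by default, hence the Nullable wrappers):

    import ibis.expr.schema as sch

    schema = sch.Schema.from_tuples([('id', 'int64'), ('name', 'string')])
    list(zip(schema.names, map(serialize, schema.types)))
    # [('id', 'Nullable(Int64)'), ('name', 'Nullable(String)')]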
@@ -216,9 +207,8 @@ def get_schema(
         (column_names, types, *_), *_ = self.raw_sql(
             f"DESCRIBE {qualified_name}"
         )
-        return sch.Schema.from_tuples(
-            zip(column_names, map(ClickhouseDataType.parse, types))
-        )
+
+        return sch.Schema.from_tuples(zip(column_names, map(parse, types)))

     def set_options(self, options):
         self.con.set_options(options)
@@ -238,7 +228,7 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
         )
         [plan] = json.loads(raw_plans)
         fields = [
-            (field["Name"], ClickhouseDataType.parse(field["Type"]))
+            (field["Name"], parse(field["Type"]))
             for field in plan["Plan"]["Header"]
         ]
         return sch.Schema.from_tuples(fields)
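Note: raw_plans here comes from a ClickHouse EXPLAIN in JSON mode; the shape the method unpacks looks roughly like the following (invented query, abridged to the fields the code actually touches):

    raw_plans = '''
    [
        {
            "Plan": {
                "Header": [
                    {"Name": "x", "Type": "UInt8"},
                    {"Name": "s", "Type": "Nullable(String)"}
                ]
            }
        }
    ]
    '''
    # fields -> [('x', parse('UInt8')), ('s', parse('Nullable(String)'))]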
140 changes: 12 additions & 128 deletions ibis/backends/clickhouse/client.py
@@ -4,142 +4,22 @@
 import numpy as np
 import pandas as pd

-import ibis.common.exceptions as com
 import ibis.expr.datatypes as dt
 import ibis.expr.types as ir

 fully_qualified_re = re.compile(r"(.*)\.(?:`(.*)`|(.*))")
-base_typename_re = re.compile(r"(\w+)")
-
-
-_clickhouse_dtypes = {
-    'Null': dt.Null,
-    'Nothing': dt.Null,
-    'UInt8': dt.UInt8,
-    'UInt16': dt.UInt16,
-    'UInt32': dt.UInt32,
-    'UInt64': dt.UInt64,
-    'Int8': dt.Int8,
-    'Int16': dt.Int16,
-    'Int32': dt.Int32,
-    'Int64': dt.Int64,
-    'Float32': dt.Float32,
-    'Float64': dt.Float64,
-    'String': dt.String,
-    'FixedString': dt.String,
-    'Date': dt.Date,
-    'DateTime': dt.Timestamp,
-    'DateTime64': dt.Timestamp,
-    'Array': dt.Array,
-}
-_ibis_dtypes = {v: k for k, v in _clickhouse_dtypes.items()}
-_ibis_dtypes[dt.String] = 'String'
-_ibis_dtypes[dt.Timestamp] = 'DateTime'
-
-
-class ClickhouseDataType:
-
-    __slots__ = 'typename', 'base_typename', 'nullable'
-
-    def __init__(self, typename, nullable=False):
-        m = base_typename_re.match(typename)
-        self.base_typename = m.groups()[0]
-        if self.base_typename not in _clickhouse_dtypes:
-            raise com.UnsupportedBackendType(typename)
-        self.typename = self.base_typename
-        self.nullable = nullable
-
-        if self.base_typename == 'Array':
-            self.typename = typename
-
-    def __str__(self):
-        if self.nullable:
-            return f'Nullable({self.typename})'
-        else:
-            return self.typename
-
-    def __repr__(self):
-        return f'<Clickhouse {str(self)}>'
-
-    @classmethod
-    def parse(cls, spec):
-        # TODO(kszucs): spare parsing, depends on clickhouse-driver#22
-        if spec.startswith('Nullable'):
-            return cls(spec[9:-1], nullable=True)
-        else:
-            return cls(spec)
-
-    def to_ibis(self):
-        if self.base_typename != 'Array':
-            return _clickhouse_dtypes[self.typename](nullable=self.nullable)
-
-        sub_type = ClickhouseDataType(
-            self.get_subname(self.typename)
-        ).to_ibis()
-        return dt.Array(value_type=sub_type)
-
-    @staticmethod
-    def get_subname(name: str) -> str:
-        lbracket_pos = name.find('(')
-        rbracket_pos = name.rfind(')')
-
-        if lbracket_pos == -1 or rbracket_pos == -1:
-            return ''
-
-        subname = name[lbracket_pos + 1 : rbracket_pos]
-        return subname
-
-    @staticmethod
-    def get_typename_from_ibis_dtype(dtype):
-        if not isinstance(dtype, dt.Array):
-            return _ibis_dtypes[type(dtype)]
-
-        return 'Array({})'.format(
-            ClickhouseDataType.get_typename_from_ibis_dtype(dtype.value_type)
-        )
-
-    @classmethod
-    def from_ibis(cls, dtype, nullable=None):
-        typename = ClickhouseDataType.get_typename_from_ibis_dtype(dtype)
-        if nullable is None:
-            nullable = dtype.nullable
-        return cls(typename, nullable=nullable)
-
-
-@dt.dtype.register(ClickhouseDataType)
-def clickhouse_to_ibis_dtype(clickhouse_dtype):
-    return clickhouse_dtype.to_ibis()


 class ClickhouseTable(ir.TableExpr):
     """References a physical table in Clickhouse"""

     @property
     def _qualified_name(self):
-        return self.op().args[0]
-
-    @property
-    def _unqualified_name(self):
-        return self._match_name()[1]
+        return self.op().name

     @property
     def _client(self):
-        return self.op().args[2]
-
-    def _match_name(self):
-        m = fully_qualified_re.match(self._qualified_name)
-        if not m:
-            raise com.IbisError(
-                'Cannot determine database name from {}'.format(
-                    self._qualified_name
-                )
-            )
-        db, quoted, unquoted = m.groups()
-        return db, quoted or unquoted
-
-    @property
-    def _database(self):
-        return self._match_name()[0]
+        return self.op().source

     def invalidate_metadata(self):
         self._client.invalidate_metadata(self._qualified_name)
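Note: the regex round-trip through _match_name is gone because the table operation itself already carries both pieces of information, roughly like the following (hypothetical connection and table name):

    t = con.table('functional_alltypes')  # con: a ClickHouse backend, invented here
    t.op().name    # the table's qualified name, as _qualified_name now returns
    t.op().source  # the owning backend instance, as _client now returns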
@@ -168,10 +48,8 @@ def insert(self, obj, **kwargs):
         assert isinstance(obj, pd.DataFrame)
         assert set(schema.names) >= set(obj.columns)

-        columns = ', '.join(map(quote_identifier, obj.columns))
-        query = 'INSERT INTO {table} ({columns}) VALUES'.format(
-            table=self._qualified_name, columns=columns
-        )
+        columns = ", ".join(map(quote_identifier, obj.columns))
+        query = f"INSERT INTO {self._qualified_name} ({columns}) VALUES"

         # convert data columns with datetime64 pandas dtype to native date
         # because clickhouse-driver 0.0.10 does arithmetic operations on it
@@ -180,5 +58,11 @@ def insert(self, obj, **kwargs):
             if isinstance(schema[col], dt.Date):
                 obj[col] = obj[col].dt.date

-        data = obj.to_dict('records')
-        return self._client.con.execute(query, data, **kwargs)
+        settings = kwargs.pop("settings", {})
+        settings["use_numpy"] = True
+        return self._client.con.insert_dataframe(
+            query,
+            obj,
+            settings=settings,
+            **kwargs,
+        )
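Note: the new insert path hands the DataFrame straight to clickhouse-driver's NumPy-backed insert_dataframe instead of converting it to a list of dicts; that API requires the use_numpy setting, which the code above forces per call. A standalone illustration of the same call (host, table, and data invented; use_numpy can equivalently be set when constructing the Client):

    import pandas as pd
    from clickhouse_driver import Client

    client = Client('localhost')
    df = pd.DataFrame({'id': [1, 2], 'name': ['a', 'b']})
    client.insert_dataframe(
        'INSERT INTO events (id, name) VALUES',
        df,
        settings={'use_numpy': True},
    )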