Skip to content

Commit

Permalink
ci: load parquet instead of csv in druid
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud authored and kszucs committed Mar 6, 2023
1 parent 885faa5 commit 2430f52
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 22 deletions.
18 changes: 9 additions & 9 deletions ci/schema/druid.sql
Original file line number | Diff line number | Diff line change
Expand Up @@ -3,8 +3,8 @@ OVERWRITE ALL
SELECT *
FROM TABLE(
EXTERN(
- '{"type":"local","files":["/opt/shared/diamonds.csv"]}',
- '{"type":"csv", "findColumnsFromHeader":true}',
+ '{"type":"local","files":["/opt/shared/diamonds.parquet"]}',
+ '{"type":"parquet"}',
'[{"name":"carat","type":"double"},{"name":"cut","type":"string"},{"name":"color","type":"string"},{"name":"clarity","type":"string"},{"name":"depth","type":"double"},{"name":"table","type":"double"},{"name":"price","type":"long"},{"name":"x","type":"double"},{"name":"y","type":"double"},{"name":"z","type":"double"}]'
)
)
Expand All @@ -15,8 +15,8 @@ OVERWRITE ALL
SELECT *
FROM TABLE(
EXTERN(
- '{"type":"local","files":["/opt/shared/batting.csv"]}',
- '{"type":"csv", "findColumnsFromHeader":true}',
+ '{"type":"local","files":["/opt/shared/batting.parquet"]}',
+ '{"type":"parquet"}',
'[{"name":"playerID","type":"string"},{"name":"yearID","type":"long"},{"name":"stint","type":"long"},{"name":"teamID","type":"string"},{"name":"lgID","type":"string"},{"name":"G","type":"long"},{"name":"AB","type":"long"},{"name":"R","type":"long"},{"name":"H","type":"long"},{"name":"X2B","type":"long"},{"name":"X3B","type":"long"},{"name":"HR","type":"long"},{"name":"RBI","type":"long"},{"name":"SB","type":"long"},{"name":"CS","type":"long"},{"name":"BB","type":"long"},{"name":"SO","type":"long"},{"name":"IBB","type":"long"},{"name":"HBP","type":"long"},{"name":"SH","type":"long"},{"name":"SF","type":"long"},{"name":"GIDP","type":"long"}]'
)
)
Expand All @@ -27,20 +27,20 @@ OVERWRITE ALL
SELECT *
FROM TABLE(
EXTERN(
- '{"type":"local","files":["/opt/shared/awards_players.csv"]}',
- '{"type":"csv", "findColumnsFromHeader":true}',
+ '{"type":"local","files":["/opt/shared/awards_players.parquet"]}',
+ '{"type":"parquet"}',
'[{"name":"playerID","type":"string"},{"name":"awardID","type":"string"},{"name":"yearID","type":"long"},{"name":"lgID","type":"string"},{"name":"tie","type":"string"},{"name":"notes","type":"string"}]'
)
)
PARTITIONED BY ALL TIME;

REPLACE INTO "functional_alltypes"
OVERWRITE ALL
- SELECT TIME_PARSE(timestamp_col) AS __time, *
+ SELECT *
FROM TABLE(
EXTERN(
- '{"type":"local","files":["/opt/shared/functional_alltypes.csv"]}',
- '{"type":"csv", "findColumnsFromHeader":true}',
+ '{"type":"local","files":["/opt/shared/functional_alltypes.parquet"]}',
+ '{"type":"parquet"}',
'[{"name":"index","type":"long"},{"name":"Unnamed: 0","type":"long"},{"name":"id","type":"long"},{"name":"bool_col","type":"long"},{"name":"tinyint_col","type":"long"},{"name":"smallint_col","type":"long"},{"name":"int_col","type":"long"},{"name":"bigint_col","type":"long"},{"name":"float_col","type":"double"},{"name":"double_col","type":"double"},{"name":"date_string_col","type":"string"},{"name":"string_col","type":"string"},{"name":"timestamp_col","type":"string"},{"name":"year","type":"long"},{"name":"month","type":"long"}]'
)
)
Expand Down
2 changes: 1 addition & 1 deletion docker/druid/environment
Original file line number | Diff line number | Diff line change
Expand Up @@ -26,7 +26,7 @@ DRUID_MAXDIRECTMEMORYSIZE=1g

druid_emitter_logging_logLevel=debug

- druid_extensions_loadList=["postgresql-metadata-storage", "druid-multi-stage-query"]
+ druid_extensions_loadList=["postgresql-metadata-storage", "druid-multi-stage-query", "druid-parquet-extensions", "druid-avro-extensions"]

druid_zk_service_host=zookeeper

Expand Down
19 changes: 7 additions & 12 deletions ibis/backends/druid/tests/conftest.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -101,18 +101,13 @@ class TestConf(ServiceBackendTest, RoundHalfToEven):

@classmethod
def service_spec(cls, data_dir: Path):
+ files = [data_dir.joinpath("functional_alltypes.parquet")]
+ files.extend(
+ data_dir.joinpath("parquet", name, f"{name}.parquet")
+ for name in ("diamonds", "batting", "awards_players")
+ )
return ServiceSpec(
- name="druid-coordinator",
- data_volume="/opt/shared",
- files=[
- data_dir.joinpath(f"{name}.csv")
- for name in (
- "diamonds",
- "batting",
- "awards_players",
- "functional_alltypes",
- )
- ],
+ name="druid-coordinator", data_volume="/opt/shared", files=files
)

@staticmethod
Expand All @@ -138,7 +133,7 @@ def _load_data(data_dir: Path, script_dir: Path, **_: Any) -> None:
# gather executes immediately, but we need to wait for asyncio.run to
# create the event loop
async def load_data(queries):
- """Copy data into the Druid volume mount and run data loading queries."""
+ """Run data loading queries."""
async with ClientSession() as session:
await asyncio.gather(*map(partial(run_query, session), queries))

Expand Down

0 comments on commit 2430f52

Please sign in to comment.