
Druid __time filter is not pushed down #8404

Closed
jerryleooo opened this issue Jun 28, 2021 · 21 comments

@jerryleooo
Member

jerryleooo commented Jun 28, 2021

In Druid queries, __time is a very important filter: applying it narrows down the scan range. Without it, Druid does a full table scan, which is slow and resource-consuming.

Currently, I found that the __time filter is not pushed down; I'm not sure whether I'm using it wrongly or pushdown is simply not supported yet:

Trino SQL:
select sum(col1) from <table name> where __time between date'2021-06-01' and date'2021-06-30';

SQLs sent to Druid:
SELECT "__time", "col1" FROM "druid"."<table name>"

I noticed there is ongoing work (#4109 and #4313) on implementing Druid aggregation pushdown; I'm not sure whether this is part of it.

@jerryleooo
Member Author

jerryleooo commented Jun 28, 2021

The result of "explain":

explain select sum(col1) from <table name> where __time between timestamp'2021-06-01' and timestamp'2021-06-30';

Fragment 0 [SINGLE]
    Output layout: [sum]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    Output[_col0]
    │   Layout: [sum:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
    │   _col0 := sum
    └─ Aggregate(FINAL)
       │   Layout: [sum:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
       │   sum := sum("sum_0")
       └─ LocalExchange[SINGLE] ()
          │   Layout: [sum_0:row(bigint, boolean, bigint, boolean)]
          │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
          └─ RemoteSource[1]
                 Layout: [sum_0:row(bigint, boolean, bigint, boolean)]

Fragment 1 [SOURCE]
    Output layout: [sum_0]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    Aggregate(PARTIAL)
    │   Layout: [sum_0:row(bigint, boolean, bigint, boolean)]
    │   sum_0 := sum("col1")
    └─ ScanFilterProject[table = druid:druid.<table name> druid.druid.<table name> columns=[__time:timestamp(3):TIMESTAMP, col1:bigint:BIGINT], grouped = false, filterPredicate = ("__time" BETWEEN TIMESTAMP '2021-06-01 00:00:00.000' AND TIMESTAMP '2021-06-30 00:00:00.000')]
           Layout: [col1:bigint]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           __time := __time:timestamp(3):TIMESTAMP
           col1 := col1:bigint:BIGINT

From the plan it seems the filter is pushed down, but from what we observed the scan still reads the full table, so I'm not sure whether there is an issue in buildSql.

@sumannewton
Contributor

Looks like only varchar types are handled in the Druid connector.

@Override
public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
{
    switch (typeHandle.getJdbcType()) {
        case Types.VARCHAR:
            int columnSize = typeHandle.getRequiredColumnSize();
            if (columnSize == -1) {
                return Optional.of(varcharColumnMapping(createUnboundedVarcharType(), true));
            }
            return Optional.of(defaultVarcharColumnMapping(columnSize, true));
    }
    // TODO implement proper type mapping
    return legacyColumnMapping(session, connection, typeHandle);
}

I am not an expert in Druid. @findepi @martint Please take a look.

@sumannewton sumannewton added enhancement New feature or request jdbc Relates to Trino JDBC driver labels Jul 2, 2021
@hashhar
Member

hashhar commented Jul 2, 2021

@sumannewton Correct.

There are no explicit type mappings defined for the write path (toWriteMapping), so no pushdown is possible. There's a reason why legacyToWriteMapping and legacyColumnMapping are deprecated.

Type mappings should be defined explicitly for data types in both the read (toColumnMapping) and write (toWriteMapping) paths.
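
For illustration, a minimal sketch of that direction, assuming the usual StandardColumnMappings helpers (which Druid types to cover, and the exact TIMESTAMP handling, are assumptions; the thread below shows the complications):

@Override
public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
{
    switch (typeHandle.getJdbcType()) {
        // ... VARCHAR case as in the existing snippet above ...
        case Types.BIGINT:
            // An explicit mapping tells the engine that pushdown is safe.
            return Optional.of(bigintColumnMapping());
        case Types.TIMESTAMP:
            // Naive mapping for __time; see the Avatica issue below.
            return Optional.of(timestampColumnMapping(createTimestampType(3)));
    }
    // Unmapped types are reported as unsupported instead of going through
    // the deprecated legacyColumnMapping fallback.
    return Optional.empty();
}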

@hashhar hashhar removed the jdbc Relates to Trino JDBC driver label Jul 2, 2021
@jerryleooo
Member Author

Hi @hashhar, is there any plan to improve this part?

@hashhar
Member

hashhar commented Jul 4, 2021

@jerryleooo I don't know of anyone actively working on this at the moment, but we can guide anyone who wants to work on this through the changes.

Looking at the mentioned methods in the PostgreSQL connector should be all that's needed.

cc: @dheerajkulakarni thought you might be interested.

@jerryleooo
Member Author

@hashhar Understood. We're interested in improving this part and will look at the methods you mentioned. Thanks.

@jerryleooo
Member Author

Hi @hashhar @sumannewton I am trying to fix this as you suggested: #8474
But there is an error. In

return (resultSet, columnIndex) -> toTrinoTimestamp(timestampType, resultSet.getObject(columnIndex, LocalDateTime.class));

I found that AvaticaResultSet raises an error on getObject(columnIndex, LocalDateTime.class). I paste the full error log here:

2021-07-06T02:28:34.895-0500 INFO TIMELINE: Query 20210706_072834_00243_h68ar :: Transaction:[4f5bdbb7-41c5-4b56-aa2c-47fe5593812c] :: elapsed 157ms :: planning 8ms :: waiting 103ms :: scheduling 105ms :: running 42ms :: finishing 2ms :: begin 2021-07-06T02:28:34.735-05:00 :: end 2021-07-06T02:28:34.892-05:00
2021-07-06T02:28:35.071-0500 INFO SELECT "__time", "clerk", "comment", "custkey", "orderdate", "orderkey", "orderpriority", "orderstatus", "shippriority", "totalprice" FROM "druid"."orders"
2021-07-06T02:28:35.274-0500 SEVERE Error processing Split 20210706_072834_00244_h68ar.1.0-0 io.trino.plugin.jdbc.JdbcSplit@29ef096 (start = 1.37692158049932E8, wall = 215 ms, cpu = 0 ms, wait = 0 ms, calls = 1): JDBC_ERROR: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
io.trino.spi.TrinoException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
	at io.trino.plugin.jdbc.JdbcRecordCursor.handleSqlException(JdbcRecordCursor.java:299)
	at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:199)
	at io.trino.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:106)
	at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:301)
	at io.trino.operator.Driver.processInternal(Driver.java:387)
	at io.trino.operator.Driver.lambda$processFor$9(Driver.java:291)
	at io.trino.operator.Driver.tryWithLock(Driver.java:683)
	at io.trino.operator.Driver.processFor(Driver.java:284)
	at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
	at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
	at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
	at io.trino.$gen.Trino_testversion____20210706_071946_3.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLDataException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
	at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.cannotConvert(AbstractCursor.java:363)
	at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.getObject(AbstractCursor.java:424)
	at org.apache.calcite.avatica.AvaticaResultSet.getObject(AvaticaResultSet.java:1049)
	at io.trino.plugin.jdbc.StandardColumnMappings.lambda$timestampReadFunction$23(StandardColumnMappings.java:499)
	at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:196)
	... 13 more

2021-07-06T02:28:35.283-0500 SEVERE Stage 20210706_072834_00244_h68ar.1 failed
io.trino.spi.TrinoException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
	at io.trino.plugin.jdbc.JdbcRecordCursor.handleSqlException(JdbcRecordCursor.java:299)
	at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:199)
	at io.trino.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:106)
	at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:301)
	at io.trino.operator.Driver.processInternal(Driver.java:387)
	at io.trino.operator.Driver.lambda$processFor$9(Driver.java:291)
	at io.trino.operator.Driver.tryWithLock(Driver.java:683)
	at io.trino.operator.Driver.processFor(Driver.java:284)
	at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
	at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
	at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
	at io.trino.$gen.Trino_testversion____20210706_071946_3.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLDataException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
	at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.cannotConvert(AbstractCursor.java:363)
	at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.getObject(AbstractCursor.java:424)
	at org.apache.calcite.avatica.AvaticaResultSet.getObject(AvaticaResultSet.java:1049)
	at io.trino.plugin.jdbc.StandardColumnMappings.lambda$timestampReadFunction$23(StandardColumnMappings.java:499)
	at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:196)
	... 13 more

2021-07-06T02:28:35.288-0500 INFO FlakyTestRetryAnalyzer not enabled: CONTINUOUS_INTEGRATION environment is not detected or system property 'io.trino.testng.services.FlakyTestRetryAnalyzer.enabled' is not set to 'true' (actual: <not set>)
2021-07-06T02:28:35.308-0500 INFO TIMELINE: Query 20210706_072834_00244_h68ar :: Transaction:[afbebe63-cdc4-4031-b24b-a3655037a957] :: elapsed 385ms :: planning 11ms :: waiting 95ms :: scheduling 148ms :: running 216ms :: finishing 10ms :: begin 2021-07-06T02:28:34.899-05:00 :: end 2021-07-06T02:28:35.284-05:00

java.lang.RuntimeException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)

	at io.trino.testing.AbstractTestingTrinoClient.execute(AbstractTestingTrinoClient.java:120)
	at io.trino.testing.DistributedQueryRunner.execute(DistributedQueryRunner.java:476)
	at io.trino.testing.AbstractTestQueryFramework.computeActual(AbstractTestQueryFramework.java:137)
	at io.trino.testing.AbstractTestQueryFramework.assertExplainAnalyze(AbstractTestQueryFramework.java:363)
	at io.trino.testing.AbstractTestQueryFramework.assertExplainAnalyze(AbstractTestQueryFramework.java:354)
	at io.trino.testing.AbstractTestQueryFramework.assertExplainAnalyze(AbstractTestQueryFramework.java:349)
	at io.trino.testing.BaseConnectorTest.testExplainAnalyze(BaseConnectorTest.java:523)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
	at org.testng.TestRunner.privateRun(TestRunner.java:756)
	at org.testng.TestRunner.run(TestRunner.java:610)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:387)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:382)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:340)
	at org.testng.SuiteRunner.run(SuiteRunner.java:289)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1293)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1218)
	at org.testng.TestNG.runSuites(TestNG.java:1133)
	at org.testng.TestNG.run(TestNG.java:1104)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:109)
	Suppressed: java.lang.Exception: SQL: EXPLAIN ANALYZE SELECT * FROM orders
		at io.trino.testing.DistributedQueryRunner.execute(DistributedQueryRunner.java:479)
		... 29 more
Caused by: io.trino.spi.TrinoException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
	at io.trino.plugin.jdbc.JdbcRecordCursor.handleSqlException(JdbcRecordCursor.java:299)
	at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:199)
	at io.trino.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:106)
	at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:301)
	at io.trino.operator.Driver.processInternal(Driver.java:387)
	at io.trino.operator.Driver.lambda$processFor$9(Driver.java:291)
	at io.trino.operator.Driver.tryWithLock(Driver.java:683)
	at io.trino.operator.Driver.processFor(Driver.java:284)
	at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
	at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
	at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
	at io.trino.$gen.Trino_testversion____20210706_071946_3.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLDataException: cannot convert to Object (with type) (org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor@55f43d1d)
	at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.cannotConvert(AbstractCursor.java:363)
	at org.apache.calcite.avatica.util.AbstractCursor$AccessorImpl.getObject(AbstractCursor.java:424)
	at org.apache.calcite.avatica.AvaticaResultSet.getObject(AvaticaResultSet.java:1049)
	at io.trino.plugin.jdbc.StandardColumnMappings.lambda$timestampReadFunction$23(StandardColumnMappings.java:499)
	at io.trino.plugin.jdbc.JdbcRecordCursor.getLong(JdbcRecordCursor.java:196)
	... 13 more

I guess we may need to report this to Avatica or have a workaround here.

@hashhar
Member

hashhar commented Jul 6, 2021

Seems https://issues.apache.org/jira/browse/CALCITE-1630 is related

Take a look at StandardColumnMappings#timestampWriteFunctionUsingSqlTimestamp and StandardColumnMappings#timeColumnMappingUsingSqlTime.
They use the java.sql types instead and hence don't work correctly during DST gaps. So please also add a TODO in the code about https://issues.apache.org/jira/browse/CALCITE-1630
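
A sketch of what such a workaround with the requested TODO could look like for the TIMESTAMP read path (helper names are from StandardColumnMappings; the exact placement in DruidJdbcClient is an assumption):

case Types.TIMESTAMP:
    // Avatica's getObject(columnIndex, LocalDateTime.class) throws
    // SQLDataException, so use the java.sql.Timestamp based mapping instead.
    // TODO switch back to timestampColumnMapping once
    // https://issues.apache.org/jira/browse/CALCITE-1630 is resolved;
    // java.sql types are not correct during DST gaps.
    return Optional.of(timestampColumnMappingUsingSqlTimestampWithRounding(createTimestampType(3)));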

@jerryleooo
Member Author

Thanks @hashhar, timestampColumnMappingUsingSqlTimestampWithRounding works and I will update the code later.

Since Druid doesn't support createTable and addColumn, I guess we may not need a toWriteMapping?

@hashhar
Member

hashhar commented Jul 6, 2021

toWriteMapping is still used to generate the expressions and statements used when pushing down queries. It's not an "INSERT" mapping; it's about how to write Trino types as Druid types.
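
A hedged sketch of what a Druid toWriteMapping could look like (the Druid SQL type names are assumptions; since Druid has no INSERT path these mappings would only drive pushdown expression generation):

@Override
public WriteMapping toWriteMapping(ConnectorSession session, Type type)
{
    if (type == BIGINT) {
        return WriteMapping.longMapping("bigint", bigintWriteFunction());
    }
    if (type instanceof VarcharType) {
        return WriteMapping.sliceMapping("varchar", varcharWriteFunction());
    }
    throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}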

jerryleooo added a commit to jerryleooo/trino that referenced this issue Jul 6, 2021
@jerryleooo jerryleooo self-assigned this Jul 9, 2021
@findepi
Member

findepi commented Jul 12, 2021

The lack of pushdown is because Druid column mapping was never implemented:

hence we end up calling timestampColumnMappingUsingSqlTimestampWithRounding, which disables pushdown for correctness reasons (because of rounding on read).

We should implement proper type mapping for Druid. See other connectors extending BaseJdbcClient for reference,
and see TestPostgreSqlTypeMapping for the test class to follow.
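
For reference, the round-trip pattern in TestPostgreSqlTypeMapping looks roughly like this (a sketch; trinoCreateAsSelect is a helper each type-mapping test class defines to build a DataSetup):

@Test
public void testBigint()
{
    SqlDataTypeTest.create()
            .addRoundTrip("bigint", "123456789012", BIGINT, "BIGINT '123456789012'")
            .execute(getQueryRunner(), trinoCreateAsSelect("test_bigint"));
}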

@jerryleooo
Member Author

Thanks, @findepi , I am working on this as you suggested.

@jerryleooo
Member Author

The problem is that inside TestXXXTypeMapping we need to use DataTypeTest or SqlDataTypeTest, which rely on TestTable, whose constructor runs CREATE TABLE and INSERT statements -- this is not suitable for Druid.

I guess I may need to implement DruidDataTypeTest, DruidTestTable, and DruidCreateAndInsertDataSetup, but I'm not sure if I'm going too far. Any ideas? @findepi @hashhar

@jibinpt

jibinpt commented Jul 13, 2021

case Types.TIMESTAMP:
    TimestampType timestampType = createTimestampType(3);
    return Optional.of(timestampColumnMapping(timestampType));

@findepi Adding the above code for type mapping, similar to other JDBC clients, resulted in the following error:

trino:druid> explain analyze select __time from orders where orders.__time >= date '1995-01-01';

Query 20210713_192755_00001_7ib4c, FAILED, 1 node
Splits: 18 total, 0 done (0.00%)
3.07 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20210713_192755_00001_7ib4c failed: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 date/time type `java.time.LocalDateTime` not supported by default: add Module "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" to enable handling (through reference chain: org.apache.calcite.avatica.remote.Service$ExecuteRequest["parameterValues"]->java.util.Arrays$ArrayList[0]->org.apache.calcite.avatica.remote.TypedValue["value"])

@jerryleooo did you face a similar issue?

@jerryleooo
Member Author

@jibinpt you can see my latest commit.

@findepi
Member

findepi commented Jul 27, 2021

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 date/time type java.time.LocalDateTime not supported by default: add Module "com.fasterxml.jackson.datatype:jackson-datatype-jsr310"

@jibinpt did you try to add that module?
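
(Presumably something along these lines in the plugin's pom.xml, per the error message's own suggestion:

<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

though whether it takes effect depends on which ObjectMapper Avatica actually uses.)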

@jibinpt

jibinpt commented Jul 29, 2021

@findepi The error message says (through reference chain: org.apache.calcite.avatica.remote.Service$ExecuteRequest["parameterValues"]->java.util.Arrays$ArrayList[0]->org.apache.calcite.avatica.remote.TypedValue["value"]). So won't this be happening on the Druid side, meaning that including this module in Trino's pom.xml won't help?

@maudrid

maudrid commented Nov 29, 2021

This fix is sorely needed!
I compiled the current state of this branch, but I had to make a number of cosmetic fixes to get it to work.
I will fork the project and open a pull request for those.

@maudrid

maudrid commented Nov 29, 2021

Created the pull request for @jerryleooo here: jerryleooo#2

@Praveen2112
Member

#13335 fixes this issue.
