Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let jdbc-based connectors control how data should be written and predicates pushed down #109

Merged
merged 7 commits into from
Feb 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,20 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.prestosql.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.prestosql.plugin.jdbc.StandardReadMappings.jdbcTypeToPrestoType;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.booleanWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.charWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.dateWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.doubleWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.integerWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.jdbcTypeToPrestoType;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.realWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
import static io.prestosql.spi.StandardErrorCode.NOT_FOUND;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.type.BigintType.BIGINT;
Expand All @@ -77,16 +90,16 @@ public class BaseJdbcClient
{
private static final Logger log = Logger.get(BaseJdbcClient.class);

private static final Map<Type, String> SQL_TYPES = ImmutableMap.<Type, String>builder()
.put(BOOLEAN, "boolean")
.put(BIGINT, "bigint")
.put(INTEGER, "integer")
.put(SMALLINT, "smallint")
.put(TINYINT, "tinyint")
.put(DOUBLE, "double precision")
.put(REAL, "real")
.put(VARBINARY, "varbinary")
.put(DATE, "date")
private static final Map<Type, WriteMapping> WRITE_MAPPINGS = ImmutableMap.<Type, WriteMapping>builder()
.put(BOOLEAN, WriteMapping.booleanMapping("boolean", booleanWriteFunction()))
electrum marked this conversation as resolved.
Show resolved Hide resolved
.put(BIGINT, WriteMapping.longMapping("bigint", bigintWriteFunction()))
.put(INTEGER, WriteMapping.longMapping("integer", integerWriteFunction()))
.put(SMALLINT, WriteMapping.longMapping("smallint", smallintWriteFunction()))
.put(TINYINT, WriteMapping.longMapping("tinyint", tinyintWriteFunction()))
.put(DOUBLE, WriteMapping.doubleMapping("double precision", doubleWriteFunction()))
.put(REAL, WriteMapping.longMapping("real", realWriteFunction()))
.put(VARBINARY, WriteMapping.sliceMapping("varbinary", varbinaryWriteFunction()))
.put(DATE, WriteMapping.longMapping("date", dateWriteFunction()))
.build();

protected final String connectorId;
Expand Down Expand Up @@ -197,7 +210,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
resultSet.getString("TYPE_NAME"),
resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"));
Optional<ReadMapping> columnMapping = toPrestoType(session, typeHandle);
Optional<ColumnMapping> columnMapping = toPrestoType(session, typeHandle);
// skip unsupported column types
if (columnMapping.isPresent()) {
String columnName = resultSet.getString("COLUMN_NAME");
Expand All @@ -217,7 +230,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
}

@Override
public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
public Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
{
return jdbcTypeToPrestoType(typeHandle);
}
Expand Down Expand Up @@ -252,11 +265,12 @@ public Connection getConnection(JdbcSplit split)
}

@Override
public PreparedStatement buildSql(Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
public PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
throws SQLException
{
return new QueryBuilder(identifierQuote).buildSql(
this,
session,
connection,
split.getCatalogName(),
split.getSchemaName(),
Expand Down Expand Up @@ -308,7 +322,8 @@ private JdbcOutputTableHandle beginWriteTable(ConnectorTableMetadata tableMetada
}
columnNames.add(columnName);
columnTypes.add(column.getType());
columnList.add(format("%s %s", quoted(columnName), toSqlType(column.getType())));
// TODO in INSERT case, we should reuse original column type and, ideally, constraints (then JdbcPageSink must get writer from toPrestoType())
columnList.add(format("%s %s", quoted(columnName), toWriteMapping(column.getType()).getDataType()));
}

String sql = format(
Expand Down Expand Up @@ -457,28 +472,43 @@ protected void execute(Connection connection, String query)
}
}

protected String toSqlType(Type type)
@Override
public WriteMapping toWriteMapping(Type type)
{
if (isVarcharType(type)) {
VarcharType varcharType = (VarcharType) type;
String dataType;
if (varcharType.isUnbounded()) {
return "varchar";
dataType = "varchar";
}
else {
dataType = "varchar(" + varcharType.getBoundedLength() + ")";
}
return "varchar(" + varcharType.getBoundedLength() + ")";
return WriteMapping.sliceMapping(dataType, varcharWriteFunction());
}
if (type instanceof CharType) {
if (((CharType) type).getLength() == CharType.MAX_LENGTH) {
return "char";
CharType charType = (CharType) type;
String dataType;
if (charType.getLength() == CharType.MAX_LENGTH) {
dataType = "char";
}
return "char(" + ((CharType) type).getLength() + ")";
else {
dataType = "char(" + charType.getLength() + ")";
}
return WriteMapping.sliceMapping(dataType, charWriteFunction(charType));
}
if (type instanceof DecimalType) {
return format("decimal(%s, %s)", ((DecimalType) type).getPrecision(), ((DecimalType) type).getScale());
DecimalType decimalType = (DecimalType) type;
String dataType = format("decimal(%s, %s)", decimalType.getPrecision(), decimalType.getScale());
if (decimalType.isShort()) {
return WriteMapping.longMapping(dataType, shortDecimalWriteFunction(decimalType));
}
return WriteMapping.sliceMapping(dataType, longDecimalWriteFunction(decimalType));
}

String sqlType = SQL_TYPES.get(type);
if (sqlType != null) {
return sqlType;
WriteMapping writeMapping = WRITE_MAPPINGS.get(type);
if (writeMapping != null) {
return writeMapping;
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.jdbc;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public interface BooleanWriteFunction
findepi marked this conversation as resolved.
Show resolved Hide resolved
extends WriteFunction
{
@Override
default Class<?> getJavaType()
{
return boolean.class;
}

void set(PreparedStatement statement, int index, boolean value)
throws SQLException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.jdbc;

import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.type.Type;

import java.util.function.UnaryOperator;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public final class ColumnMapping
{
public static ColumnMapping booleanMapping(Type prestoType, BooleanReadFunction readFunction, BooleanWriteFunction writeFunction)
{
return booleanMapping(prestoType, readFunction, writeFunction, UnaryOperator.identity());
}

public static ColumnMapping booleanMapping(Type prestoType, BooleanReadFunction readFunction, BooleanWriteFunction writeFunction, UnaryOperator<Domain> pushdownConverter)
{
return new ColumnMapping(prestoType, readFunction, writeFunction, pushdownConverter);
}

public static ColumnMapping longMapping(Type prestoType, LongReadFunction readFunction, LongWriteFunction writeFunction)
{
return longMapping(prestoType, readFunction, writeFunction, UnaryOperator.identity());
}

public static ColumnMapping longMapping(Type prestoType, LongReadFunction readFunction, LongWriteFunction writeFunction, UnaryOperator<Domain> pushdownConverter)
{
return new ColumnMapping(prestoType, readFunction, writeFunction, pushdownConverter);
}

public static ColumnMapping doubleMapping(Type prestoType, DoubleReadFunction readFunction, DoubleWriteFunction writeFunction)
{
return doubleMapping(prestoType, readFunction, writeFunction, UnaryOperator.identity());
}

public static ColumnMapping doubleMapping(Type prestoType, DoubleReadFunction readFunction, DoubleWriteFunction writeFunction, UnaryOperator<Domain> pushdownConverter)
{
return new ColumnMapping(prestoType, readFunction, writeFunction, pushdownConverter);
}

public static ColumnMapping sliceMapping(Type prestoType, SliceReadFunction readFunction, SliceWriteFunction writeFunction)
{
return sliceMapping(prestoType, readFunction, writeFunction, UnaryOperator.identity());
}

public static ColumnMapping sliceMapping(Type prestoType, SliceReadFunction readFunction, SliceWriteFunction writeFunction, UnaryOperator<Domain> pushdownConverter)
{
return new ColumnMapping(prestoType, readFunction, writeFunction, pushdownConverter);
}

private final Type type;
private final ReadFunction readFunction;
private final WriteFunction writeFunction;
private final UnaryOperator<Domain> pushdownConverter;

private ColumnMapping(Type type, ReadFunction readFunction, WriteFunction writeFunction, UnaryOperator<Domain> pushdownConverter)
{
this.type = requireNonNull(type, "type is null");
this.readFunction = requireNonNull(readFunction, "readFunction is null");
this.writeFunction = requireNonNull(writeFunction, "writeFunction is null");
checkArgument(
type.getJavaType() == readFunction.getJavaType(),
"Presto type %s is not compatible with read function %s returning %s",
type,
readFunction,
readFunction.getJavaType());
checkArgument(
type.getJavaType() == writeFunction.getJavaType(),
"Presto type %s is not compatible with write function %s accepting %s",
type,
writeFunction,
writeFunction.getJavaType());
this.pushdownConverter = requireNonNull(pushdownConverter, "pushdownConverter is null");
}

public Type getType()
{
return type;
}

public ReadFunction getReadFunction()
{
return readFunction;
}

public WriteFunction getWriteFunction()
{
return writeFunction;
}

public UnaryOperator<Domain> getPushdownConverter()
{
return pushdownConverter;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("type", type)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.jdbc;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public interface DoubleWriteFunction
extends WriteFunction
{
@Override
default Class<?> getJavaType()
{
return double.class;
}

void set(PreparedStatement statement, int index, double value)
throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.statistics.TableStatistics;
import io.prestosql.spi.type.Type;

import javax.annotation.Nullable;

Expand All @@ -46,7 +47,9 @@ default boolean schemaExists(String schema)

List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle);

Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);
Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);

WriteMapping toWriteMapping(Type type);

ConnectorSplitSource getSplits(JdbcTableLayoutHandle layoutHandle);

Expand All @@ -59,7 +62,7 @@ default void abortReadConnection(Connection connection)
// most drivers do not need this
}

PreparedStatement buildSql(Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
throws SQLException;

JdbcOutputTableHandle beginCreateTable(ConnectorTableMetadata tableMetadata);
Expand Down
Loading