Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let jdbc-based connectors control how data should be written and predicates pushed down #12151

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,24 @@
import java.util.UUID;

import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static com.facebook.presto.plugin.jdbc.StandardReadMappings.jdbcTypeToPrestoType;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.booleanWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.charWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.dateWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.doubleWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.integerWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.jdbcTypeToPrestoType;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.realWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
import static com.facebook.presto.plugin.jdbc.WriteMapping.booleanWriteMapping;
import static com.facebook.presto.plugin.jdbc.WriteMapping.doubleWriteMapping;
import static com.facebook.presto.plugin.jdbc.WriteMapping.longWriteMapping;
import static com.facebook.presto.plugin.jdbc.WriteMapping.sliceWriteMapping;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
Expand All @@ -58,10 +75,6 @@
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static com.facebook.presto.spi.type.RealType.REAL;
import static com.facebook.presto.spi.type.SmallintType.SMALLINT;
import static com.facebook.presto.spi.type.TimeType.TIME;
import static com.facebook.presto.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.spi.type.TinyintType.TINYINT;
import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.spi.type.Varchars.isVarcharType;
Expand All @@ -78,20 +91,16 @@ public class BaseJdbcClient
{
private static final Logger log = Logger.get(BaseJdbcClient.class);

private static final Map<Type, String> SQL_TYPES = ImmutableMap.<Type, String>builder()
.put(BOOLEAN, "boolean")
.put(BIGINT, "bigint")
.put(INTEGER, "integer")
.put(SMALLINT, "smallint")
.put(TINYINT, "tinyint")
.put(DOUBLE, "double precision")
.put(REAL, "real")
.put(VARBINARY, "varbinary")
.put(DATE, "date")
.put(TIME, "time")
.put(TIME_WITH_TIME_ZONE, "time with timezone")
.put(TIMESTAMP, "timestamp")
.put(TIMESTAMP_WITH_TIME_ZONE, "timestamp with timezone")
private static final Map<Type, WriteMapping> WRITE_MAPPINGS = ImmutableMap.<Type, WriteMapping>builder()
.put(BOOLEAN, booleanWriteMapping("boolean", booleanWriteFunction()))
.put(BIGINT, longWriteMapping("bigint", bigintWriteFunction()))
.put(INTEGER, longWriteMapping("integer", integerWriteFunction()))
.put(SMALLINT, longWriteMapping("smallint", smallintWriteFunction()))
.put(TINYINT, longWriteMapping("tinyint", tinyintWriteFunction()))
.put(DOUBLE, doubleWriteMapping("double precision", doubleWriteFunction()))
.put(REAL, longWriteMapping("real", realWriteFunction()))
.put(VARBINARY, sliceWriteMapping("varbinary", varbinaryWriteFunction()))
.put(DATE, longWriteMapping("date", dateWriteFunction()))
.build();

protected final String connectorId;
Expand Down Expand Up @@ -199,9 +208,10 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
while (resultSet.next()) {
JdbcTypeHandle typeHandle = new JdbcTypeHandle(
resultSet.getInt("DATA_TYPE"),
resultSet.getString("TYPE_NAME"),
resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"));
Optional<ReadMapping> columnMapping = toPrestoType(session, typeHandle);
Optional<ColumnMapping> columnMapping = toPrestoType(session, typeHandle);
// skip unsupported column types
if (columnMapping.isPresent()) {
String columnName = resultSet.getString("COLUMN_NAME");
Expand All @@ -221,7 +231,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
}

@Override
public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
public Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
{
return jdbcTypeToPrestoType(typeHandle);
}
Expand Down Expand Up @@ -255,11 +265,12 @@ public Connection getConnection(JdbcSplit split)
}

@Override
public PreparedStatement buildSql(Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
public PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
throws SQLException
{
return new QueryBuilder(identifierQuote).buildSql(
this,
session,
connection,
split.getCatalogName(),
split.getSchemaName(),
Expand Down Expand Up @@ -316,7 +327,8 @@ private JdbcOutputTableHandle beginWriteTable(ConnectorTableMetadata tableMetada
columnList.add(new StringBuilder()
.append(quoted(columnName))
.append(" ")
.append(toSqlType(column.getType()))
// TODO in INSERT case, we should reuse original column type and, ideally, constraints (then JdbcPageSink must get writer from toPrestoType())
.append(toWriteMapping(column.getType()).getDataType())
.toString());
}
Joiner.on(", ").appendTo(sql, columnList.build());
Expand Down Expand Up @@ -463,28 +475,53 @@ protected void execute(Connection connection, String query)
}
}

protected String toSqlType(Type type)
/**
* @deprecated Use {@link #toWriteMapping(Type)}.
*/
@Deprecated
protected final String toSqlType(Type type)
{
// TODO remove this method when all connectors updated
return toWriteMapping(type).getDataType();
}

@Override
public WriteMapping toWriteMapping(Type type)
{
if (isVarcharType(type)) {
VarcharType varcharType = (VarcharType) type;
String dataType;
if (varcharType.isUnbounded()) {
return "varchar";
dataType = "varchar";
}
return "varchar(" + varcharType.getLengthSafe() + ")";
else {
dataType = "varchar(" + varcharType.getLengthSafe() + ")";
}
return WriteMapping.sliceWriteMapping(dataType, varcharWriteFunction());
}
if (type instanceof CharType) {
if (((CharType) type).getLength() == CharType.MAX_LENGTH) {
return "char";
CharType charType = (CharType) type;
String dataType;
if (charType.getLength() == CharType.MAX_LENGTH) {
dataType = "char";
}
else {
dataType = "char(" + charType.getLength() + ")";
}
return "char(" + ((CharType) type).getLength() + ")";
return WriteMapping.sliceWriteMapping(dataType, charWriteFunction(charType));
}
if (type instanceof DecimalType) {
return format("decimal(%s, %s)", ((DecimalType) type).getPrecision(), ((DecimalType) type).getScale());
DecimalType decimalType = (DecimalType) type;
String dataType = format("decimal(%s, %s)", decimalType.getPrecision(), decimalType.getScale());
if (decimalType.isShort()) {
return longWriteMapping(dataType, shortDecimalWriteFunction(decimalType));
}
return WriteMapping.sliceWriteMapping(dataType, longDecimalWriteFunction(decimalType));
}

String sqlType = SQL_TYPES.get(type);
if (sqlType != null) {
return sqlType;
WriteMapping writeMapping = WRITE_MAPPINGS.get(type);
if (writeMapping != null) {
return writeMapping;
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public interface BooleanWriteFunction
extends WriteFunction
{
@Override
default Class<?> getJavaType()
{
return boolean.class;
}

void set(PreparedStatement statement, int index, boolean value)
throws SQLException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;

import com.facebook.presto.spi.type.Type;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public final class ColumnMapping
{
public static ColumnMapping booleanMapping(Type prestoType, BooleanReadFunction readFunction, BooleanWriteFunction writeFunction)
{
return new ColumnMapping(prestoType, readFunction, writeFunction, true);
}

public static ColumnMapping longMapping(Type prestoType, LongReadFunction readFunction, LongWriteFunction writeFunction)
{
return new ColumnMapping(prestoType, readFunction, writeFunction, true);
}

public static ColumnMapping doubleMapping(Type prestoType, DoubleReadFunction readFunction, DoubleWriteFunction writeFunction)
{
return new ColumnMapping(prestoType, readFunction, writeFunction, true);
}

public static ColumnMapping sliceMapping(Type prestoType, SliceReadFunction readFunction, SliceWriteFunction writeFunction)
{
return new ColumnMapping(prestoType, readFunction, writeFunction, true);
}

private final Type type;
private final ReadFunction readFunction;
private final WriteFunction writeFunction;
private final boolean predicatePushDownAllowed;

private ColumnMapping(Type type, ReadFunction readFunction, WriteFunction writeFunction, boolean predicatePushDownAllowed)
{
this.type = requireNonNull(type, "type is null");
this.readFunction = requireNonNull(readFunction, "readFunction is null");
this.writeFunction = requireNonNull(writeFunction, "writeFunction is null");
checkArgument(
type.getJavaType() == readFunction.getJavaType(),
"Presto type %s is not compatible with read function %s returning %s",
type,
readFunction,
readFunction.getJavaType());
checkArgument(
type.getJavaType() == writeFunction.getJavaType(),
"Presto type %s is not compatible with write function %s accepting %s",
type,
writeFunction,
writeFunction.getJavaType());
this.predicatePushDownAllowed = predicatePushDownAllowed;
}

public Type getType()
{
return type;
}

public ReadFunction getReadFunction()
{
return readFunction;
}

public WriteFunction getWriteFunction()
{
return writeFunction;
}

public boolean isPredicatePushDownAllowed()
{
return predicatePushDownAllowed;
}

public ColumnMapping withPredicatePushDownAllowed(boolean predicatePushDownAllowed)
{
return new ColumnMapping(type, readFunction, writeFunction, predicatePushDownAllowed);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("type", type)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public interface DoubleWriteFunction
extends WriteFunction
{
@Override
default Class<?> getJavaType()
{
return double.class;
}

void set(PreparedStatement statement, int index, double value)
throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.type.Type;

import javax.annotation.Nullable;

Expand All @@ -43,7 +44,9 @@ default boolean schemaExists(String schema)

List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle);

Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);
Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);

WriteMapping toWriteMapping(Type type);

ConnectorSplitSource getSplits(JdbcTableLayoutHandle layoutHandle);

Expand All @@ -56,7 +59,7 @@ default void abortReadConnection(Connection connection)
// most drivers do not need this
}

PreparedStatement buildSql(Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
throws SQLException;

JdbcOutputTableHandle beginCreateTable(ConnectorTableMetadata tableMetadata);
Expand Down
Loading