Skip to content

Commit

Permalink
extended readMapping to columnMapping for jdbc connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
pratyakshsharma committed Jul 1, 2022
1 parent c61c2e4 commit b74d9ec
Show file tree
Hide file tree
Showing 22 changed files with 1,067 additions and 514 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,21 @@
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimeType.TIME;
import static com.facebook.presto.common.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static com.facebook.presto.plugin.jdbc.StandardReadMappings.jdbcTypeToPrestoType;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.booleanWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.dateWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.doubleWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.integerWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.jdbcTypeToPrestoType;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.realWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction;
import static com.facebook.presto.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.MoreObjects.firstNonNull;
Expand All @@ -93,20 +99,16 @@ public class BaseJdbcClient
{
private static final Logger log = Logger.get(BaseJdbcClient.class);

private static final Map<Type, String> SQL_TYPES = ImmutableMap.<Type, String>builder()
.put(BOOLEAN, "boolean")
.put(BIGINT, "bigint")
.put(INTEGER, "integer")
.put(SMALLINT, "smallint")
.put(TINYINT, "tinyint")
.put(DOUBLE, "double precision")
.put(REAL, "real")
.put(VARBINARY, "varbinary")
.put(DATE, "date")
.put(TIME, "time")
.put(TIME_WITH_TIME_ZONE, "time with timezone")
.put(TIMESTAMP, "timestamp")
.put(TIMESTAMP_WITH_TIME_ZONE, "timestamp with timezone")
private static final Map<Type, WriteMapping> WRITE_MAPPING_MAP = ImmutableMap.<Type, WriteMapping>builder()
.put(BOOLEAN, WriteMapping.booleanMapping("boolean", booleanWriteFunction()))
.put(BIGINT, WriteMapping.longMapping("bigint", bigintWriteFunction()))
.put(INTEGER, WriteMapping.longMapping("integer", integerWriteFunction()))
.put(SMALLINT, WriteMapping.longMapping("smallint", smallintWriteFunction()))
.put(TINYINT, WriteMapping.longMapping("tinyint", tinyintWriteFunction()))
.put(DOUBLE, WriteMapping.doubleMapping("double precision", doubleWriteFunction()))
.put(REAL, WriteMapping.longMapping("real", realWriteFunction()))
.put(VARBINARY, WriteMapping.sliceMapping("varbinary", varbinaryWriteFunction()))
.put(DATE, WriteMapping.longMapping("date", dateWriteFunction()))
.build();

protected final String connectorId;
Expand Down Expand Up @@ -237,7 +239,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
resultSet.getString("TYPE_NAME"),
resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"));
Optional<ReadMapping> columnMapping = toPrestoType(session, typeHandle);
Optional<ColumnMapping> columnMapping = toPrestoType(session, typeHandle);
// skip unsupported column types
if (columnMapping.isPresent()) {
String columnName = resultSet.getString("COLUMN_NAME");
Expand All @@ -259,7 +261,7 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
}

@Override
public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
public Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle)
{
return jdbcTypeToPrestoType(typeHandle);
}
Expand Down Expand Up @@ -370,6 +372,7 @@ protected JdbcOutputTableHandle createTable(ConnectorTableMetadata tableMetadata
}
columnNames.add(columnName);
columnTypes.add(column.getType());
// TODO in INSERT case, we should reuse original column type and, ideally, constraints (then JdbcPageSink must get writer from toPrestoType())
columnList.add(getColumnString(column, columnName));
}

Expand All @@ -395,7 +398,7 @@ private String getColumnString(ColumnMetadata column, String columnName)
StringBuilder sb = new StringBuilder()
.append(quoted(columnName))
.append(" ")
.append(toSqlType(column.getType()));
.append(toWriteMapping(column.getType()).getDataType());
if (!column.isNullable()) {
sb.append(" NOT NULL");
}
Expand Down Expand Up @@ -729,28 +732,41 @@ protected void execute(Connection connection, String query)
}
}

protected String toSqlType(Type type)
public WriteMapping toWriteMapping(Type type)
{
String dataType;
if (isVarcharType(type)) {
VarcharType varcharType = (VarcharType) type;
if (varcharType.isUnbounded()) {
return "varchar";
dataType = "varchar";
}
return "varchar(" + varcharType.getLengthSafe() + ")";
else {
dataType = "varchar(" + varcharType.getLengthSafe() + ")";
}
return WriteMapping.sliceMapping(dataType, varcharWriteFunction());
}
if (type instanceof CharType) {
if (((CharType) type).getLength() == CharType.MAX_LENGTH) {
return "char";
CharType charType = (CharType) type;
if (charType.getLength() == CharType.MAX_LENGTH) {
dataType = "char";
}
else {
dataType = "char(" + ((CharType) type).getLength() + ")";
}
return "char(" + ((CharType) type).getLength() + ")";
return WriteMapping.sliceMapping(dataType, StandardColumnMappings.charWriteFunction(charType));
}
if (type instanceof DecimalType) {
return format("decimal(%s, %s)", ((DecimalType) type).getPrecision(), ((DecimalType) type).getScale());
DecimalType decimalType = (DecimalType) type;
dataType = format("decimal(%s, %s)", ((DecimalType) type).getPrecision(), ((DecimalType) type).getScale());
if (decimalType.isShort()) {
return WriteMapping.longMapping(dataType, StandardColumnMappings.shortDecimalWriteFunction(decimalType));
}
return WriteMapping.sliceMapping(dataType, StandardColumnMappings.longDecimalWriteFunction(decimalType));
}

String sqlType = SQL_TYPES.get(type);
if (sqlType != null) {
return sqlType;
WriteMapping writeMapping = WRITE_MAPPING_MAP.get(type);
if (writeMapping != null) {
return writeMapping;
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public interface BooleanWriteFunction
extends WriteFunction
{
default Class<?> getJavaType()
{
return boolean.class;
}

void set(PreparedStatement statement, int index, boolean value) throws SQLException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;

import com.facebook.presto.common.type.Type;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class ColumnMapping
{
private final Type type;
private final ReadFunction readFunction;
private final WriteFunction writeFunction;

private ColumnMapping(Type type, ReadFunction readFunction, WriteFunction writeFunction)
{
this.type = requireNonNull(type, "type is null");
this.readFunction = requireNonNull(readFunction, "readFunction is null");
this.writeFunction = requireNonNull(writeFunction, "writeFunction is null");
checkArgument(
type.getJavaType() == readFunction.getJavaType(),
"Presto type %s is not compatible with read function %s returning %s",
type,
readFunction,
readFunction.getJavaType());
checkArgument(
type.getJavaType() == writeFunction.getJavaType(),
"Presto type %s is not compatible with write function %s accepting %s",
type,
writeFunction,
writeFunction.getJavaType());
}

public static ColumnMapping booleanMapping(Type prestoType, BooleanReadFunction readFunction, BooleanWriteFunction writeFunction)
{
return new ColumnMapping(prestoType, readFunction, writeFunction);
}

public static ColumnMapping longMapping(Type prestoType, LongReadFunction readFunction, LongWriteFunction writeFunction)
{
return new ColumnMapping(prestoType, readFunction, writeFunction);
}

public static ColumnMapping doubleMapping(Type prestoType, DoubleReadFunction readFunction, DoubleWriteFunction writeFunction)
{
return new ColumnMapping(prestoType, readFunction, writeFunction);
}

public static ColumnMapping sliceMapping(Type prestoType, SliceReadFunction readFunction, SliceWriteFunction writeFunction)
{
return new ColumnMapping(prestoType, readFunction, writeFunction);
}

public static ColumnMapping objectMapping(Type prestoType, ObjectReadFunction readFunction, ObjectWriteFunction writeFunction)
{
return new ColumnMapping(prestoType, readFunction, writeFunction);
}

public Type getType()
{
return type;
}

public ReadFunction getReadFunction()
{
return readFunction;
}

public WriteFunction getWriteFunction()
{
return writeFunction;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public interface DoubleWriteFunction
extends WriteFunction
{
default Class<?> getJavaType()
{
return double.class;
}

void set(PreparedStatement statement, int index, double value) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.plugin.jdbc;

import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
Expand Down Expand Up @@ -49,7 +50,9 @@ default boolean schemaExists(JdbcIdentity identity, String schema)

List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle tableHandle);

Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);
Optional<ColumnMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle);

WriteMapping toWriteMapping(Type type);

ConnectorSplitSource getSplits(JdbcIdentity identity, JdbcTableLayoutHandle layoutHandle);

Expand Down
Loading

0 comments on commit b74d9ec

Please sign in to comment.