diff --git a/plugin/trino-spanner/pom.xml b/plugin/trino-spanner/pom.xml new file mode 100644 index 000000000000..639a2e7cc660 --- /dev/null +++ b/plugin/trino-spanner/pom.xml @@ -0,0 +1,168 @@ + + + 4.0.0 + + trino-root + io.trino + 411-SNAPSHOT + ../../pom.xml + + trino-spanner + + + 17 + 17 + + + + + io.trino + trino-base-jdbc + + + io.trino + trino-plugin-toolkit + + + io.airlift + configuration + + + io.airlift + log + + + com.google.cloud + google-cloud-spanner-jdbc + 2.9.9 + + + com.google.guava + guava + + + com.google.inject + guice + + + javax.validation + validation-api + + + io.airlift + concurrent + runtime + + + + io.airlift + log-manager + runtime + + + + io.airlift + units + runtime + + + + + io.trino + trino-spi + provided + + + + io.airlift + slice + provided + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + org.openjdk.jol + jol-core + provided + + + + + io.trino + trino-base-jdbc + test-jar + test + + + + io.trino + trino-jmx + test + + + io.trino + trino-main + test + + + io.trino + trino-main + test-jar + test + + + io.trino + trino-testing + test + + + io.trino + trino-testing-containers + test + + + + org.testcontainers + gcloud + 1.17.3 + test + + + + + io.trino + trino-testing-services + test + + + + io.trino + trino-tpch + test + + + + io.trino.tpch + tpch + test + + + io.airlift + testing + test + + + org.testng + testng + test + + + + diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerClient.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerClient.java new file mode 100644 index 000000000000..ce93d3e2ec5c --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerClient.java @@ -0,0 +1,685 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.spanner; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import io.trino.plugin.jdbc.BaseJdbcClient; +import io.trino.plugin.jdbc.BaseJdbcConfig; +import io.trino.plugin.jdbc.CaseSensitivity; +import io.trino.plugin.jdbc.ColumnMapping; +import io.trino.plugin.jdbc.ConnectionFactory; +import io.trino.plugin.jdbc.JdbcColumnHandle; +import io.trino.plugin.jdbc.JdbcOutputTableHandle; +import io.trino.plugin.jdbc.JdbcStatisticsConfig; +import io.trino.plugin.jdbc.JdbcTableHandle; +import io.trino.plugin.jdbc.JdbcTypeHandle; +import io.trino.plugin.jdbc.LongWriteFunction; +import io.trino.plugin.jdbc.ObjectWriteFunction; +import io.trino.plugin.jdbc.QueryBuilder; +import io.trino.plugin.jdbc.RemoteTableName; +import io.trino.plugin.jdbc.SliceWriteFunction; +import io.trino.plugin.jdbc.StandardColumnMappings; +import io.trino.plugin.jdbc.UnsupportedTypeHandling; +import io.trino.plugin.jdbc.WriteFunction; +import io.trino.plugin.jdbc.WriteMapping; +import io.trino.plugin.jdbc.logging.RemoteQueryModifier; +import io.trino.plugin.jdbc.mapping.IdentifierMapping; +import io.trino.spi.ErrorCode; +import io.trino.spi.ErrorCodeSupplier; +import io.trino.spi.ErrorType; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.security.ConnectorIdentity; +import io.trino.spi.statistics.TableStatistics; +import io.trino.spi.type.DateType; +import io.trino.spi.type.DecimalType; +import io.trino.spi.type.TimestampType; +import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; +import io.trino.spi.type.VarcharType; + +import javax.inject.Inject; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.time.Duration; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.emptyToNull; +import static com.google.common.base.Verify.verify; +import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR; +import static io.trino.plugin.jdbc.PredicatePushdownController.FULL_PUSHDOWN; +import static io.trino.plugin.jdbc.StandardColumnMappings.bigintWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.booleanWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.dateReadFunctionUsingLocalDate; +import static io.trino.plugin.jdbc.StandardColumnMappings.defaultVarcharColumnMapping; +import static io.trino.plugin.jdbc.StandardColumnMappings.doubleColumnMapping; +import static io.trino.plugin.jdbc.StandardColumnMappings.doubleWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryReadFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.varcharWriteFunction; +import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.getUnsupportedTypeHandling; +import static io.trino.plugin.jdbc.UnsupportedTypeHandling.IGNORE; +import static io.trino.spi.ErrorType.INTERNAL_ERROR; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.RealType.REAL; +import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TinyintType.TINYINT; +import static io.trino.spi.type.VarbinaryType.VARBINARY; +import static java.lang.String.format; +import static java.lang.String.join; +import static java.sql.DatabaseMetaData.columnNoNulls; +import static java.time.format.DateTimeFormatter.ISO_DATE; +import static java.util.stream.Collectors.joining; + +public class SpannerClient + extends BaseJdbcClient +{ + // Maps to Spanner's default empty schema + public static final String DEFAULT_SCHEMA = "default"; + private final SpannerConfig config; + private final IdentifierMapping identifierMapping; + private final String tableTypes[] = {"BASE TABLE", "VIEW"}; + + @Inject + public SpannerClient( + SpannerConfig spannerConfig, JdbcStatisticsConfig statisticsConfig, ConnectionFactory connectionFactory, QueryBuilder queryBuilder, TypeManager typeManager, IdentifierMapping identifierMapping, RemoteQueryModifier queryModifier) + { + super("`", connectionFactory, queryBuilder, + new LinkedHashSet<>(), + identifierMapping, queryModifier, true); + this.config = spannerConfig; + this.identifierMapping = identifierMapping; + } + + private static RemoteTableName getRemoteTable(ResultSet resultSet) + throws SQLException + { + String schema = resultSet.getString("TABLE_SCHEM"); + if (schema != null && schema.equals("")) { + schema = null; + } + return new RemoteTableName( + Optional.ofNullable(null), + Optional.ofNullable(schema), + resultSet.getString("TABLE_NAME")); + } + + public static void sleep() + { + try { + Thread.sleep(Duration.ofSeconds(1).toMillis()); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public Optional toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle) + { + int jdbcType = typeHandle.getJdbcType(); + String jdbcTypeName = typeHandle.getJdbcTypeName() + .orElseThrow(() -> new TrinoException(JDBC_ERROR, "Type name is missing: " + typeHandle)); + System.out.println("Column mapping for type " + typeHandle); + System.out.println("JDBC TYPE NAME " + jdbcTypeName + " " + jdbcType); + Optional mapping = getForcedMappingToVarchar(typeHandle); + if (mapping.isPresent()) { + return mapping; + } + switch (jdbcType) { + case Types.BOOLEAN: + return Optional.of(StandardColumnMappings.booleanColumnMapping()); + case Types.SMALLINT: + case Types.INTEGER: + case Types.TINYINT: + case Types.BIGINT: + return Optional.of(StandardColumnMappings.bigintColumnMapping()); + case Types.NUMERIC: + case Types.DECIMAL: + return Optional.of(StandardColumnMappings.decimalColumnMapping(DecimalType.createDecimalType(9, 38))); + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + return Optional.of(doubleColumnMapping()); + case Types.CHAR: + case Types.VARCHAR: + case Types.NVARCHAR: + return Optional.of(defaultVarcharColumnMapping(typeHandle.getRequiredColumnSize(), false)); + case Types.BINARY: + return Optional.of(ColumnMapping.sliceMapping(VARBINARY, varbinaryReadFunction(), varbinaryWriteFunction(), FULL_PUSHDOWN)); + case Types.DATE: + return Optional.of(ColumnMapping.longMapping( + DATE, + dateReadFunctionUsingLocalDate(), + spannerDateWriteFunctionUsingLocalDate())); + case Types.TIMESTAMP: + return Optional.of(ColumnMapping.longMapping( + TimestampType.TIMESTAMP_MILLIS, + (resultSet, columnIndex) -> { + java.sql.Timestamp timestamp = resultSet.getTimestamp(columnIndex); + return timestamp.toInstant().toEpochMilli() * 1000; + }, + (statement, index, value) -> statement.setTimestamp(index, new java.sql.Timestamp(value / 1000)))); + default: + throw new TrinoException(SpannerErrorCode.SPANNER_ERROR_CODE, "Spanner type mapper cannot build type mapping for JDBC type " + typeHandle.getJdbcType()); + } + } + + private LongWriteFunction spannerDateWriteFunctionUsingLocalDate() + { + return new LongWriteFunction() + { + @Override + public String getBindExpression() + { + return "CAST(? AS DATE)"; + } + + @Override + public void set(PreparedStatement statement, int index, long epochDay) + throws SQLException + { + statement.setString(index, LocalDate.ofEpochDay(epochDay).format(ISO_DATE)); + } + }; + } + + @Override + protected void execute(ConnectorSession session, String query) + { + } + + @Override + protected void execute(ConnectorSession session, Connection connection, String query) + throws SQLException + { + super.execute(session, connection, query); + } + + @Override + public WriteMapping toWriteMapping(ConnectorSession session, Type type) + { + //Spanner handles all types int and long types as INT64 + if (type == TINYINT || type == SMALLINT || type == INTEGER || type == BIGINT) { + return WriteMapping.longMapping("INT64", bigintWriteFunction()); + } + if (type == BOOLEAN) { + return WriteMapping.booleanMapping("BOOL", booleanWriteFunction()); + } + if (type instanceof DecimalType) { + return WriteMapping.objectMapping("NUMERIC", longDecimalWriteFunction(DecimalType.createDecimalType(9, 38))); + } + if (type == REAL || type == DOUBLE) { + return WriteMapping.doubleMapping("FLOAT64", doubleWriteFunction()); + } + if (type == VARBINARY) { + return WriteMapping.sliceMapping("BYTES(MAX)", + SliceWriteFunction.of(Types.LONGVARBINARY, + (statement, index, value) -> statement.setBytes(index, value.byteArray()))); + } + if (type instanceof TimestampType) { + return WriteMapping.objectMapping("TIMESTAMP", + spannerTimestampWriteFunction()); + } + if (type instanceof VarcharType varcharType) { + String dataType = "STRING(MAX)"; + if (!varcharType.isUnbounded() && varcharType.getBoundedLength() <= 16777215) { + dataType = String.format("STRING(%s)", varcharType.getBoundedLength()); + } + return WriteMapping.sliceMapping(dataType, varcharWriteFunction()); + } + if (type instanceof DateType) { + return WriteMapping.longMapping("DATE", + new LongWriteFunction() + { + @Override + public void set(PreparedStatement statement, int index, long value) + throws SQLException + { + statement.setDate(index, java.sql.Date.valueOf(LocalDate.ofEpochDay(value))); + } + }); + } + return WriteMapping.sliceMapping("STRING(MAX)", varcharWriteFunction()); + } + + private ObjectWriteFunction spannerTimestampWriteFunction() + { + return ObjectWriteFunction.of( + String.class, + (statement, index, value) -> statement.setTimestamp(index, + com.google.cloud.Timestamp.parseTimestamp(value).toSqlTimestamp())); + } + + @Override + protected Optional> getTableTypes() + { + return Optional.of(Arrays.asList(tableTypes)); + } + + @Override + public Collection listSchemas(Connection connection) + { + Set schemas = new HashSet<>(Collections.singleton(DEFAULT_SCHEMA)); + try (ResultSet resultSet = connection.getMetaData().getSchemas(null, null)) { + while (resultSet.next()) { + schemas.add(resultSet.getString(1)); + } + return schemas; + } + catch (SQLException e) { + return schemas; + } + } + + @Override + public boolean schemaExists(ConnectorSession session, String schema) + { + //There are no schemas in Spanner except for the default one that we have added and INFORMATION_SCHEMA + return schema.equalsIgnoreCase(DEFAULT_SCHEMA) || schema.equalsIgnoreCase("INFORMATION_SCHEMA"); + } + + @Override + public void createSchema(ConnectorSession session, String schemaName) + { + throw new TrinoException(SpannerErrorCode.SPANNER_ERROR_CODE, "Spanner connector does not support creating schemas"); + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName) + { + throw new TrinoException(SpannerErrorCode.SPANNER_ERROR_CODE, "Spanner connector does not support dropping schemas"); + } + + @Override + public List getTableNames(ConnectorSession session, Optional schema) + { + List tables = new ArrayList<>(); + try (Connection connection = connectionFactory.openConnection(session); + ResultSet resultSet = getTables(connection, Optional.empty(), Optional.empty())) { + while (resultSet.next()) { + tables.add(new SchemaTableName(DEFAULT_SCHEMA, resultSet.getString("TABLE_NAME"))); + } + } + catch (SQLException e) { + e.printStackTrace(); + } + return tables; + } + + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + String sql = createSpannerTable(session, tableMetadata); + execute(session, sql); + sleep(); + } + + @Override + public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle handle, Set pageSinkIds) + { + //Do nothing + } + + @Override + public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + return super.beginCreateTable(session, tableMetadata); + } + + private String createSpannerTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) + { + Map columnAndDataTypeMap = tableMetadata.getTableSchema().getColumns() + .stream() + .collect(Collectors.toMap(k -> k.getName().toUpperCase(Locale.ENGLISH), + v -> toWriteMapping(session, v.getType()).getDataType(), + (k, v) -> v, LinkedHashMap::new)); + + Map properties = tableMetadata.getProperties(); + List primaryKeys = SpannerTableProperties.getPrimaryKey(properties); + List notNullFields = SpannerTableProperties.getNotNullFields(properties); + List commitTimestampFields = SpannerTableProperties.getCommitTimestampFields(properties); + Preconditions.checkArgument(!primaryKeys.isEmpty(), "Primary key is required to create a table in spanner"); + Map columns = new LinkedHashMap<>(); + columnAndDataTypeMap.forEach((column, dataType) -> { + columns.put(column, join(" ", quoted(column), dataType)); + if (notNullFields.contains(column)) { + String columnWithConstraint = String.format("%s NOT NULL", columns.get(column)); + columns.put(column, columnWithConstraint); + } + if (commitTimestampFields.contains(column)) { + String columnWithConstraint = String.format("%s OPTIONS(allow_commit_timestamp=true)", columns.get(column)); + columns.put(column, columnWithConstraint); + } + }); + String interleaveTable = SpannerTableProperties.getInterleaveInParent(properties); + boolean onDeleteCascade = SpannerTableProperties.getOnDeleteCascade(properties); + String interleaveClause = ""; + String onDeleteClause = ""; + if (interleaveTable != null) { + interleaveClause = String.format(", INTERLEAVE IN PARENT %s ", quoted(interleaveTable)); + onDeleteClause = onDeleteCascade ? " ON DELETE CASCADE " : " ON DELETE NO ACTION "; + } + String sql = format("CREATE TABLE %s (%s) PRIMARY KEY (%s) %s %s", + quoted(tableMetadata.getTable().getTableName()), String.join(", ", columns.values()), + quoted(join(", ", primaryKeys)), + interleaveClause, onDeleteClause); + System.out.println(sql); + return sql; + } + + @Override + protected String createTableSql(RemoteTableName remoteTableName, List columns, ConnectorTableMetadata tableMetadata) + { + return createSpannerTable(null, tableMetadata); + } + + public boolean checkTableExists(ConnectorSession session, String tableName) + throws SQLException + { + return checkTableExists(connectionFactory.openConnection(session), tableName); + } + + @Override + public ResultSet getTables(Connection connection, Optional schemaName, Optional tableName) + throws SQLException + { + DatabaseMetaData metadata = connection.getMetaData(); + return metadata.getTables( + null, + null, + escapeObjectNameForMetadataQuery(tableName, metadata.getSearchStringEscape()).orElse(null), + null); + } + + @Override + protected String getTableSchemaName(ResultSet resultSet) + throws SQLException + { + return null; + } + + @Override + protected ResultSet getColumns(JdbcTableHandle tableHandle, DatabaseMetaData metadata) + throws SQLException + { + RemoteTableName remoteTableName = tableHandle.getRequiredNamedRelation().getRemoteTableName(); + String schema = remoteTableName.getSchemaName().orElse(DEFAULT_SCHEMA) + .equalsIgnoreCase(DEFAULT_SCHEMA) ? null : escapeObjectNameForMetadataQuery(remoteTableName.getSchemaName(), metadata.getSearchStringEscape()).orElse(null); + return metadata.getColumns( + null, + schema, + escapeObjectNameForMetadataQuery(remoteTableName.getTableName(), metadata.getSearchStringEscape()), + null); + } + + @Override + public boolean supportsRetries() + { + return false; + } + + @Override + protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, String targetTableName, Optional pageSinkIdColumn) + throws SQLException + { + return super.createTable(session, tableMetadata, targetTableName, pageSinkIdColumn); + } + + @Override + protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, String targetTableName) + throws SQLException + { + return super.createTable(session, tableMetadata, targetTableName); + } + + @Override + public String buildInsertSql(JdbcOutputTableHandle handle, List columnWriters) + { + boolean hasPageSinkIdColumn = handle.getPageSinkIdColumnName().isPresent(); + checkArgument(handle.getColumnNames().size() == columnWriters.size(), "handle and columnWriters mismatch: %s, %s", handle, columnWriters); + String sql = format( + "INSERT INTO %s (%s%s) VALUES (%s%s)", + quoted(null, null, handle.getTableName()), + handle.getColumnNames().stream() + .map(this::quoted) + .collect(joining(", ")), + hasPageSinkIdColumn ? ", " + quoted(handle.getPageSinkIdColumnName().get()) : "", + columnWriters.stream() + .map(WriteFunction::getBindExpression) + .collect(joining(",")), + hasPageSinkIdColumn ? ", ?" : ""); + System.out.println("INSERT SQL " + sql); + return sql; + } + + @Override + public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTableHandle tableHandle, List columns) + { + SchemaTableName schemaTableName = tableHandle.asPlainTable().getSchemaTableName(); + ConnectorIdentity identity = session.getIdentity(); + + verify(tableHandle.getAuthorization().isEmpty(), "Unexpected authorization is required for table: %s".formatted(tableHandle)); + try (Connection connection = connectionFactory.openConnection(session)) { + verify(connection.getAutoCommit()); + String remoteSchema = identifierMapping.toRemoteSchemaName(identity, connection, schemaTableName.getSchemaName()); + String remoteTable = identifierMapping.toRemoteTableName(identity, connection, remoteSchema, schemaTableName.getTableName()); + + ImmutableList.Builder columnNames = ImmutableList.builder(); + ImmutableList.Builder columnTypes = ImmutableList.builder(); + ImmutableList.Builder jdbcColumnTypes = ImmutableList.builder(); + for (JdbcColumnHandle column : columns) { + columnNames.add(column.getColumnName()); + columnTypes.add(column.getColumnType()); + jdbcColumnTypes.add(column.getJdbcTypeHandle()); + } + return new JdbcOutputTableHandle( + null, + DEFAULT_SCHEMA, + remoteTable, + columnNames.build(), + columnTypes.build(), + Optional.of(jdbcColumnTypes.build()), + Optional.empty(), + Optional.empty()); + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + + @Override + public List getColumns(ConnectorSession session, JdbcTableHandle tableHandle) + { + if (tableHandle.getColumns().isPresent()) { + return tableHandle.getColumns().get(); + } + checkArgument(tableHandle.isNamedRelation(), "Cannot get columns for %s", tableHandle); + verify(tableHandle.getAuthorization().isEmpty(), "Unexpected authorization is required for table: %s".formatted(tableHandle)); + SchemaTableName schemaTableName = tableHandle.getRequiredNamedRelation().getSchemaTableName(); + RemoteTableName remoteTableName = tableHandle.getRequiredNamedRelation().getRemoteTableName(); + try (Connection connection = connectionFactory.openConnection(session); + ResultSet resultSet = getColumns(tableHandle, connection.getMetaData())) { + Map caseSensitivityMapping = getCaseSensitivityForColumns(session, connection, tableHandle); + int allColumns = 0; + List columns = new ArrayList<>(); + while (resultSet.next()) { + // skip if table doesn't match expected + RemoteTableName remoteTable = getRemoteTable(resultSet); + if (!(Objects.equals(remoteTableName, remoteTable))) { + continue; + } + allColumns++; + String columnName = resultSet.getString("COLUMN_NAME"); + JdbcTypeHandle typeHandle = new JdbcTypeHandle( + getInteger(resultSet, "DATA_TYPE").orElseThrow(() -> new IllegalStateException("DATA_TYPE is null")), + Optional.ofNullable(resultSet.getString("TYPE_NAME")), + getInteger(resultSet, "COLUMN_SIZE"), + getInteger(resultSet, "DECIMAL_DIGITS"), + Optional.empty(), + Optional.ofNullable(caseSensitivityMapping.get(columnName))); + Optional columnMapping = toColumnMapping(session, connection, typeHandle); + //log.debug("Mapping data type of '%s' column '%s': %s mapped to %s", schemaTableName, columnName, typeHandle, columnMapping); + boolean nullable = (resultSet.getInt("NULLABLE") != columnNoNulls); + // Note: some databases (e.g. SQL Server) do not return column remarks/comment here. + Optional comment = Optional.ofNullable(emptyToNull(resultSet.getString("REMARKS"))); + // skip unsupported column types + columnMapping.ifPresent(mapping -> columns.add(JdbcColumnHandle.builder() + .setColumnName(columnName) + .setJdbcTypeHandle(typeHandle) + .setColumnType(mapping.getType()) + .setNullable(nullable) + .setComment(comment) + .build())); + if (columnMapping.isEmpty()) { + UnsupportedTypeHandling unsupportedTypeHandling = getUnsupportedTypeHandling(session); + verify( + unsupportedTypeHandling == IGNORE, + "Unsupported type handling is set to %s, but toColumnMapping() returned empty for %s", + unsupportedTypeHandling, + typeHandle); + } + } + if (columns.isEmpty()) { + throw new TableNotFoundException( + schemaTableName, + format("Table '%s' has no supported columns (all %s columns are not supported)", schemaTableName, allColumns)); + } + return ImmutableList.copyOf(columns); + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + + @Override + public TableStatistics getTableStatistics(ConnectorSession session, JdbcTableHandle handle) + { + System.out.println("Called table statistics"); + return super.getTableStatistics(session, handle); + } + + public boolean checkTableExists(Connection connection, String tableName) + throws SQLException + { + ResultSet tablesFromSpanner = getTables(connection, Optional.empty(), Optional.empty()); + boolean exists = false; + while (tablesFromSpanner.next()) { + String table = tablesFromSpanner.getString("TABLE_NAME"); + if (table.equalsIgnoreCase(tableName)) { + exists = true; + break; + } + } + return exists; + } + + @Override + public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle handle, Set pageSinkIds) + { + //Nothing to do after insert + } + + @Override + public Optional getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) + { + boolean tableExists = false; + try { + tableExists = checkTableExists(session, schemaTableName.getTableName()); + if (tableExists) { + return Optional.of(new JdbcTableHandle(new SchemaTableName(DEFAULT_SCHEMA, schemaTableName.getTableName()), + new RemoteTableName(Optional.empty(), + Optional.empty(), schemaTableName.getTableName()), + Optional.empty())); + } + else { + return Optional.empty(); + } + } + catch (SQLException e) { + throw new TrinoException(SpannerErrorCode.SPANNER_ERROR_CODE, e); + } + } + + @Override + public void dropTable(ConnectorSession session, JdbcTableHandle handle) + { + System.out.println("Drop table "); + SchemaTableName schemaTableName = handle.getRequiredNamedRelation().getSchemaTableName(); + try (Connection connection = connectionFactory.openConnection(session)) { + String format = format("DROP TABLE %s", schemaTableName.getTableName()); + connection.createStatement().executeUpdate(format); + } + catch (SQLException e) { + e.printStackTrace(); + } + } + + @Override + public Map getTableProperties(ConnectorSession session, JdbcTableHandle tableHandle) + { + // System.out.println("PROPS WAS CALLED "); + return new HashMap<>(); + } + + public enum SpannerErrorCode + implements ErrorCodeSupplier + { + SPANNER_ERROR_CODE(1, INTERNAL_ERROR); + + private final ErrorCode errorCode; + + SpannerErrorCode(int code, ErrorType type) + { + errorCode = new ErrorCode(code + 0x0506_0000, name(), type); + } + + @Override + public ErrorCode toErrorCode() + { + return errorCode; + } + } +} diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConfig.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConfig.java new file mode 100644 index 000000000000..0755b89816d3 --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConfig.java @@ -0,0 +1,97 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.spanner; + +import io.airlift.configuration.Config; + +public class SpannerConfig +{ + private String credentialsFile; + private String projectId; + + public String getHost() + { + return host; + } + @Config("spanner.emulated.host") + public void setHost(String host) + { + this.host = host; + } + + private String host = ""; + + public boolean isEmulator() + { + return isEmulator; + } + + @Config("spanner.emulated") + public void setEmulator(boolean emulator) + { + isEmulator = emulator; + } + + private boolean isEmulator = false; + private String instanceId; + private String database; + + public String getProjectId() + { + return projectId; + } + + @Config("spanner.projectId") + public void setProjectId(String projectId) + { + this.projectId = projectId; + } + + + public String getInstanceId() + { + return instanceId; + } + @Config("spanner.instanceId") + public void setInstanceId(String instanceId) + { + this.instanceId = instanceId; + } + + public String getDatabase() + { + return database; + } + + @Config("spanner.database") + public void setDatabase(String database) + { + this.database = database; + } + /*private int minSessions = 100; + private int maxSessions = 400; + private int numChannels = 4; + private boolean retryAbortsInternally = true;*/ + + public String getCredentialsFile() + { + return credentialsFile; + } + + @Config("spanner.credentials.file") + public void setCredentialsFile(String credentialsFile) + { + this.credentialsFile = credentialsFile; + } +} diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConnector.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConnector.java new file mode 100644 index 000000000000..83a77380f9e8 --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConnector.java @@ -0,0 +1,110 @@ +package io.trino.plugin.spanner; + +import com.google.inject.Inject; +import io.airlift.bootstrap.LifeCycleManager; +import io.trino.plugin.base.session.SessionPropertiesProvider; +import io.trino.plugin.jdbc.JdbcTransactionHandle; +import io.trino.plugin.jdbc.TablePropertiesProvider; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorPageSinkProvider; +import io.trino.spi.connector.ConnectorRecordSetProvider; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.transaction.IsolationLevel; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +public class SpannerConnector implements Connector +{ + private final LifeCycleManager lifeCycleManager; + private final ConnectorMetadata metadata; + private final ConnectorSplitManager splitManager; + private final ConnectorRecordSetProvider recordSetProvider; + private final ConnectorPageSinkProvider pageSinkProvider; + private final List> tableProperties; + private final List> sessionProperties; + + @Inject + public SpannerConnector( + LifeCycleManager lifeCycleManager, + SpannerConnectorMetadata metadata, + ConnectorSplitManager splitManager, + ConnectorRecordSetProvider recordSetProvider, + ConnectorPageSinkProvider pageSinkProvider, + Set tableProperties, + Set sessionProperties) + { + this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.splitManager = requireNonNull(splitManager, "splitManager is null"); + this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null"); + this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null"); + this.tableProperties = tableProperties.stream() + .flatMap(tablePropertiesProvider -> tablePropertiesProvider.getTableProperties().stream()) + .collect(toImmutableList()); + this.sessionProperties = sessionProperties.stream() + .flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream()) + .collect(toImmutableList()); + } + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit) + { + return new JdbcTransactionHandle(); + } + + @Override + public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transaction) + { + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return splitManager; + } + + @Override + public ConnectorRecordSetProvider getRecordSetProvider() + { + return recordSetProvider; + } + + @Override + public ConnectorPageSinkProvider getPageSinkProvider() + { + return pageSinkProvider; + } + + @Override + public List> getTableProperties() + { + return tableProperties; + } + + @Override + public List> getColumnProperties() + { + return Collections.emptyList(); + } + + @Override + public List> getSessionProperties() + { + return sessionProperties; + } + + @Override + public final void shutdown() + { + lifeCycleManager.stop(); + } +} diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConnectorFactory.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConnectorFactory.java new file mode 100644 index 000000000000..d15fdebf4c41 --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConnectorFactory.java @@ -0,0 +1,60 @@ +package io.trino.plugin.spanner; + +import com.google.inject.Injector; +import io.airlift.bootstrap.Bootstrap; +import io.airlift.json.JsonModule; +import io.trino.plugin.base.CatalogName; +import io.trino.spi.NodeManager; +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorContext; +import io.trino.spi.connector.ConnectorFactory; +import io.trino.spi.type.TypeManager; + +import java.util.Map; + +import static io.trino.plugin.base.Versions.checkSpiVersion; +import static java.util.Objects.requireNonNull; + +public class SpannerConnectorFactory + implements ConnectorFactory +{ + private final ClassLoader classLoader; + + public SpannerConnectorFactory(ClassLoader classLoader) + { + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public String getName() + { + return "spanner"; + } + + @Override + public Connector create(String catalogName, Map requiredConfig, ConnectorContext context) + { + requireNonNull(requiredConfig, "requiredConfig is null"); + checkSpiVersion(context, this); + + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + Bootstrap app = new Bootstrap( + new JsonModule(), + new SpannerModule(catalogName), + binder -> { + binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName)); + binder.bind(ClassLoader.class).toInstance(SpannerConnectorFactory.class.getClassLoader()); + binder.bind(TypeManager.class).toInstance(context.getTypeManager()); + binder.bind(NodeManager.class).toInstance(context.getNodeManager()); + }); + + Injector injector = app + .doNotInitializeLogging() + .setRequiredConfigurationProperties(requiredConfig) + .initialize(); + + return injector.getInstance(SpannerConnector.class); + } + } +} diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConnectorMetadata.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConnectorMetadata.java new file mode 100644 index 000000000000..a16ecf6160fc --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerConnectorMetadata.java @@ -0,0 +1,252 @@ +package io.trino.plugin.spanner; + +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; +import io.trino.plugin.jdbc.DefaultJdbcMetadata; +import io.trino.plugin.jdbc.JdbcColumnHandle; +import io.trino.plugin.jdbc.JdbcNamedRelationHandle; +import io.trino.plugin.jdbc.JdbcOutputTableHandle; +import io.trino.plugin.jdbc.JdbcQueryEventListener; +import io.trino.plugin.jdbc.JdbcTableHandle; +import io.trino.plugin.jdbc.RemoteTableName; +import io.trino.plugin.jdbc.mapping.IdentifierMapping; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.AggregateFunction; +import io.trino.spi.connector.AggregationApplicationResult; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorInsertTableHandle; +import io.trino.spi.connector.ConnectorOutputMetadata; +import io.trino.spi.connector.ConnectorOutputTableHandle; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.ConnectorTableLayout; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.ConnectorTableProperties; +import io.trino.spi.connector.ConnectorTableSchema; +import io.trino.spi.connector.LocalProperty; +import io.trino.spi.connector.RetryMode; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.SortingProperty; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.statistics.ComputedStatistics; + +import javax.annotation.Nullable; +import javax.inject.Inject; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.connector.RetryMode.NO_RETRIES; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class SpannerConnectorMetadata + extends DefaultJdbcMetadata +{ + + public static final String DEFAULT_SCHEMA = "default"; + private final SpannerClient spannerClient; + private final IdentifierMapping identifierMapping; + + @Inject + public SpannerConnectorMetadata(SpannerClient spannerClient, + IdentifierMapping identifierMapping, + Set jdbcQueryEventListeners) + { + super(spannerClient, false, jdbcQueryEventListeners); + this.spannerClient = requireNonNull(spannerClient, "spannerClient is null"); + this.identifierMapping = requireNonNull(identifierMapping, "identifierMapping is null"); + } + + public static @Nullable + String toTrinoSchemaName(@Nullable String schema) + { + return "".equals(schema) ? DEFAULT_SCHEMA : schema; + } + + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName schemaTableName) + { + return spannerClient.getTableHandle(session, schemaTableName) + .map(JdbcTableHandle::asPlainTable) + .map(JdbcNamedRelationHandle::getRemoteTableName) + .map(remoteTableName -> new JdbcTableHandle( + schemaTableName, + new RemoteTableName(remoteTableName.getCatalogName(), + Optional.ofNullable(toTrinoSchemaName(remoteTableName.getSchemaName().orElse(null))), remoteTableName.getTableName()), + Optional.empty())) + .orElse(null); + } + + @Override + public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table) + { + JdbcTableHandle tableHandle = (JdbcTableHandle) table; + List> sortingProperties = tableHandle.getSortOrder() + .map(properties -> properties + .stream() + .map(item -> (LocalProperty) new SortingProperty( + item.getColumn(), + item.getSortOrder())) + .collect(toImmutableList())) + .orElse(ImmutableList.of()); + + return new ConnectorTableProperties(TupleDomain.all(), Optional.empty(), Optional.empty(), Optional.empty(), sortingProperties); + } + + @Override + public ConnectorTableSchema getTableSchema(ConnectorSession session, ConnectorTableHandle table) + { + JdbcTableHandle handle = (JdbcTableHandle) table; + return new ConnectorTableSchema( + getSchemaTableName(handle), + getColumnMetadata(session, handle).stream() + .map(ColumnMetadata::getColumnSchema) + .collect(toImmutableList())); + } + + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) + { + JdbcTableHandle handle = (JdbcTableHandle) table; + return new ConnectorTableMetadata( + getSchemaTableName(handle), + getColumnMetadata(session, handle), + spannerClient.getTableProperties(session, handle)); + } + + private List getColumnMetadata(ConnectorSession session, JdbcTableHandle handle) + { + return spannerClient.getColumns(session, handle).stream() + .map(JdbcColumnHandle::getColumnMetadata) + .collect(toImmutableList()); + } + + @Override + public void createSchema(ConnectorSession session, String schemaName, Map properties, TrinoPrincipal owner) + { + throw new TrinoException(NOT_SUPPORTED, "Can't create schemas in Spanner"); + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName) + { + throw new TrinoException(NOT_SUPPORTED, "Can't drop schema which in spanner"); + } + + @Override + public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) + { + spannerClient.beginCreateTable(session, tableMetadata); + } + + @Override + public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode) + { + if (retryMode != NO_RETRIES) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries"); + } + return spannerClient.beginCreateTable(session, tableMetadata); + } + + @Override + public Optional finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) + { + return Optional.empty(); + } + + @Override + public boolean supportsMissingColumnsOnInsert() + { + return true; + } + + @Override + public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List columns, RetryMode retryMode) + { + if (retryMode != NO_RETRIES) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries"); + } + JdbcTableHandle handle = (JdbcTableHandle) tableHandle; + + List columnHandles = columns.stream() + .map(JdbcColumnHandle.class::cast) + .collect(toImmutableList()); + + RemoteTableName remoteTableName = handle.asPlainTable().getRemoteTableName(); + return new JdbcOutputTableHandle( + "spanner", + remoteTableName.getSchemaName().orElse(null), + remoteTableName.getTableName(), + columnHandles.stream().map(JdbcColumnHandle::getColumnName).collect(toImmutableList()), + columnHandles.stream().map(JdbcColumnHandle::getColumnType).collect(toImmutableList()), + Optional.of(columnHandles.stream().map(JdbcColumnHandle::getJdbcTypeHandle).collect(toImmutableList())), + Optional.empty(), + Optional.empty()); + } + + @Override + public Optional finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection fragments, Collection computedStatistics) + { + return Optional.empty(); + } + + @Override + public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column) + { + if (column.getComment() != null) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with comments"); + } + + JdbcTableHandle handle = (JdbcTableHandle) tableHandle; + RemoteTableName remoteTableName = handle.asPlainTable().getRemoteTableName(); + spannerClient.execute(session, format( + "ALTER TABLE %s ADD %s %s", + remoteTableName.getTableName(), + spannerClient.quoted(column.getName()), + spannerClient.toWriteMapping(session, column.getType()).getDataType())); + } + + @Override + public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column) + { + JdbcTableHandle handle = (JdbcTableHandle) tableHandle; + JdbcColumnHandle columnHandle = (JdbcColumnHandle) column; + RemoteTableName remoteTableName = handle.asPlainTable().getRemoteTableName(); + spannerClient.execute(session, format( + "ALTER TABLE %s DROP COLUMN %s", + remoteTableName.getTableName(), + spannerClient.quoted(columnHandle.getColumnName()))); + } + + @Override + public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) + { + spannerClient.dropTable(session, (JdbcTableHandle) tableHandle); + } + + @Override + public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHandle) + { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support truncating tables"); + } + + @Override + public Optional> applyAggregation( + ConnectorSession session, + ConnectorTableHandle table, + List aggregates, + Map assignments, + List> groupingSets) + { + // TODO support aggregation pushdown + return Optional.empty(); + } +} diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerModule.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerModule.java new file mode 100644 index 000000000000..1100f484414a --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerModule.java @@ -0,0 +1,300 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.spanner; + +import com.google.cloud.spanner.jdbc.JdbcDriver; +import com.google.inject.Binder; +import com.google.inject.Key; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import com.google.inject.multibindings.Multibinder; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.base.classloader.ForClassLoaderSafe; +import io.trino.plugin.base.session.SessionPropertiesProvider; +import io.trino.plugin.jdbc.BaseJdbcConfig; +import io.trino.plugin.jdbc.ConfiguringConnectionFactory; +import io.trino.plugin.jdbc.ConnectionFactory; +import io.trino.plugin.jdbc.DecimalModule; +import io.trino.plugin.jdbc.DefaultJdbcMetadataFactory; +import io.trino.plugin.jdbc.DefaultQueryBuilder; +import io.trino.plugin.jdbc.DriverConnectionFactory; +import io.trino.plugin.jdbc.DynamicFilteringStats; +import io.trino.plugin.jdbc.ForBaseJdbc; +import io.trino.plugin.jdbc.ForJdbcDynamicFiltering; +import io.trino.plugin.jdbc.ForLazyConnectionFactory; +import io.trino.plugin.jdbc.ForRecordCursor; +import io.trino.plugin.jdbc.JdbcClient; +import io.trino.plugin.jdbc.JdbcDiagnosticModule; +import io.trino.plugin.jdbc.JdbcDynamicFilteringConfig; +import io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties; +import io.trino.plugin.jdbc.JdbcDynamicFilteringSplitManager; +import io.trino.plugin.jdbc.JdbcMetadataConfig; +import io.trino.plugin.jdbc.JdbcMetadataFactory; +import io.trino.plugin.jdbc.JdbcMetadataSessionProperties; +import io.trino.plugin.jdbc.JdbcPageSinkProvider; +import io.trino.plugin.jdbc.JdbcQueryEventListener; +import io.trino.plugin.jdbc.JdbcRecordSetProvider; +import io.trino.plugin.jdbc.JdbcSplitManager; +import io.trino.plugin.jdbc.JdbcStatisticsConfig; +import io.trino.plugin.jdbc.JdbcTransactionManager; +import io.trino.plugin.jdbc.JdbcWriteConfig; +import io.trino.plugin.jdbc.JdbcWriteSessionProperties; +import io.trino.plugin.jdbc.LazyConnectionFactory; +import io.trino.plugin.jdbc.MaxDomainCompactionThreshold; +import io.trino.plugin.jdbc.QueryBuilder; +import io.trino.plugin.jdbc.ReusableConnectionFactoryModule; +import io.trino.plugin.jdbc.StatsCollecting; +import io.trino.plugin.jdbc.TablePropertiesProvider; +import io.trino.plugin.jdbc.TypeHandlingJdbcConfig; +import io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties; +import io.trino.plugin.jdbc.credential.EmptyCredentialProvider; +import io.trino.plugin.jdbc.logging.RemoteQueryModifier; +import io.trino.plugin.jdbc.logging.RemoteQueryModifierModule; +import io.trino.plugin.jdbc.mapping.IdentifierMapping; +import io.trino.plugin.jdbc.mapping.IdentifierMappingModule; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorPageSinkProvider; +import io.trino.spi.connector.ConnectorRecordSetProvider; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.procedure.Procedure; +import io.trino.spi.type.TypeManager; + +import javax.annotation.PreDestroy; +import javax.inject.Provider; + +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; +import static com.google.inject.multibindings.Multibinder.newSetBinder; +import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; +import static io.airlift.configuration.ConditionalModule.conditionalModule; +import static io.airlift.configuration.ConfigBinder.configBinder; +import static java.util.Objects.requireNonNull; +import static org.weakref.jmx.guice.ExportBinder.newExporter; + +public class SpannerModule + extends AbstractConfigurationAwareModule +{ + private final String catalogName; + + public SpannerModule(String catalogName) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + } + + @Provides + @Singleton + @ForBaseJdbc + public static ConnectionFactory createConnectionFactory( + SpannerConfig spannerConfig) + throws ClassNotFoundException + { + Class.forName("com.google.cloud.spanner.jdbc.JdbcDriver"); + Properties connectionProperties = new Properties(); + String connectionUrlTemplate = "jdbc:cloudspanner:/%s/projects/%s/instances/%s/databases/%s%s"; + String host = "/"; + String configureEmulator = ";autoConfigEmulator=true"; + if (spannerConfig.isEmulator()) { + host = host + spannerConfig.getHost(); + } + String url = String.format(connectionUrlTemplate, host, spannerConfig.getProjectId(), spannerConfig.getInstanceId(), spannerConfig.getDatabase(), configureEmulator); + JdbcDriver driver = new JdbcDriver(); + System.out.println("USING connection URL " + url); + //File credentials = new File(spannerConfig.getCredentialsFile()); + if (!driver.acceptsURL(url)) { + throw new RuntimeException( + url + " is incorrect"); + } + //jdbc:cloudspanner://0.0.0.0:9010/projects/test-project/instances/test-instance/databases/trinodb;autoConfigEmulator=true + //jdbc:cloudspanner://localhost:9010/projects/test-project/instances/test-instance/databases/test-db;usePlainText=true + //connectionProperties.put("credentials", spannerConfig.getCredentialsFile()); + connectionProperties.setProperty("retryAbortsInternally", "true"); + return new ConfiguringConnectionFactory(new DriverConnectionFactory( + driver, + url, + connectionProperties, + new EmptyCredentialProvider()), + connection -> { + }); + } + + @Provides + @Singleton + public static JdbcStatisticsConfig getConf() + { + JdbcStatisticsConfig jdbcStatisticsConfig = new JdbcStatisticsConfig(); + jdbcStatisticsConfig.setEnabled(true); + return jdbcStatisticsConfig; + } + + @Provides + @Singleton + public static SpannerClient getSpannerClient( + SpannerConfig spannerConfig, + JdbcStatisticsConfig statisticsConfig, + ConnectionFactory connectionFactory, + QueryBuilder queryBuilder, + TypeManager typeManager, + IdentifierMapping identifierMapping, + RemoteQueryModifier queryModifier) + { + return new SpannerClient( + spannerConfig, + statisticsConfig, + connectionFactory, + queryBuilder, + typeManager, + identifierMapping, + queryModifier); + } + + @Provides + public static SpannerTableProperties tableProperties() + { + return new SpannerTableProperties(); + } + + @Singleton + public static SpannerSinkProvider configureSpannerSink( + ConnectionFactory factory, + RemoteQueryModifier modifier, + SpannerClient spannerClient, + SpannerConfig config, + SpannerTableProperties propertiesProvider) + { + return new SpannerSinkProvider(factory, modifier, spannerClient, config, propertiesProvider); + } + + public static Multibinder sessionPropertiesProviderBinder(Binder binder) + { + return newSetBinder(binder, SessionPropertiesProvider.class); + } + + public static void bindSessionPropertiesProvider(Binder binder, Class type) + { + sessionPropertiesProviderBinder(binder).addBinding().to(type).in(Scopes.SINGLETON); + } + + public static Multibinder procedureBinder(Binder binder) + { + return newSetBinder(binder, Procedure.class); + } + + public static void bindProcedure(Binder binder, Class> type) + { + procedureBinder(binder).addBinding().toProvider(type).in(Scopes.SINGLETON); + } + + public static Multibinder tablePropertiesProviderBinder(Binder binder) + { + return newSetBinder(binder, TablePropertiesProvider.class); + } + + public static void bindTablePropertiesProvider(Binder binder, Class type) + { + tablePropertiesProviderBinder(binder).addBinding().to(type).in(Scopes.SINGLETON); + } + + @Provides + @Singleton + @ForBaseJdbc + public BaseJdbcConfig getBaseJdbcConfig(SpannerConfig config) + { + return new BaseJdbcConfig(); + } + + @Override + public void setup(Binder binder) + { + install(new RemoteQueryModifierModule()); + binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcDynamicFilteringSplitManager.class).in(Scopes.SINGLETON); + //binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON); + binder.bind(ConnectorRecordSetProvider.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON); + //binder.bind(ConnectorRecordSetProvider.class).to(ClassLoaderSafeConnectorRecordSetProvider.class).in(Scopes.SINGLETON); + binder.bind(ConnectorPageSinkProvider.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcPageSinkProvider.class).in(Scopes.SINGLETON); + //binder.bind(ConnectorPageSinkProvider.class).to(ClassLoaderSafeConnectorPageSinkProvider.class).in(Scopes.SINGLETON); + binder.bind(QueryBuilder.class).to(DefaultQueryBuilder.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, Key.get(int.class, MaxDomainCompactionThreshold.class)); + //configBinder(binder).bindConfigDefaults(JdbcMetadataConfig.class, config -> config.setDomainCompactionThreshold(DEFAULT_DOMAIN_COMPACTION_THRESHOLD)); + + configBinder(binder).bindConfig(TypeHandlingJdbcConfig.class); + bindSessionPropertiesProvider(binder, TypeHandlingJdbcSessionProperties.class); + bindSessionPropertiesProvider(binder, JdbcMetadataSessionProperties.class); + bindSessionPropertiesProvider(binder, JdbcWriteSessionProperties.class); + bindSessionPropertiesProvider(binder, SpannerSessionProperties.class); + bindSessionPropertiesProvider(binder, JdbcDynamicFilteringSessionProperties.class); + newOptionalBinder(binder, Key.get(ConnectorSplitManager.class, ForJdbcDynamicFiltering.class)).setDefault().to(JdbcSplitManager.class).in(Scopes.SINGLETON); + + binder.bind(DynamicFilteringStats.class).in(Scopes.SINGLETON); + newExporter(binder).export(DynamicFilteringStats.class) + .as(generator -> generator.generatedNameOf(DynamicFilteringStats.class, catalogName)); + + newOptionalBinder(binder, JdbcMetadataFactory.class).setBinding().to(DefaultJdbcMetadataFactory.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, Key.get(ConnectorSplitManager.class, ForJdbcDynamicFiltering.class)).setDefault().to(JdbcSplitManager.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, ConnectorSplitManager.class).setBinding().to(JdbcDynamicFilteringSplitManager.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, ConnectorRecordSetProvider.class).setBinding().to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, ConnectorPageSinkProvider.class).setBinding().to(SpannerSinkProvider.class).in(Scopes.SINGLETON); + + binder.bind(JdbcTransactionManager.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(JdbcMetadataConfig.class); + configBinder(binder).bindConfig(JdbcWriteConfig.class); + configBinder(binder).bindConfig(JdbcDynamicFilteringConfig.class); + + binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(Key.get(SpannerClient.class)).in(Scopes.SINGLETON); + binder.bind(JdbcClient.class).to(Key.get(JdbcClient.class, StatsCollecting.class)).in(Scopes.SINGLETON); + binder.bind(ConnectorMetadata.class).annotatedWith(ForClassLoaderSafe.class).to(SpannerConnectorMetadata.class).in(Scopes.SINGLETON); + binder.bind(ConnectionFactory.class) + .annotatedWith(ForLazyConnectionFactory.class) + .to(Key.get(ConnectionFactory.class, StatsCollecting.class)) + .in(Scopes.SINGLETON); + install(conditionalModule( + SpannerConfig.class, + (p) -> true, + new ReusableConnectionFactoryModule(), + innerBinder -> innerBinder.bind(ConnectionFactory.class).to(LazyConnectionFactory.class).in(Scopes.SINGLETON))); + + bindTablePropertiesProvider(binder, SpannerTableProperties.class); + + binder.bind(SpannerConnector.class).in(Scopes.SINGLETON); + install(new JdbcDiagnosticModule()); + install(new IdentifierMappingModule()); + install(new DecimalModule()); + } + + @PreDestroy + public void shutdownRecordCursorExecutor(@ForRecordCursor ExecutorService executor) + { + executor.shutdownNow(); + } + + @Singleton + @ForRecordCursor + @Provides + public ExecutorService createRecordCursorExecutor() + { + return newDirectExecutorService(); + } + + @Singleton + @Provides + public SpannerConnectorMetadata getSpannerMetadata( + SpannerClient client, IdentifierMapping mapping, + Set listeners) + { + return new SpannerConnectorMetadata(client, mapping, listeners); + } +} diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerPlugin.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerPlugin.java new file mode 100644 index 000000000000..d72b06046207 --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerPlugin.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.spanner; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.jdbc.JdbcPlugin; +import io.trino.spi.Plugin; +import io.trino.spi.connector.ConnectorFactory; + +public class SpannerPlugin + implements Plugin +{ + + @Override + public Iterable getConnectorFactories() + { + return ImmutableList.of(new SpannerConnectorFactory(SpannerPlugin.class.getClassLoader())); + } +} diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerSessionProperties.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerSessionProperties.java new file mode 100644 index 000000000000..0aec148c9971 --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerSessionProperties.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.spanner; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.base.session.SessionPropertiesProvider; +import io.trino.spi.session.PropertyMetadata; + +import java.util.List; + +public class SpannerSessionProperties + implements SessionPropertiesProvider +{ + public static final String WRITE_MODE = "write_mode"; + private final ImmutableList> sessionProperties; + + public SpannerSessionProperties() + { + sessionProperties = ImmutableList.>builder() + .add(PropertyMetadata.enumProperty(WRITE_MODE, + "On write mode INSERT spanner throws an error on insert with duplicate primary keys," + + "on write mode UPSERT spanner updates the record with existing primary key with the new record being", + Mode.class, + Mode.INSERT, + false)).build(); + } + + @Override + public List> getSessionProperties() + { + return sessionProperties; + } + + enum Mode + { + INSERT, UPSERT + } +} diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerSink.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerSink.java new file mode 100644 index 000000000000..6e9fb8380a39 --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerSink.java @@ -0,0 +1,190 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.spanner; + +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.TransactionContext; +import com.google.cloud.spanner.TransactionManager; +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.plugin.jdbc.JdbcOutputTableHandle; +import io.trino.spi.Page; +import io.trino.spi.block.Block; +import io.trino.spi.connector.ConnectorPageSink; +import io.trino.spi.connector.ConnectorPageSinkId; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.type.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDate; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static io.trino.plugin.jdbc.JdbcWriteSessionProperties.getWriteBatchSize; +import static java.time.format.DateTimeFormatter.ISO_DATE; +import static java.util.concurrent.CompletableFuture.completedFuture; + +public class SpannerSink + implements ConnectorPageSink +{ + private final SpannerOptions options; + private final int maxBatchSize; + private final DatabaseClient client; + private final List columnTypes; + private final List columnNames; + private final String table; + private final ConnectorPageSinkId pageSinkId; + private final SpannerSessionProperties.Mode writeMode; + private final boolean isEmulatedHost; + private final Logger LOG = LoggerFactory.getLogger(SpannerSink.class); + int maxRetries = 3; + int retry = 0; + private List mutations = new LinkedList<>(); + + public SpannerSink(SpannerConfig config, ConnectorSession session, JdbcOutputTableHandle handle, + ConnectorPageSinkId pageSinkId) + { + isEmulatedHost = config.isEmulator(); + SpannerOptions.Builder builder = SpannerOptions + .newBuilder() + .setProjectId(config.getProjectId()); + if (isEmulatedHost) { + builder.setEmulatorHost(config.getHost()) + .setCredentials(NoCredentials.getInstance()); + } + else { + //builder.setCredentials(Credentials); + } + this.options = builder.build(); + this.pageSinkId = pageSinkId; + this.maxBatchSize = getWriteBatchSize(session); + this.client = options.getService().getDatabaseClient(DatabaseId.of(config.getProjectId(), config.getInstanceId(), config.getDatabase())); + columnTypes = handle.getColumnTypes(); + columnNames = handle.getColumnNames(); + table = handle.getTableName(); + writeMode = session.getProperty(SpannerSessionProperties.WRITE_MODE, SpannerSessionProperties.Mode.class); + } + + public Mutation.WriteBuilder createWriteBuilder() + { + if (writeMode.equals(SpannerSessionProperties.Mode.UPSERT)) { + return Mutation.newInsertOrUpdateBuilder(table); + } + else { + return Mutation.newInsertBuilder(table); + } + } + + @Override + public CompletableFuture appendPage(Page page) + { + for (int position = 0; position < page.getPositionCount(); position++) { + Mutation.WriteBuilder writeBuilder = createWriteBuilder(); + for (int channel = 0; channel < page.getChannelCount(); channel++) { + Block block = page.getBlock(channel); + Type type = columnTypes.get(channel); + String columnName = columnNames.get(channel); + if (!block.isNull(position)) { + Class javaType = type.getJavaType(); + if (javaType == boolean.class) { + writeBuilder.set(columnName).to(type.getBoolean(block, position)); + } + else if (javaType == long.class) { + if (type.getDisplayName().equalsIgnoreCase("DATE")) { + String date = LocalDate.ofEpochDay(type.getLong(block, position)).format(ISO_DATE); + writeBuilder.set(columnName).to(date); + } + else { + writeBuilder.set(columnName).to(type.getLong(block, position)); + } + } + else if (javaType == double.class) { + writeBuilder.set(columnName).to(type.getDouble(block, position)); + } + else if (javaType == Slice.class) { + writeBuilder.set(columnName).to(type.getSlice(block, position).toStringUtf8()); + } + else { + System.out.println("TYPE CLASS " + javaType); + System.out.println("TYPE Display NAME " + type.getDisplayName()); + System.out.println("TYPE Base NAME " + type.getBaseName()); + System.out.println("TYPE ID " + type.getTypeId()); + System.out.println("TYPE Signature " + type.getTypeSignature()); + throw new RuntimeException("Unknown type"); + } + } + mutations.add(writeBuilder.build()); + if (mutations.size() >= maxBatchSize) { + write(); + } + } + } + return NOT_BLOCKED; + } + + private void write() + { + if (!mutations.isEmpty()) { + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + transaction.buffer(mutations); + manager.commit(); + break; + } + catch (AbortedException e) { + blockFor(e.getRetryDelayInMillis()); + transaction = manager.resetForRetry(); + } + } + } + } + mutations = new LinkedList<>(); + } + + private void blockFor(long delay) + { + try { + if (delay > 0L) { + TimeUnit.MILLISECONDS.sleep(delay); + } + } + catch (InterruptedException ignored) { + } + } + + @Override + public CompletableFuture> finish() + { + write(); + return completedFuture(ImmutableList.of(Slices.wrappedLongArray(pageSinkId.getId()))); + } + + @Override + public void abort() + { + mutations = null; + } +} diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerSinkProvider.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerSinkProvider.java new file mode 100644 index 000000000000..f029875de974 --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerSinkProvider.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.spanner; + +import com.google.inject.Inject; +import io.trino.plugin.jdbc.ConnectionFactory; +import io.trino.plugin.jdbc.JdbcOutputTableHandle; +import io.trino.plugin.jdbc.logging.RemoteQueryModifier; +import io.trino.spi.connector.ConnectorInsertTableHandle; +import io.trino.spi.connector.ConnectorOutputTableHandle; +import io.trino.spi.connector.ConnectorPageSink; +import io.trino.spi.connector.ConnectorPageSinkId; +import io.trino.spi.connector.ConnectorPageSinkProvider; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTransactionHandle; + +public class SpannerSinkProvider + implements ConnectorPageSinkProvider +{ + private final SpannerClient client; + private final RemoteQueryModifier modifier; + private final SpannerTableProperties spannerTableProperties; + private final SpannerConfig config; + + @Inject + public SpannerSinkProvider( + ConnectionFactory connectionFactory, + RemoteQueryModifier modifier, + SpannerClient client, + SpannerConfig config, + SpannerTableProperties propertiesProvider) + { + System.out.println("Called Spanner Sink provider"); + this.client = client; + this.modifier = modifier; + System.out.println("TABLE PROPS in SINK " + propertiesProvider.getTableProperties()); + this.spannerTableProperties = propertiesProvider; + this.config = config; + } + + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle outputTableHandle, ConnectorPageSinkId pageSinkId) + { + return new SpannerSink(config, session, (JdbcOutputTableHandle) outputTableHandle, pageSinkId); + } + + @Override + public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle, ConnectorPageSinkId pageSinkId) + { + return new SpannerSink(config, session, (JdbcOutputTableHandle) tableHandle, pageSinkId); + } +} diff --git a/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerTableProperties.java b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerTableProperties.java new file mode 100644 index 000000000000..94530811b78f --- /dev/null +++ b/plugin/trino-spanner/src/main/java/io/trino/plugin/spanner/SpannerTableProperties.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.spanner; + +import com.google.common.collect.ImmutableList; +import io.trino.plugin.jdbc.TablePropertiesProvider; +import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.type.ArrayType; + +import javax.inject.Inject; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; + +import static io.trino.spi.session.PropertyMetadata.booleanProperty; +import static io.trino.spi.session.PropertyMetadata.stringProperty; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; + +public class SpannerTableProperties + implements TablePropertiesProvider +{ + public static final String PRIMARY_KEYS = "primary_keys"; + public static final String NOT_NULL_FIELDS = "not_null_fields"; + public static final String COMMIT_TIMESTAMP_FIELDS = "commit_timestamp_fields"; + public static final String INTERLEAVE_IN_PARENT = "interleave_in_parent"; + public static final String ON_DELETE_CASCADE = "on_delete_cascade"; + private final ImmutableList> sessionProperties; + + @Inject + public SpannerTableProperties() + { + System.out.println("CALLED TABLE PROPERTIES "); + sessionProperties = ImmutableList.>builder() + .add(new PropertyMetadata<>( + PRIMARY_KEYS, + "Primary keys for the table being created", + new ArrayType(VARCHAR), + List.class, + ImmutableList.of(), + false, + value -> (List) value, + value -> value)) + .add(new PropertyMetadata<>( + NOT_NULL_FIELDS, + "Array of fields that should have NOT NULL constraints set on them in Spanner", + new ArrayType(VARCHAR), + List.class, + ImmutableList.of(), + false, + value -> (List) value, + value -> value)) + .add(new PropertyMetadata<>( + COMMIT_TIMESTAMP_FIELDS, + "Array of timestamp fields that should have 'OPTIONS (allow_commit_timestamp=true)' constraints set on them in Spanner", + new ArrayType(VARCHAR), + List.class, + ImmutableList.of(), + false, + value -> (List) value, + value -> value)) + .add(stringProperty(INTERLEAVE_IN_PARENT, + "Table name which needs to be interleaved with this table", null, false)) + .add(booleanProperty(ON_DELETE_CASCADE, + "Boolean property to cascade on delete. ON DELETE CASCADE if set to true or ON DELETE NO ACTION if false", + false, false)) + .build(); + } + + public static List getPrimaryKey(Map tableProperties) + { + requireNonNull(tableProperties, "tableProperties is null"); + return ImmutableList.copyOf(toUpperCase((List) tableProperties.get(PRIMARY_KEYS))); + } + + public static String getInterleaveInParent(Map tableProperties) + { + requireNonNull(tableProperties, "tableProperties is null"); + return (String) tableProperties.get(INTERLEAVE_IN_PARENT); + } + + public static List getNotNullFields(Map tableProperties) + { + requireNonNull(tableProperties, "tableProperties is null"); + return ImmutableList.copyOf(toUpperCase((List) tableProperties.get(NOT_NULL_FIELDS))); + } + + public static List getCommitTimestampFields(Map tableProperties) + { + requireNonNull(tableProperties, "tableProperties is null"); + return ImmutableList.copyOf(toUpperCase((List) tableProperties.get(COMMIT_TIMESTAMP_FIELDS))); + } + + public static boolean getOnDeleteCascade(Map tableProperties) + { + requireNonNull(tableProperties, "tableProperties is null"); + return (boolean) tableProperties.get(ON_DELETE_CASCADE); + } + + private static List toUpperCase(List collection) + { + return collection.stream().map(f -> f.toUpperCase(Locale.ENGLISH)).collect(Collectors.toList()); + } + + @Override + public List> getTableProperties() + { + return sessionProperties; + } +} diff --git a/plugin/trino-spanner/src/main/resources/docker-compose.yaml b/plugin/trino-spanner/src/main/resources/docker-compose.yaml new file mode 100644 index 000000000000..f9305954ee75 --- /dev/null +++ b/plugin/trino-spanner/src/main/resources/docker-compose.yaml @@ -0,0 +1,30 @@ +version: '3' +services: + spanner: + image: gcr.io/cloud-spanner-emulator/emulator:latest + ports: + - "9010:9010" + - "9020:9020" + + gcloud-spanner-init: + image: gcr.io/google.com/cloudsdktool/cloud-sdk:latest + environment: + PROJECT_ID: "spanner-project" + SPANNER_EMULATOR_URL: "http://localhost:9020/" + INSTANCE_NAME: "spanner-instance" + DATABASE_NAME: "spanner-database" + command: > + bash -c 'gcloud config configurations create emulator && + gcloud config set auth/disable_credentials true && + gcloud config set project $${PROJECT_ID} && + gcloud config set api_endpoint_overrides/spanner $${SPANNER_EMULATOR_URL} && + gcloud config set auth/disable_credentials true && + gcloud spanner instances create $${INSTANCE_NAME} --config=emulator-config --description=Emulator --nodes=1 + gcloud spanner databases create $${DATABASE_NAME} --instance=$${INSTANCE_NAME}' + spanner-cli: + image: sjdaws/spanner-cli:latest + environment: + SPANNER_EMULATOR_HOST: "spanner:9010" + depends_on: + - "gcloud-spanner-init" + command: ['sh', '-c', 'echo spanner client.... && tail -f /dev/null'] diff --git a/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/SpannerQueryRunner.java b/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/SpannerQueryRunner.java new file mode 100644 index 000000000000..7b25a4b3f110 --- /dev/null +++ b/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/SpannerQueryRunner.java @@ -0,0 +1,179 @@ +package io.trino.plugin.spanner;/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.Session; +import io.trino.metadata.QualifiedObjectName; +import io.trino.plugin.jmx.JmxPlugin; +import io.trino.plugin.tpch.TpchPlugin; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.MaterializedResult; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.intellij.lang.annotations.Language; +import org.jetbrains.annotations.NotNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import static io.airlift.testing.Closeables.closeAllSuppress; +import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static java.lang.String.format; + +public final class SpannerQueryRunner +{ + private static final Logger LOG = Logger.get(SpannerQueryRunner.class); + private static final String TPCH_SCHEMA = "tpch"; + + private SpannerQueryRunner() {} + + public static DistributedQueryRunner createSpannerQueryRunner( + TestingSpannerInstance instance, + Map extraProperties, + Map connectorProperties, + Iterable> tables, boolean addTpcDsTables) + throws Exception + { + return createSpannerQueryRunner(instance, extraProperties, ImmutableMap.of(), connectorProperties, tables, runner -> {}, addTpcDsTables); + } + + public static DistributedQueryRunner createSpannerQueryRunner( + TestingSpannerInstance instance, + Map extraProperties, + Map coordinatorProperties, + Map connectorProperties, + Iterable> tables, + Consumer moreSetup, + boolean addTpcDsTables) + throws Exception + { + DistributedQueryRunner queryRunner = null; + try { + queryRunner = DistributedQueryRunner.builder( + createSession()) + .setExtraProperties(extraProperties) + .setCoordinatorProperties(coordinatorProperties) + .setAdditionalSetup(moreSetup) + .build(); + + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + + // note: additional copy via ImmutableList so that if fails on nulls + connectorProperties = new HashMap<>(ImmutableMap.copyOf(connectorProperties)); + connectorProperties.putIfAbsent("spanner.credentials.file", "credentials.json"); + connectorProperties.putIfAbsent("spanner.instanceId", instance.getInstanceId()); + connectorProperties.putIfAbsent("spanner.projectId", instance.getProjectId()); + connectorProperties.putIfAbsent("spanner.database", instance.getDatabaseId()); + connectorProperties.putIfAbsent("spanner.emulated", "true"); + connectorProperties.putIfAbsent("spanner.emulated.host", instance.getHost()); + /* connectorProperties = new HashMap<>(ImmutableMap.copyOf(connectorProperties)); + connectorProperties.putIfAbsent("spanner.credentials.file", "credentials.json"); + connectorProperties.putIfAbsent("spanner.instanceId", "spanner-instance"); + connectorProperties.putIfAbsent("spanner.projectId", "spanner-project"); + connectorProperties.putIfAbsent("spanner.database", "spanner-database"); + connectorProperties.putIfAbsent("spanner.emulated", "true"); + connectorProperties.putIfAbsent("spanner.emulated.host", "localhost:9010");*/ + queryRunner.installPlugin(new SpannerPlugin()); + queryRunner.createCatalog("spanner", "spanner", connectorProperties); + if (addTpcDsTables) { + copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), tables); + MaterializedResult execute = queryRunner.execute("SHOW TABLES FROM spanner.default"); + System.out.println(execute); + } + +/* + MaterializedResult rows = queryRunner.execute("SELECT * FROM spanner.default.customer"); + System.out.println(rows); +*/ + return queryRunner; + } + catch (Throwable e) { + closeAllSuppress(e, queryRunner, instance); + throw e; + } + } + + public static Session createSession() + { + return testSessionBuilder() + .setCatalog("spanner") + .setSchema("default") + .setCatalogSessionProperty("spanner", + SpannerSessionProperties.WRITE_MODE, + SpannerSessionProperties.Mode.UPSERT.name()) + .build(); + } + + public static void main(String[] args) + throws Exception + { + DistributedQueryRunner queryRunner = getSpannerQueryRunner(); + + queryRunner.installPlugin(new JmxPlugin()); + queryRunner.createCatalog("jmx", "jmx"); + + Logger log = Logger.get(SpannerQueryRunner.class); + log.info("======== SERVER STARTED ========"); + log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); + } + + @NotNull + public static DistributedQueryRunner getSpannerQueryRunner() + throws Exception + { + return createSpannerQueryRunner( + new TestingSpannerInstance(), + ImmutableMap.of("http-server.http.port", "8080"), + ImmutableMap.of(), + TpchTable.getTables(), false); + } + + private static void copyTpchTables( + QueryRunner queryRunner, + String sourceCatalog, + String sourceSchema, + Session session, + Iterable> tables) + { + LOG.debug("Loading data from %s.%s...", sourceCatalog, sourceSchema); + for (TpchTable table : tables) { + copyTable(queryRunner, sourceCatalog, session, sourceSchema, table); + } + } + + private static void copyTable( + QueryRunner queryRunner, + String catalog, + Session session, + String schema, + TpchTable table) + { + QualifiedObjectName source = new QualifiedObjectName(catalog, schema, table.getTableName()); + String target = table.getTableName(); + String primaryKey = table.getColumns().get(0).getSimplifiedColumnName(); + String tableProperties = String.format("WITH (PRIMARY_KEYS = ARRAY['%s'])", primaryKey); + @Language("SQL") + String sql = format("CREATE TABLE IF NOT EXISTS %s %s AS SELECT * FROM %s", + target, tableProperties, source, primaryKey, primaryKey, source); + System.out.println(sql); + LOG.debug("Running import for %s %s", target, sql); + long rows = queryRunner.execute(session, sql).getUpdateCount().getAsLong(); + + LOG.debug("%s rows loaded into %s", rows, target); + } +} diff --git a/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/SpannerTestTable.java b/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/SpannerTestTable.java new file mode 100644 index 000000000000..6bd22c28b8d6 --- /dev/null +++ b/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/SpannerTestTable.java @@ -0,0 +1,44 @@ +package io.trino.plugin.spanner; + +import io.trino.testing.sql.SqlExecutor; +import io.trino.testing.sql.TestTable; + +import java.util.List; + +import static java.lang.String.format; + +public class SpannerTestTable extends TestTable +{ + + public SpannerTestTable(SqlExecutor sqlExecutor, String namePrefix, String tableDefinition) + { + super(sqlExecutor, namePrefix, tableDefinition); + } + + + @Override + public void createAndInsert(List rowsToInsert) + { + String[] fields = tableDefinition.split(","); + String pkField = fields[fields.length - 1].trim(); + String pkColumn = pkField.split(" ")[0]; + String create = "CREATE TABLE %s (%s) PRIMARY KEY (%s)" + .formatted( + this.name, + tableDefinition, + pkColumn); + sqlExecutor.execute(create); + + try { + for (String row : rowsToInsert) { + String sql = format("INSERT INTO %s VALUES (%s)", name, row); + sqlExecutor.execute(sql); + } + } + catch (Exception e) { + try (SpannerTestTable ignored = this) { + throw e; + } + } + } +} diff --git a/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/TestSpannerDataTypesMapping.java b/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/TestSpannerDataTypesMapping.java new file mode 100644 index 000000000000..b9e69c0932d8 --- /dev/null +++ b/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/TestSpannerDataTypesMapping.java @@ -0,0 +1,79 @@ +package io.trino.plugin.spanner; + +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.spi.type.BigintType; +import io.trino.spi.type.IntegerType; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import io.trino.testing.datatype.ColumnSetup; +import io.trino.testing.datatype.CreateAndInsertDataSetup; +import io.trino.testing.datatype.CreateAsSelectDataSetup; +import io.trino.testing.datatype.DataSetup; +import io.trino.testing.datatype.DataType; +import io.trino.testing.datatype.SqlDataTypeTest; +import io.trino.testing.sql.JdbcSqlExecutor; +import io.trino.testing.sql.TrinoSqlExecutor; +import io.trino.tpch.TpchTable; +import org.jetbrains.annotations.NotNull; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static io.trino.spi.type.BooleanType.BOOLEAN; + +public class TestSpannerDataTypesMapping + extends AbstractTestQueryFramework +{ + protected TestingSpannerInstance spannerInstance; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + spannerInstance = closeAfterClass(new TestingSpannerInstance()); + return SpannerQueryRunner.createSpannerQueryRunner( + spannerInstance, + ImmutableMap.of("http-server.http.port", "8080"), + ImmutableMap.of(), + TpchTable.getTables(), false); + } + + @Test + public void testBoolean() + { + SqlDataTypeTest.create() + .addRoundTrip("bool", "true", BOOLEAN) + .addRoundTrip("bool", "false", BOOLEAN) + .addRoundTrip("bool", "NULL", BOOLEAN, "CAST(NULL AS BOOLEAN)") + .addRoundTrip("int64","1", BigintType.BIGINT,"CAST(1 AS BIGINT)") + .execute(getQueryRunner(), spannerCreateAndInsert("test_boolean")); + } + + private DataSetup spannerCreateAndInsert(String tableNamePrefix) + { + return inputs -> new SpannerTestTable(new JdbcSqlExecutor(spannerInstance.getJdbcUrl(), new Properties()) + ,tableNamePrefix, String.format("%s", getColumns(inputs))); + } + @NotNull + private String getColumns(List inputs) + { + return IntStream.range(0, inputs.size()) + .mapToObj(f -> String.format("col_%s %s", f, + inputs.get(f).getDeclaredType().get())) + .collect(Collectors.joining(", ")); + } + + private DataSetup trinoCreateAsSelect(String tableNamePrefix) + { + return trinoCreateAsSelect(getSession(), tableNamePrefix); + } + + private DataSetup trinoCreateAsSelect(Session session, String tableNamePrefix) + { + return new CreateAsSelectDataSetup(new TrinoSqlExecutor(getQueryRunner(), session), tableNamePrefix); + } +} diff --git a/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/TestSpannerPlugin.java b/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/TestSpannerPlugin.java new file mode 100644 index 000000000000..56a0c0ea299c --- /dev/null +++ b/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/TestSpannerPlugin.java @@ -0,0 +1,33 @@ +package io.trino.plugin.spanner; + +import com.google.common.collect.ImmutableMap; +import io.trino.spi.Plugin; +import io.trino.spi.connector.ConnectorFactory; +import io.trino.testing.TestingConnectorContext; +import org.testng.annotations.Test; + +import java.util.concurrent.ExecutionException; + +import static com.google.common.collect.Iterables.getOnlyElement; + +public class TestSpannerPlugin +{ + @Test + public void testCreateConnector() + throws Exception + { + Plugin plugin = new SpannerPlugin(); + ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories()); + TestingSpannerInstance instance = new TestingSpannerInstance(); + factory.create("test", ImmutableMap.of( + "spanner.credentials.file", "credentials.json", + "spanner.instanceId", instance.getInstanceId() + , "spanner.projectId", instance.getProjectId() + , "spanner.database", instance.getDatabaseId() + , "spanner.emulated", "true" + , "spanner.emulated.host", instance.getHost() + ), + new TestingConnectorContext()).shutdown(); + instance.close(); + } +} diff --git a/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/TestingSpannerInstance.java b/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/TestingSpannerInstance.java new file mode 100644 index 000000000000..acd39deb708a --- /dev/null +++ b/plugin/trino-spanner/src/test/java/io/trino/plugin/spanner/TestingSpannerInstance.java @@ -0,0 +1,146 @@ +package io.trino.plugin.spanner; + +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.InstanceAdminClient; +import com.google.cloud.spanner.InstanceConfigId; +import com.google.cloud.spanner.InstanceId; +import com.google.cloud.spanner.InstanceInfo; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import org.intellij.lang.annotations.Language; +import org.testcontainers.containers.SpannerEmulatorContainer; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +public class TestingSpannerInstance + implements AutoCloseable +{ + private final String SPANNER_IMAGE = "gcr.io/cloud-spanner-emulator/emulator:latest"; + + private final SpannerEmulatorContainer emulatorContainer; + private final String PROJECT = "test-project"; + private final String INSTANCE = "test-instance"; + private final String DATABASE = "trinodb"; + private Database database; + private SpannerOptions options; + private Spanner spanner; + private DatabaseId databaseId; + private InstanceId instanceId; + private String host = null; + + public TestingSpannerInstance() + throws ExecutionException, InterruptedException + { + this.emulatorContainer = new SpannerEmulatorContainer(DockerImageName.parse(SPANNER_IMAGE)) + .withExposedPorts(9010, 9020); + emulatorContainer.start(); + //this.host = "0.0.0.0:" + emulatorContainer.getEmulatorGrpcEndpoint().split(":")[1]; + this.host=emulatorContainer.getEmulatorGrpcEndpoint(); + options = SpannerOptions + .newBuilder() + .setEmulatorHost(emulatorContainer.getEmulatorGrpcEndpoint()) + .setCredentials(NoCredentials.getInstance()) + .setProjectId(PROJECT) + .build(); + this.spanner = options.getService(); + this.instanceId = createInstance(); + this.database = createDatabase(); + + this.databaseId = DatabaseId.of(instanceId, DATABASE); + } + + private static void execute(String url, String sql) + { + try (Connection connection = DriverManager.getConnection(url, new Properties()); + Statement statement = connection.createStatement()) { + statement.execute(sql); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static void main(String[] args) + throws ExecutionException, InterruptedException + { + TestingSpannerInstance spannerInstance = new TestingSpannerInstance(); + Thread.sleep(Duration.ofHours(1).toMillis()); + } + + public String getDatabaseId() + { + return DATABASE; + } + + public String getInstanceId() + { + return INSTANCE; + } + + public String getProjectId() + { + return PROJECT; + } + + public String getHost() + { + return host; + } + + private Database createDatabase() + throws InterruptedException, ExecutionException + { + DatabaseAdminClient dbAdminClient = options.getService().getDatabaseAdminClient(); + return dbAdminClient + .createDatabase( + INSTANCE, + DATABASE, new ArrayList<>()) + .get(); + } + + private InstanceId createInstance() + throws InterruptedException, ExecutionException + { + InstanceConfigId instanceConfig = InstanceConfigId.of(PROJECT, "emulator-config"); + InstanceId instanceId = InstanceId.of(PROJECT, INSTANCE); + InstanceAdminClient insAdminClient = spanner.getInstanceAdminClient(); + return insAdminClient + .createInstance( + InstanceInfo + .newBuilder(instanceId) + .setNodeCount(1) + .setDisplayName("Test instance") + .setInstanceConfigId(instanceConfig) + .build()) + .get().getId(); + } + + public void execute(@Language("SQL") String sql) + { + execute(getJdbcUrl(), sql); + } + + public String getJdbcUrl() + { + return String.format("jdbc:cloudspanner://%s/projects/%s/instances/%s/databases/%s;autoConfigEmulator=true", + emulatorContainer.getEmulatorGrpcEndpoint(), PROJECT, INSTANCE, DATABASE); + } + + @Override + public void close() + throws Exception + { + emulatorContainer.stop(); + } +} diff --git a/pom.xml b/pom.xml index ce7c5f994e61..b2a321630d0b 100644 --- a/pom.xml +++ b/pom.xml @@ -196,6 +196,7 @@ testing/trino-testing-resources testing/trino-testing-services testing/trino-tests + plugin/trino-spanner